摘要
在公司跟着做的第二个项目做完了,第三个项目需要用到一些比较潮流的新技术,pulsar就是其中之一。
这段时间上班就是在改完测试提的bug之外,学习。这篇博客记录下pulsar的消息传输案例。
正文
本文使用的Pulsar2.8.0
参考文章
一、上手
1.1 安装
安装基于Centos8.4
下载pulsar
1
| wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz
|
将pulsar解压到/opt/module文件夹下
1
| tar -zxvf apache-pulsar-2.8.0-bin.tar.gz -C /opt/module
|
进入到pulsar路径下,将pulsar单节点启动
会有一大堆乱七八糟的日志,直接不用管了。一般出现下面这样的图时,就是启动成功了。
1.2 搭建项目
创建SpringBoot项目
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>pulsar-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>pulsar-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<pulsar.version>2.8.0</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
|
application.yml
1
2
3
4
5
6
7
8
| pulsar:
servers: pulsar://192.168.110.39:6650
producer:
topic: cccTopic
producerName: ccc
consumer:
topic: cccTopic
subscriptionName: CCC
|
创建配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
| @Configuration
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
private String servers;
private Producer producer;
private Consumer consumer;
public String getServers() {
return servers;
}
public void setServers(String servers) {
this.servers = servers;
}
public Producer getProducer() {
return producer;
}
public void setProducer(Producer producer) {
this.producer = producer;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public static class Producer {
private String topic;
private String producerName;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getProducerName() {
return producerName;
}
public void setProducerName(String producerName) {
this.producerName = producerName;
}
}
public static class Consumer {
private String topic;
private String subscriptionName;
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getSubscriptionName() {
return subscriptionName;
}
public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
}
}
|
单元测试是否连接成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
| @SpringBootTest
class PulsarDemoApplicationTests {
@Autowired
private PulsarClientConfig config;
@Autowired
private PulsarProperties pulsarProperties;
@Test
void testIsConnected() throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarProperties.getServers())
.build();
//这个永远返回true,不管连的地址有木有。所以想看有没有连成功,需要创建个生产者或者消费者。
//boolean closed = client.isClosed();
Producer<byte[]> producer = client.newProducer()
.topic(pulsarProperties.getProducer().getProducerName())
.producerName(pulsarProperties.getProducer().getProducerName())
.create();
boolean connected = producer.isConnected();
if(connected){
System.out.println("已连接");
producer.close();
}else{
System.out.println("未连接");
}
}
}
|
二、使用
2.1 生产者
权限模式
- Shared:默认设置。多个生产者可以在一个topic上发送消息。
- Exclusive:一个topic只有一个生产者可以发送消息。如果已经有生产者连接该topic,再有其他生产者连接发布消息则会报错。
- WaitForExclusive:如果已经有生产者连接topic,再有生产者连接,则会被挂起,知道获取Exclusive权限
发送模式
- 同步发送:生产者发送消息之后,会等待节点的消息确认。如果没有收到节点的确认,那么发送操作就会被认定为失败。本质上pulsar的同步send也是基于异步sendAsync来实现的。
- 异步发送:生产者将消息丢进内存的队列里,立即返回。本地客户端在后台将消息发往节点。
如何验证服务器到节点的消息确认,可以通过设置sendTimeOut来设置超时时间,经测试sendTimeout(2,TimeUnit.MILLISECONDS)时候,同步就会报错了。而sendAsync不会报错。
2.2 消费者
接收模式
- 同步接收:会一直卡在接收消息的状态,不会往下执行,除非收到了消息。
- 异步接收:会立即返回一个
CompletableFuture
监听器MessageListener
客户端库为消费者提供侦听器实现。 例如,Java 客户端提供了一个 MesssageListener 接口。 在此接口中,每当接收到新消息时,都会调用 received 方法。
消费确认
- Acknowledgement:返回消费成功。当所有的订阅该Topic的消费者,都成功消费之后,这条消息就会被清除。
- Negative Acknowledgement:返回消费失败。如果该条消息通知节点消费失败,节点会在指定时间延迟后,再次发送消息给该消费者。
- Acknowledgement Timeout:消费消息超时。如果在该指定的时间内,未成功消费,节点则会再次发送消息。
消费者订阅模式
pulsar提供了四种订阅模式。订阅Topic的模式是由消费者指定的。
- exclusive:一个Topic只允许一个消费者订阅,其他消费者订阅会报错。
- shared:多个消费者可以共同订阅一个Topic,并且每条消息只发给一个消费者消费。以轮询的方式在消费间发送。
- failover:多个消费者可以共同订阅一个Topic,发送消息时,挑选一个主消费者,将该消息发给主消费者进行消费,如果该消费者挂了,会另外挑选一个主消费者。
- key_shared:多个消费者可以共同订阅一个Topic,并且具有相同key的消息,仅发给特定某个消费者。当有消费者连接或者断开时,key会发生改变。
2.3 Topic
Topic的格式如下
1
| {persistent|non-persistent}://tenant/namespace/topic
|
其中涉及的字段含义:
- persistent|non-persistent:Topic的存储类型。持久存储和非持久存储,持久存储的Topic,所有的消息,都会保存到磁盘上;非持久存储保存在内存中。默认是持久存储persistent。
- tenant:集群中的用户。默认是public
- namespace:用户下的命名空间。默认是default
- topic:Topic的名称
2.4 案例
PulsarClientConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
| @Configuration
public class PulsarClientConfig {
@Autowired
private PulsarProperties pulsarProperties;
/**
* 获取Client
*
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:16
*/
public PulsarClient getClient() throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl(pulsarProperties.getServers())
.build();
}
/**
* 获取生产者
*
* @param client
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:16
*/
public Producer<byte[]> getProducer(PulsarClient client) throws PulsarClientException {
return client.newProducer()
.topic(pulsarProperties.getProducer().getTopic())
.producerName(pulsarProperties.getProducer().getProducerName())
.sendTimeout(2, TimeUnit.SECONDS)
.create();
}
/**
* 获取消费者
*
* @param client
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:15
*/
public Consumer getConsumer(PulsarClient client) throws PulsarClientException {
return client.newConsumer()
.topic(pulsarProperties.getConsumer().getTopic())
.subscriptionName(pulsarProperties.getConsumer().getSubscriptionName())
//配置接收失败,pulsar再次发送消息的时间间隔
.negativeAckRedeliveryDelay(5, TimeUnit.SECONDS)
.subscribe();
}
/**
* 关闭
*
* @param client
*/
public void close(PulsarClient client) {
try {
client.close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}
|
需求A
1个生产者发送消息,1个消费者进行监听并消费,用同步发送、同步接收的方式实现。
TestSync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
| @SpringBootTest
class TestSync {
@Autowired
private PulsarClientConfig config;
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test
void testProducer() throws Exception {
PulsarClient client = config.getClient();
Producer<byte[]> producer = config.getProducer(client);
for (int i = 0; i < 100; i++) {
final String content = "my-SyncSend-message-" + i;
producer.send((content).getBytes());//同步发送
System.out.println("Send message: " + content);
}
//关闭
producer.close();
client.close();
}
@Test
void testConsumer() throws PulsarClientException {
PulsarClient client = config.getClient();
Consumer consumer = config.getConsumer(client);
//消费消息
while (true) {
Message message = consumer.receive();
try {
System.out.printf("Message received: %s%n", new String(message.getData()));
consumer.acknowledge(message);
} catch (Exception e) {
e.printStackTrace();
consumer.negativeAcknowledge(message);
}
}
}
}
|
需求B
1个生产者发送消息,1个消费者进行监听并消费,用异步发送、异步接收的方式实现。
TestAsync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
| @SpringBootTest
class TestAsync {
@Autowired
private PulsarClientConfig config;
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test
void testProducer() throws Exception {
List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
PulsarClient client = config.getClient();
Producer<byte[]> producer = config.getProducer(client);
for (int i = 0; i < 100; i++) {
final String content = "my-AsyncSend-message-" + i;
//异步发送
CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());
future.handle(new BiFunction<MessageId, Throwable, Object>() {
@Override
public Object apply(MessageId messageId, Throwable throwable) {
if (throwable == null) {
System.out.println("Message persisted: " + content);
} else {
System.out.println("Error persisting message: " + content + throwable);
}
return null;
}
});
futures.add(future);
}
System.out.println("Waiting for async ops to complete");
for (CompletableFuture<MessageId> future : futures) {
//当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
future.join();
}
System.out.println("All operations completed");
//关闭
producer.close();
client.close();
}
@Test
void testConsumer() throws PulsarClientException {
PulsarClient client = config.getClient();
Consumer consumer = config.getConsumer(client);
List<CompletableFuture<Message>> futures = Lists.newArrayList();
//消费消息
for(int i=0;i<100;i++){
CompletableFuture<Message> future = consumer.receiveAsync();
System.out.println(i);
future.handle(new BiFunction<Message, Throwable, Object>() {
@Override
public Object apply(Message message, Throwable throwable) {
if (throwable == null) {
System.out.println("Message consumed: " + new String(message.getData()));
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
} else {
System.out.println("Error consumed message: " + new String(message.getData()) + throwable);
consumer.negativeAcknowledge(message);
}
return null;
}
});
futures.add(future);
}
System.out.println("Waiting for async ops to complete");
for (CompletableFuture<Message> future : futures) {
//当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
future.join();
}
System.out.println("All operations completed");
consumer.close();
client.close();
}
}
|