言成言成啊 | Kit Chen's Blog

Pulsar入门

发布于2021-09-06 22:17:09,更新于2023-11-17 20:10:30,标签:java pulsar  文章会持续修订,转载请注明来源地址:https://meethigher.top/blog

本文使用的Pulsar2.8.0

参考文章

一、上手

1.1 安装

安装基于Centos8.4

下载pulsar

1
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz

将pulsar解压到/opt/module文件夹下

1
tar -zxvf apache-pulsar-2.8.0-bin.tar.gz -C /opt/module

进入到pulsar路径下,将pulsar单节点启动

1
bin/pulsar standalone

会有一大堆乱七八糟的日志,直接不用管了。一般出现下面这样的图时,就是启动成功了。

1.2 搭建项目

创建SpringBoot项目

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>pulsar-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>pulsar-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<pulsar.version>2.8.0</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

application.yml

1
2
3
4
5
6
7
8
pulsar:
servers: pulsar://192.168.110.39:6650
producer:
topic: cccTopic
producerName: ccc
consumer:
topic: cccTopic
subscriptionName: CCC

创建配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Configuration
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
private String servers;
private Producer producer;
private Consumer consumer;



public String getServers() {
return servers;
}

public void setServers(String servers) {
this.servers = servers;
}

public Producer getProducer() {
return producer;
}

public void setProducer(Producer producer) {
this.producer = producer;
}

public Consumer getConsumer() {
return consumer;
}

public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}

public static class Producer {
private String topic;
private String producerName;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getProducerName() {
return producerName;
}

public void setProducerName(String producerName) {
this.producerName = producerName;
}
}

public static class Consumer {
private String topic;
private String subscriptionName;

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getSubscriptionName() {
return subscriptionName;
}

public void setSubscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
}
}

}

单元测试是否连接成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest
class PulsarDemoApplicationTests {
@Autowired
private PulsarClientConfig config;
@Autowired
private PulsarProperties pulsarProperties;

@Test
void testIsConnected() throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarProperties.getServers())
.build();
//这个永远返回true,不管连的地址有木有。所以想看有没有连成功,需要创建个生产者或者消费者。
//boolean closed = client.isClosed();
Producer<byte[]> producer = client.newProducer()
.topic(pulsarProperties.getProducer().getProducerName())
.producerName(pulsarProperties.getProducer().getProducerName())
.create();
boolean connected = producer.isConnected();
if(connected){
System.out.println("已连接");
producer.close();
}else{
System.out.println("未连接");
}
}
}

二、使用

2.1 生产者

权限模式

  • Shared:默认设置。多个生产者可以在一个topic上发送消息。
  • Exclusive:一个topic只有一个生产者可以发送消息。如果已经有生产者连接该topic,再有其他生产者连接发布消息则会报错。
  • WaitForExclusive:如果已经有生产者连接topic,再有生产者连接,则会被挂起,知道获取Exclusive权限

发送模式

  • 同步发送:生产者发送消息之后,会等待节点的消息确认。如果没有收到节点的确认,那么发送操作就会被认定为失败。本质上pulsar的同步send也是基于异步sendAsync来实现的。
  • 异步发送:生产者将消息丢进内存的队列里,立即返回。本地客户端在后台将消息发往节点。

如何验证服务器到节点的消息确认,可以通过设置sendTimeOut来设置超时时间,经测试sendTimeout(2,TimeUnit.MILLISECONDS)时候,同步就会报错了。而sendAsync不会报错。

2.2 消费者

接收模式

  • 同步接收:会一直卡在接收消息的状态,不会往下执行,除非收到了消息。
  • 异步接收:会立即返回一个CompletableFuture

监听器MessageListener

客户端库为消费者提供侦听器实现。 例如,Java 客户端提供了一个 MesssageListener 接口。 在此接口中,每当接收到新消息时,都会调用 received 方法。

消费确认

  • Acknowledgement:返回消费成功。当所有的订阅该Topic的消费者,都成功消费之后,这条消息就会被清除。
  • Negative Acknowledgement:返回消费失败。如果该条消息通知节点消费失败,节点会在指定时间延迟后,再次发送消息给该消费者。
  • Acknowledgement Timeout:消费消息超时。如果在该指定的时间内,未成功消费,节点则会再次发送消息。

消费者订阅模式

pulsar提供了四种订阅模式。订阅Topic的模式是由消费者指定的。

  • exclusive:一个Topic只允许一个消费者订阅,其他消费者订阅会报错。
  • shared:多个消费者可以共同订阅一个Topic,并且每条消息只发给一个消费者消费。以轮询的方式在消费间发送。
  • failover:多个消费者可以共同订阅一个Topic,发送消息时,挑选一个主消费者,将该消息发给主消费者进行消费,如果该消费者挂了,会另外挑选一个主消费者。
  • key_shared:多个消费者可以共同订阅一个Topic,并且具有相同key的消息,仅发给特定某个消费者。当有消费者连接或者断开时,key会发生改变。

2.3 Topic

Topic的格式如下

1
{persistent|non-persistent}://tenant/namespace/topic

其中涉及的字段含义:

  • persistent|non-persistent:Topic的存储类型。持久存储和非持久存储,持久存储的Topic,所有的消息,都会保存到磁盘上;非持久存储保存在内存中。默认是持久存储persistent。
  • tenant:集群中的用户。默认是public
  • namespace:用户下的命名空间。默认是default
  • topic:Topic的名称

2.4 案例

PulsarClientConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Configuration
public class PulsarClientConfig {
@Autowired
private PulsarProperties pulsarProperties;

/**
* 获取Client
*
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:16
*/
public PulsarClient getClient() throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl(pulsarProperties.getServers())
.build();
}

/**
* 获取生产者
*
* @param client
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:16
*/
public Producer<byte[]> getProducer(PulsarClient client) throws PulsarClientException {
return client.newProducer()
.topic(pulsarProperties.getProducer().getTopic())
.producerName(pulsarProperties.getProducer().getProducerName())
.sendTimeout(2, TimeUnit.SECONDS)
.create();
}


/**
* 获取消费者
*
* @param client
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:15
*/
public Consumer getConsumer(PulsarClient client) throws PulsarClientException {
return client.newConsumer()
.topic(pulsarProperties.getConsumer().getTopic())
.subscriptionName(pulsarProperties.getConsumer().getSubscriptionName())
//配置接收失败,pulsar再次发送消息的时间间隔
.negativeAckRedeliveryDelay(5, TimeUnit.SECONDS)
.subscribe();
}

/**
* 关闭
*
* @param client
*/
public void close(PulsarClient client) {
try {
client.close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}

}

需求A

1个生产者发送消息,1个消费者进行监听并消费,用同步发送、同步接收的方式实现。

TestSync.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@SpringBootTest
class TestSync {
@Autowired
private PulsarClientConfig config;

private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

@Test
void testProducer() throws Exception {
PulsarClient client = config.getClient();
Producer<byte[]> producer = config.getProducer(client);
for (int i = 0; i < 100; i++) {
final String content = "my-SyncSend-message-" + i;
producer.send((content).getBytes());//同步发送
System.out.println("Send message: " + content);
}
//关闭
producer.close();
client.close();
}

@Test
void testConsumer() throws PulsarClientException {
PulsarClient client = config.getClient();
Consumer consumer = config.getConsumer(client);
//消费消息
while (true) {
Message message = consumer.receive();
try {
System.out.printf("Message received: %s%n", new String(message.getData()));
consumer.acknowledge(message);
} catch (Exception e) {
e.printStackTrace();
consumer.negativeAcknowledge(message);
}
}
}
}

需求B

1个生产者发送消息,1个消费者进行监听并消费,用异步发送、异步接收的方式实现。

TestAsync.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@SpringBootTest
class TestAsync {
@Autowired
private PulsarClientConfig config;

private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

@Test
void testProducer() throws Exception {
List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
PulsarClient client = config.getClient();
Producer<byte[]> producer = config.getProducer(client);
for (int i = 0; i < 100; i++) {
final String content = "my-AsyncSend-message-" + i;
//异步发送
CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());
future.handle(new BiFunction<MessageId, Throwable, Object>() {
@Override
public Object apply(MessageId messageId, Throwable throwable) {
if (throwable == null) {
System.out.println("Message persisted: " + content);
} else {
System.out.println("Error persisting message: " + content + throwable);
}
return null;
}
});
futures.add(future);
}

System.out.println("Waiting for async ops to complete");
for (CompletableFuture<MessageId> future : futures) {
//当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
future.join();
}
System.out.println("All operations completed");

//关闭
producer.close();
client.close();
}

@Test
void testConsumer() throws PulsarClientException {
PulsarClient client = config.getClient();
Consumer consumer = config.getConsumer(client);
List<CompletableFuture<Message>> futures = Lists.newArrayList();
//消费消息
for(int i=0;i<100;i++){
CompletableFuture<Message> future = consumer.receiveAsync();
System.out.println(i);
future.handle(new BiFunction<Message, Throwable, Object>() {
@Override
public Object apply(Message message, Throwable throwable) {
if (throwable == null) {
System.out.println("Message consumed: " + new String(message.getData()));
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
} else {
System.out.println("Error consumed message: " + new String(message.getData()) + throwable);
consumer.negativeAcknowledge(message);
}
return null;
}
});
futures.add(future);

}
System.out.println("Waiting for async ops to complete");
for (CompletableFuture<Message> future : futures) {
//当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
future.join();
}
System.out.println("All operations completed");
consumer.close();
client.close();
}
}
发布:2021-09-06 22:17:09
修改:2023-11-17 20:10:30
链接:https://meethigher.top/blog/2021/pulsar/
标签:java pulsar 
付款码 打赏 分享
Shift+Ctrl+1 可控制工具栏