摘要
在公司跟着做的第二个项目做完了,第三个项目需要用到一些比较潮流的新技术,pulsar就是其中之一。
这段时间上班就是在改完测试提的bug之外,学习。这篇博客记录下pulsar的消息传输案例。
正文
本文使用的Pulsar2.8.0
参考文章
一、上手 1.1 安装 安装基于Centos8.4
下载pulsar
1
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz
将pulsar解压到/opt/module文件夹下
1
tar -zxvf apache-pulsar-2.8.0-bin.tar.gz -C /opt/module
进入到pulsar路径下,将pulsar单节点启动
会有一大堆乱七八糟的日志,直接不用管了。一般出现下面这样的图时,就是启动成功了。
展开
1.2 搭建项目 创建SpringBoot项目
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns= "http://maven.apache.org/POM/4.0.0" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" >
<modelVersion> 4.0.0</modelVersion>
<parent>
<groupId> org.springframework.boot</groupId>
<artifactId> spring-boot-starter-parent</artifactId>
<version> 2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId> com.example</groupId>
<artifactId> pulsar-demo</artifactId>
<version> 0.0.1-SNAPSHOT</version>
<name> pulsar-demo</name>
<description> Demo project for Spring Boot</description>
<properties>
<java.version> 1.8</java.version>
<pulsar.version> 2.8.0</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId> org.springframework.boot</groupId>
<artifactId> spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId> org.springframework.boot</groupId>
<artifactId> spring-boot-starter-test</artifactId>
<scope> test</scope>
</dependency>
<dependency>
<groupId> org.apache.pulsar</groupId>
<artifactId> pulsar-client</artifactId>
<version> ${pulsar.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId> org.springframework.boot</groupId>
<artifactId> spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
1
2
3
4
5
6
7
8
pulsar :
servers : pulsar://192.168.110.39:6650
producer :
topic : cccTopic
producerName : ccc
consumer :
topic : cccTopic
subscriptionName : CCC
创建配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Configuration
@ConfigurationProperties ( prefix = "pulsar" )
public class PulsarProperties {
private String servers ;
private Producer producer ;
private Consumer consumer ;
public String getServers () {
return servers ;
}
public void setServers ( String servers ) {
this . servers = servers ;
}
public Producer getProducer () {
return producer ;
}
public void setProducer ( Producer producer ) {
this . producer = producer ;
}
public Consumer getConsumer () {
return consumer ;
}
public void setConsumer ( Consumer consumer ) {
this . consumer = consumer ;
}
public static class Producer {
private String topic ;
private String producerName ;
public String getTopic () {
return topic ;
}
public void setTopic ( String topic ) {
this . topic = topic ;
}
public String getProducerName () {
return producerName ;
}
public void setProducerName ( String producerName ) {
this . producerName = producerName ;
}
}
public static class Consumer {
private String topic ;
private String subscriptionName ;
public String getTopic () {
return topic ;
}
public void setTopic ( String topic ) {
this . topic = topic ;
}
public String getSubscriptionName () {
return subscriptionName ;
}
public void setSubscriptionName ( String subscriptionName ) {
this . subscriptionName = subscriptionName ;
}
}
}
单元测试是否连接成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest
class PulsarDemoApplicationTests {
@Autowired
private PulsarClientConfig config ;
@Autowired
private PulsarProperties pulsarProperties ;
@Test
void testIsConnected () throws PulsarClientException {
PulsarClient client = PulsarClient . builder ()
. serviceUrl ( pulsarProperties . getServers ())
. build ();
//这个永远返回true,不管连的地址有木有。所以想看有没有连成功,需要创建个生产者或者消费者。
//boolean closed = client.isClosed();
Producer < byte []> producer = client . newProducer ()
. topic ( pulsarProperties . getProducer (). getProducerName ())
. producerName ( pulsarProperties . getProducer (). getProducerName ())
. create ();
boolean connected = producer . isConnected ();
if ( connected ){
System . out . println ( "已连接" );
producer . close ();
} else {
System . out . println ( "未连接" );
}
}
}
展开
二、使用 2.1 生产者 权限模式 Shared:默认设置。多个生产者可以在一个topic上发送消息。 Exclusive:一个topic只有一个生产者可以发送消息。如果已经有生产者连接该topic,再有其他生产者连接发布消息则会报错。 WaitForExclusive:如果已经有生产者连接topic,再有生产者连接,则会被挂起,知道获取Exclusive权限 发送模式 同步发送:生产者发送消息之后,会等待节点的消息确认。如果没有收到节点的确认,那么发送操作就会被认定为失败。本质上pulsar的同步send也是基于异步sendAsync来实现的。 异步发送:生产者将消息丢进内存的队列里,立即返回。本地客户端在后台将消息发往节点。 如何验证服务器到节点的消息确认,可以通过设置sendTimeOut来设置超时时间,经测试sendTimeout(2,TimeUnit.MILLISECONDS)时候,同步就会报错了。而sendAsync不会报错。
展开
展开
2.2 消费者 接收模式 同步接收:会一直卡在接收消息的状态,不会往下执行,除非收到了消息。 异步接收:会立即返回一个CompletableFuture 监听器MessageListener 客户端库为消费者提供侦听器实现。 例如,Java 客户端提供了一个 MesssageListener 接口。 在此接口中,每当接收到新消息时,都会调用 received 方法。
消费确认 Acknowledgement:返回消费成功。当所有的订阅该Topic的消费者,都成功消费之后,这条消息就会被清除。 Negative Acknowledgement:返回消费失败。如果该条消息通知节点消费失败,节点会在指定时间延迟后,再次发送消息给该消费者。 Acknowledgement Timeout:消费消息超时。如果在该指定的时间内,未成功消费,节点则会再次发送消息。 消费者订阅模式 pulsar提供了四种订阅模式。订阅Topic的模式是由消费者指定的。
exclusive:一个Topic只允许一个消费者订阅,其他消费者订阅会报错。 shared:多个消费者可以共同订阅一个Topic,并且每条消息只发给一个消费者消费。以轮询的方式在消费间发送。 failover:多个消费者可以共同订阅一个Topic,发送消息时,挑选一个主消费者,将该消息发给主消费者进行消费,如果该消费者挂了,会另外挑选一个主消费者。 key_shared:多个消费者可以共同订阅一个Topic,并且具有相同key的消息,仅发给特定某个消费者。当有消费者连接或者断开时,key会发生改变。
展开
2.3 Topic Topic的格式如下
1
{persistent|non-persistent}://tenant/namespace/topic
其中涉及的字段含义:
persistent|non-persistent:Topic的存储类型。持久存储和非持久存储,持久存储的Topic,所有的消息,都会保存到磁盘上;非持久存储保存在内存中。默认是持久存储persistent。 tenant:集群中的用户。默认是public namespace:用户下的命名空间。默认是default topic:Topic的名称 2.4 案例 PulsarClientConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Configuration
public class PulsarClientConfig {
@Autowired
private PulsarProperties pulsarProperties ;
/**
* 获取Client
*
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:16
*/
public PulsarClient getClient () throws PulsarClientException {
return PulsarClient . builder ()
. serviceUrl ( pulsarProperties . getServers ())
. build ();
}
/**
* 获取生产者
*
* @param client
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:16
*/
public Producer < byte []> getProducer ( PulsarClient client ) throws PulsarClientException {
return client . newProducer ()
. topic ( pulsarProperties . getProducer (). getTopic ())
. producerName ( pulsarProperties . getProducer (). getProducerName ())
. sendTimeout ( 2 , TimeUnit . SECONDS )
. create ();
}
/**
* 获取消费者
*
* @param client
* @return
* @throws PulsarClientException
* @author chenchuancheng
* @since 2021/9/6 15:15
*/
public Consumer getConsumer ( PulsarClient client ) throws PulsarClientException {
return client . newConsumer ()
. topic ( pulsarProperties . getConsumer (). getTopic ())
. subscriptionName ( pulsarProperties . getConsumer (). getSubscriptionName ())
//配置接收失败,pulsar再次发送消息的时间间隔
. negativeAckRedeliveryDelay ( 5 , TimeUnit . SECONDS )
. subscribe ();
}
/**
* 关闭
*
* @param client
*/
public void close ( PulsarClient client ) {
try {
client . close ();
} catch ( PulsarClientException e ) {
e . printStackTrace ();
}
}
}
需求A 1个生产者发送消息,1个消费者进行监听并消费,用同步发送、同步接收的方式实现。
TestSync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@SpringBootTest
class TestSync {
@Autowired
private PulsarClientConfig config ;
private static final DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
@Test
void testProducer () throws Exception {
PulsarClient client = config . getClient ();
Producer < byte []> producer = config . getProducer ( client );
for ( int i = 0 ; i < 100 ; i ++ ) {
final String content = "my-SyncSend-message-" + i ;
producer . send (( content ). getBytes ()); //同步发送
System . out . println ( "Send message: " + content );
}
//关闭
producer . close ();
client . close ();
}
@Test
void testConsumer () throws PulsarClientException {
PulsarClient client = config . getClient ();
Consumer consumer = config . getConsumer ( client );
//消费消息
while ( true ) {
Message message = consumer . receive ();
try {
System . out . printf ( "Message received: %s%n" , new String ( message . getData ()));
consumer . acknowledge ( message );
} catch ( Exception e ) {
e . printStackTrace ();
consumer . negativeAcknowledge ( message );
}
}
}
}
需求B 1个生产者发送消息,1个消费者进行监听并消费,用异步发送、异步接收的方式实现。
TestAsync.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@SpringBootTest
class TestAsync {
@Autowired
private PulsarClientConfig config ;
private static final DateTimeFormatter dtf = DateTimeFormatter . ofPattern ( "yyyy-MM-dd HH:mm:ss" );
@Test
void testProducer () throws Exception {
List < CompletableFuture < MessageId >> futures = Lists . newArrayList ();
PulsarClient client = config . getClient ();
Producer < byte []> producer = config . getProducer ( client );
for ( int i = 0 ; i < 100 ; i ++ ) {
final String content = "my-AsyncSend-message-" + i ;
//异步发送
CompletableFuture < MessageId > future = producer . sendAsync ( content . getBytes ());
future . handle ( new BiFunction < MessageId , Throwable , Object > () {
@Override
public Object apply ( MessageId messageId , Throwable throwable ) {
if ( throwable == null ) {
System . out . println ( "Message persisted: " + content );
} else {
System . out . println ( "Error persisting message: " + content + throwable );
}
return null ;
}
});
futures . add ( future );
}
System . out . println ( "Waiting for async ops to complete" );
for ( CompletableFuture < MessageId > future : futures ) {
//当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
future . join ();
}
System . out . println ( "All operations completed" );
//关闭
producer . close ();
client . close ();
}
@Test
void testConsumer () throws PulsarClientException {
PulsarClient client = config . getClient ();
Consumer consumer = config . getConsumer ( client );
List < CompletableFuture < Message >> futures = Lists . newArrayList ();
//消费消息
for ( int i = 0 ; i < 100 ; i ++ ){
CompletableFuture < Message > future = consumer . receiveAsync ();
System . out . println ( i );
future . handle ( new BiFunction < Message , Throwable , Object > () {
@Override
public Object apply ( Message message , Throwable throwable ) {
if ( throwable == null ) {
System . out . println ( "Message consumed: " + new String ( message . getData ()));
try {
consumer . acknowledge ( message );
} catch ( PulsarClientException e ) {
e . printStackTrace ();
}
} else {
System . out . println ( "Error consumed message: " + new String ( message . getData ()) + throwable );
consumer . negativeAcknowledge ( message );
}
return null ;
}
});
futures . add ( future );
}
System . out . println ( "Waiting for async ops to complete" );
for ( CompletableFuture < Message > future : futures ) {
//当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
future . join ();
}
System . out . println ( "All operations completed" );
consumer . close ();
client . close ();
}
}