摘要

在公司跟着做的第二个项目做完了,第三个项目需要用到一些比较潮流的新技术,pulsar就是其中之一。

这段时间上班就是在改完测试提的bug之外,学习。这篇博客记录下pulsar的消息传输案例。

正文

本文使用的Pulsar2.8.0

参考文章

一、上手

1.1 安装

安装基于Centos8.4

下载pulsar

shell
1
wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz

将pulsar解压到/opt/module文件夹下

shell
1
tar -zxvf apache-pulsar-2.8.0-bin.tar.gz -C /opt/module

进入到pulsar路径下,将pulsar单节点启动

shell
1
bin/pulsar standalone

会有一大堆乱七八糟的日志,直接不用管了。一般出现下面这样的图时,就是启动成功了。

3.png

1.2 搭建项目

创建SpringBoot项目

pom.xml

xml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>pulsar-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>pulsar-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <pulsar.version>2.8.0</pulsar.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

yaml
1
2
3
4
5
6
7
8
pulsar:
  servers: pulsar://192.168.110.39:6650
  producer:
    topic: cccTopic
    producerName: ccc
  consumer:
    topic: cccTopic
    subscriptionName: CCC

创建配置类

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@Configuration
@ConfigurationProperties(prefix = "pulsar")
public class PulsarProperties {
    private String servers;
    private Producer producer;
    private Consumer consumer;



    public String getServers() {
        return servers;
    }

    public void setServers(String servers) {
        this.servers = servers;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

    public static class Producer {
        private String topic;
        private String producerName;

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getProducerName() {
            return producerName;
        }

        public void setProducerName(String producerName) {
            this.producerName = producerName;
        }
    }
    
    public static class Consumer {
        private String topic;
        private String subscriptionName;

        public String getTopic() {
            return topic;
        }

        public void setTopic(String topic) {
            this.topic = topic;
        }

        public String getSubscriptionName() {
            return subscriptionName;
        }

        public void setSubscriptionName(String subscriptionName) {
            this.subscriptionName = subscriptionName;
        }
    }

}

单元测试是否连接成功

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@SpringBootTest
class PulsarDemoApplicationTests {
    @Autowired
    private PulsarClientConfig config;
    @Autowired
    private PulsarProperties pulsarProperties;

    @Test
    void testIsConnected() throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarProperties.getServers())
                .build();
        //这个永远返回true,不管连的地址有木有。所以想看有没有连成功,需要创建个生产者或者消费者。
        //boolean closed = client.isClosed();
        Producer<byte[]> producer = client.newProducer()
                .topic(pulsarProperties.getProducer().getProducerName())
                .producerName(pulsarProperties.getProducer().getProducerName())
                .create();
        boolean connected = producer.isConnected();
        if(connected){
            System.out.println("已连接");
            producer.close();
        }else{
            System.out.println("未连接");
        }
    }
}

1.png

二、使用

2.1 生产者

权限模式

  • Shared:默认设置。多个生产者可以在一个topic上发送消息。
  • Exclusive:一个topic只有一个生产者可以发送消息。如果已经有生产者连接该topic,再有其他生产者连接发布消息则会报错。
  • WaitForExclusive:如果已经有生产者连接topic,再有生产者连接,则会被挂起,知道获取Exclusive权限

发送模式

  • 同步发送:生产者发送消息之后,会等待节点的消息确认。如果没有收到节点的确认,那么发送操作就会被认定为失败。本质上pulsar的同步send也是基于异步sendAsync来实现的。
  • 异步发送:生产者将消息丢进内存的队列里,立即返回。本地客户端在后台将消息发往节点。

如何验证服务器到节点的消息确认,可以通过设置sendTimeOut来设置超时时间,经测试sendTimeout(2,TimeUnit.MILLISECONDS)时候,同步就会报错了。而sendAsync不会报错。

4.png

5.png

2.2 消费者

接收模式

  • 同步接收:会一直卡在接收消息的状态,不会往下执行,除非收到了消息。
  • 异步接收:会立即返回一个CompletableFuture

监听器MessageListener

客户端库为消费者提供侦听器实现。 例如,Java 客户端提供了一个 MesssageListener 接口。 在此接口中,每当接收到新消息时,都会调用 received 方法。

消费确认

  • Acknowledgement:返回消费成功。当所有的订阅该Topic的消费者,都成功消费之后,这条消息就会被清除。
  • Negative Acknowledgement:返回消费失败。如果该条消息通知节点消费失败,节点会在指定时间延迟后,再次发送消息给该消费者。
  • Acknowledgement Timeout:消费消息超时。如果在该指定的时间内,未成功消费,节点则会再次发送消息。

消费者订阅模式

pulsar提供了四种订阅模式。订阅Topic的模式是由消费者指定的。

  • exclusive:一个Topic只允许一个消费者订阅,其他消费者订阅会报错。
  • shared:多个消费者可以共同订阅一个Topic,并且每条消息只发给一个消费者消费。以轮询的方式在消费间发送。
  • failover:多个消费者可以共同订阅一个Topic,发送消息时,挑选一个主消费者,将该消息发给主消费者进行消费,如果该消费者挂了,会另外挑选一个主消费者。
  • key_shared:多个消费者可以共同订阅一个Topic,并且具有相同key的消息,仅发给特定某个消费者。当有消费者连接或者断开时,key会发生改变。

3.png

2.3 Topic

Topic的格式如下

text
1
{persistent|non-persistent}://tenant/namespace/topic

其中涉及的字段含义:

  • persistent|non-persistent:Topic的存储类型。持久存储和非持久存储,持久存储的Topic,所有的消息,都会保存到磁盘上;非持久存储保存在内存中。默认是持久存储persistent。
  • tenant:集群中的用户。默认是public
  • namespace:用户下的命名空间。默认是default
  • topic:Topic的名称

2.4 案例

PulsarClientConfig.java

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Configuration
public class PulsarClientConfig {
    @Autowired
    private PulsarProperties pulsarProperties;

    /**
     * 获取Client
     *
     * @return
     * @throws PulsarClientException
     * @author chenchuancheng
     * @since 2021/9/6 15:16
     */
    public PulsarClient getClient() throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(pulsarProperties.getServers())
                .build();
    }

    /**
     * 获取生产者
     *
     * @param client
     * @return
     * @throws PulsarClientException
     * @author chenchuancheng
     * @since 2021/9/6 15:16
     */
    public Producer<byte[]> getProducer(PulsarClient client) throws PulsarClientException {
        return client.newProducer()
                .topic(pulsarProperties.getProducer().getTopic())
                .producerName(pulsarProperties.getProducer().getProducerName())
                .sendTimeout(2, TimeUnit.SECONDS)
                .create();
    }


    /**
     * 获取消费者
     *
     * @param client
     * @return
     * @throws PulsarClientException
     * @author chenchuancheng
     * @since 2021/9/6 15:15
     */
    public Consumer getConsumer(PulsarClient client) throws PulsarClientException {
        return client.newConsumer()
                .topic(pulsarProperties.getConsumer().getTopic())
                .subscriptionName(pulsarProperties.getConsumer().getSubscriptionName())
                //配置接收失败,pulsar再次发送消息的时间间隔
                .negativeAckRedeliveryDelay(5, TimeUnit.SECONDS)
                .subscribe();
    }

    /**
     * 关闭
     *
     * @param client
     */
    public void close(PulsarClient client) {
        try {
            client.close();
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

}

需求A

1个生产者发送消息,1个消费者进行监听并消费,用同步发送、同步接收的方式实现。

TestSync.java

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@SpringBootTest
class TestSync {
    @Autowired
    private PulsarClientConfig config;

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Test
    void testProducer() throws Exception {
        PulsarClient client = config.getClient();
        Producer<byte[]> producer = config.getProducer(client);
        for (int i = 0; i < 100; i++) {
            final String content = "my-SyncSend-message-" + i;
            producer.send((content).getBytes());//同步发送
            System.out.println("Send message: " + content);
        }
        //关闭
        producer.close();
        client.close();
    }

    @Test
    void testConsumer() throws PulsarClientException {
        PulsarClient client = config.getClient();
        Consumer consumer = config.getConsumer(client);
        //消费消息
        while (true) {
            Message message = consumer.receive();
            try {
                System.out.printf("Message received: %s%n", new String(message.getData()));
                consumer.acknowledge(message);
            } catch (Exception e) {
                e.printStackTrace();
                consumer.negativeAcknowledge(message);
            }
        }
    }
}

需求B

1个生产者发送消息,1个消费者进行监听并消费,用异步发送、异步接收的方式实现。

TestAsync.java

java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@SpringBootTest
class TestAsync {
    @Autowired
    private PulsarClientConfig config;

    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Test
    void testProducer() throws Exception {
        List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
        PulsarClient client = config.getClient();
        Producer<byte[]> producer = config.getProducer(client);
        for (int i = 0; i < 100; i++) {
            final String content = "my-AsyncSend-message-" + i;
            //异步发送
            CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes());
            future.handle(new BiFunction<MessageId, Throwable, Object>() {
                @Override
                public Object apply(MessageId messageId, Throwable throwable) {
                    if (throwable == null) {
                        System.out.println("Message persisted: " + content);
                    } else {
                        System.out.println("Error persisting message: " + content + throwable);
                    }
                    return null;
                }
            });
            futures.add(future);
        }

        System.out.println("Waiting for async ops to complete");
        for (CompletableFuture<MessageId> future : futures) {
            //当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
            future.join();
        }
        System.out.println("All operations completed");

        //关闭
        producer.close();
        client.close();
    }

    @Test
    void testConsumer() throws PulsarClientException {
        PulsarClient client = config.getClient();
        Consumer consumer = config.getConsumer(client);
        List<CompletableFuture<Message>> futures = Lists.newArrayList();
        //消费消息
        for(int i=0;i<100;i++){
            CompletableFuture<Message> future = consumer.receiveAsync();
            System.out.println(i);
            future.handle(new BiFunction<Message, Throwable, Object>() {
                @Override
                public Object apply(Message message, Throwable throwable) {
                    if (throwable == null) {
                        System.out.println("Message consumed: " + new String(message.getData()));
                        try {
                            consumer.acknowledge(message);
                        } catch (PulsarClientException e) {
                            e.printStackTrace();
                        }
                    } else {
                        System.out.println("Error consumed message: " + new String(message.getData()) + throwable);
                        consumer.negativeAcknowledge(message);
                    }
                    return null;
                }
            });
            futures.add(future);

        }
        System.out.println("Waiting for async ops to complete");
        for (CompletableFuture<Message> future : futures) {
            //当完成时返回结果值,如果异常完成则抛出(未检查的)异常。
            future.join();
        }
        System.out.println("All operations completed");
        consumer.close();
        client.close();
    }
}