本文使用的Pulsar2.8.0
参考文章
一、上手
1.1 安装
安装基于Centos8.4
下载pulsar
1
| wget https://archive.apache.org/dist/pulsar/pulsar-2.8.0/apache-pulsar-2.8.0-bin.tar.gz
|
将pulsar解压到/opt/module
文件夹下
1
| tar -zxvf apache-pulsar-2.8.0-bin.tar.gz -C /opt/module
|
进入到pulsar路径下,将pulsar单节点启动
会有一大堆乱七八糟的日志,直接不用管了。一般出现下面这样的图时,就是启动成功了。
1.2 搭建项目
创建SpringBoot项目
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> </parent> <groupId>com.example</groupId> <artifactId>pulsar-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>pulsar-demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <pulsar.version>2.8.0</pulsar.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
|
application.yml
1 2 3 4 5 6 7 8
| pulsar: servers: pulsar://192.168.110.39:6650 producer: topic: cccTopic producerName: ccc consumer: topic: cccTopic subscriptionName: CCC
|
创建配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| @Configuration @ConfigurationProperties(prefix = "pulsar") public class PulsarProperties { private String servers; private Producer producer; private Consumer consumer;
public String getServers() { return servers; }
public void setServers(String servers) { this.servers = servers; }
public Producer getProducer() { return producer; }
public void setProducer(Producer producer) { this.producer = producer; }
public Consumer getConsumer() { return consumer; }
public void setConsumer(Consumer consumer) { this.consumer = consumer; }
public static class Producer { private String topic; private String producerName;
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public String getProducerName() { return producerName; }
public void setProducerName(String producerName) { this.producerName = producerName; } } public static class Consumer { private String topic; private String subscriptionName;
public String getTopic() { return topic; }
public void setTopic(String topic) { this.topic = topic; }
public String getSubscriptionName() { return subscriptionName; }
public void setSubscriptionName(String subscriptionName) { this.subscriptionName = subscriptionName; } }
}
|
单元测试是否连接成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @SpringBootTest class PulsarDemoApplicationTests { @Autowired private PulsarClientConfig config; @Autowired private PulsarProperties pulsarProperties;
@Test void testIsConnected() throws PulsarClientException { PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarProperties.getServers()) .build(); Producer<byte[]> producer = client.newProducer() .topic(pulsarProperties.getProducer().getProducerName()) .producerName(pulsarProperties.getProducer().getProducerName()) .create(); boolean connected = producer.isConnected(); if(connected){ System.out.println("已连接"); producer.close(); }else{ System.out.println("未连接"); } } }
|
二、使用
2.1 生产者
权限模式
- Shared:默认设置。多个生产者可以在一个topic上发送消息。
- Exclusive:一个topic只有一个生产者可以发送消息。如果已经有生产者连接该topic,再有其他生产者连接发布消息则会报错。
- WaitForExclusive:如果已经有生产者连接topic,再有生产者连接,则会被挂起,知道获取Exclusive权限
发送模式
- 同步发送:生产者发送消息之后,会等待节点的消息确认。如果没有收到节点的确认,那么发送操作就会被认定为失败。本质上pulsar的同步send也是基于异步sendAsync来实现的。
- 异步发送:生产者将消息丢进内存的队列里,立即返回。本地客户端在后台将消息发往节点。
如何验证服务器到节点的消息确认,可以通过设置sendTimeOut来设置超时时间,经测试sendTimeout(2,TimeUnit.MILLISECONDS)时候,同步就会报错了。而sendAsync不会报错。
2.2 消费者
接收模式
- 同步接收:会一直卡在接收消息的状态,不会往下执行,除非收到了消息。
- 异步接收:会立即返回一个
CompletableFuture
监听器MessageListener
客户端库为消费者提供侦听器实现。 例如,Java 客户端提供了一个 MesssageListener 接口。 在此接口中,每当接收到新消息时,都会调用 received 方法。
消费确认
- Acknowledgement:返回消费成功。当所有的订阅该Topic的消费者,都成功消费之后,这条消息就会被清除。
- Negative Acknowledgement:返回消费失败。如果该条消息通知节点消费失败,节点会在指定时间延迟后,再次发送消息给该消费者。
- Acknowledgement Timeout:消费消息超时。如果在该指定的时间内,未成功消费,节点则会再次发送消息。
消费者订阅模式
pulsar提供了四种订阅模式。订阅Topic的模式是由消费者指定的。
- exclusive:一个Topic只允许一个消费者订阅,其他消费者订阅会报错。
- shared:多个消费者可以共同订阅一个Topic,并且每条消息只发给一个消费者消费。以轮询的方式在消费间发送。
- failover:多个消费者可以共同订阅一个Topic,发送消息时,挑选一个主消费者,将该消息发给主消费者进行消费,如果该消费者挂了,会另外挑选一个主消费者。
- key_shared:多个消费者可以共同订阅一个Topic,并且具有相同key的消息,仅发给特定某个消费者。当有消费者连接或者断开时,key会发生改变。
2.3 Topic
Topic的格式如下
1
| {persistent|non-persistent}://tenant/namespace/topic
|
其中涉及的字段含义:
- persistent|non-persistent:Topic的存储类型。持久存储和非持久存储,持久存储的Topic,所有的消息,都会保存到磁盘上;非持久存储保存在内存中。默认是持久存储persistent。
- tenant:集群中的用户。默认是public
- namespace:用户下的命名空间。默认是default
- topic:Topic的名称
2.4 案例
PulsarClientConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @Configuration public class PulsarClientConfig { @Autowired private PulsarProperties pulsarProperties;
public PulsarClient getClient() throws PulsarClientException { return PulsarClient.builder() .serviceUrl(pulsarProperties.getServers()) .build(); }
public Producer<byte[]> getProducer(PulsarClient client) throws PulsarClientException { return client.newProducer() .topic(pulsarProperties.getProducer().getTopic()) .producerName(pulsarProperties.getProducer().getProducerName()) .sendTimeout(2, TimeUnit.SECONDS) .create(); }
public Consumer getConsumer(PulsarClient client) throws PulsarClientException { return client.newConsumer() .topic(pulsarProperties.getConsumer().getTopic()) .subscriptionName(pulsarProperties.getConsumer().getSubscriptionName()) .negativeAckRedeliveryDelay(5, TimeUnit.SECONDS) .subscribe(); }
public void close(PulsarClient client) { try { client.close(); } catch (PulsarClientException e) { e.printStackTrace(); } }
}
|
需求A
1个生产者发送消息,1个消费者进行监听并消费,用同步发送、同步接收的方式实现。
TestSync.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @SpringBootTest class TestSync { @Autowired private PulsarClientConfig config;
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test void testProducer() throws Exception { PulsarClient client = config.getClient(); Producer<byte[]> producer = config.getProducer(client); for (int i = 0; i < 100; i++) { final String content = "my-SyncSend-message-" + i; producer.send((content).getBytes()); System.out.println("Send message: " + content); } producer.close(); client.close(); }
@Test void testConsumer() throws PulsarClientException { PulsarClient client = config.getClient(); Consumer consumer = config.getConsumer(client); while (true) { Message message = consumer.receive(); try { System.out.printf("Message received: %s%n", new String(message.getData())); consumer.acknowledge(message); } catch (Exception e) { e.printStackTrace(); consumer.negativeAcknowledge(message); } } } }
|
需求B
1个生产者发送消息,1个消费者进行监听并消费,用异步发送、异步接收的方式实现。
TestAsync.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| @SpringBootTest class TestAsync { @Autowired private PulsarClientConfig config;
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@Test void testProducer() throws Exception { List<CompletableFuture<MessageId>> futures = Lists.newArrayList(); PulsarClient client = config.getClient(); Producer<byte[]> producer = config.getProducer(client); for (int i = 0; i < 100; i++) { final String content = "my-AsyncSend-message-" + i; CompletableFuture<MessageId> future = producer.sendAsync(content.getBytes()); future.handle(new BiFunction<MessageId, Throwable, Object>() { @Override public Object apply(MessageId messageId, Throwable throwable) { if (throwable == null) { System.out.println("Message persisted: " + content); } else { System.out.println("Error persisting message: " + content + throwable); } return null; } }); futures.add(future); }
System.out.println("Waiting for async ops to complete"); for (CompletableFuture<MessageId> future : futures) { future.join(); } System.out.println("All operations completed");
producer.close(); client.close(); }
@Test void testConsumer() throws PulsarClientException { PulsarClient client = config.getClient(); Consumer consumer = config.getConsumer(client); List<CompletableFuture<Message>> futures = Lists.newArrayList(); for(int i=0;i<100;i++){ CompletableFuture<Message> future = consumer.receiveAsync(); System.out.println(i); future.handle(new BiFunction<Message, Throwable, Object>() { @Override public Object apply(Message message, Throwable throwable) { if (throwable == null) { System.out.println("Message consumed: " + new String(message.getData())); try { consumer.acknowledge(message); } catch (PulsarClientException e) { e.printStackTrace(); } } else { System.out.println("Error consumed message: " + new String(message.getData()) + throwable); consumer.negativeAcknowledge(message); } return null; } }); futures.add(future);
} System.out.println("Waiting for async ops to complete"); for (CompletableFuture<Message> future : futures) { future.join(); } System.out.println("All operations completed"); consumer.close(); client.close(); } }
|