Pulsar⽣产和消费Java实战
前⾔
1.⽣产数据
import Slf4j;
import*;
import CompletableFuture;
import TimeUnit;
@Slf4j
public class PulsarProducer {
/**
* pulsar 客户端
*/
private PulsarClient pulsarClient;
/**
* ⽣产者
*/
书籍封面设计private Producer<byte[]> producer;
/**
* 消息发送服务器地址
*/
萝卜水的功效
private String localClusterUrl ="pulsar://:29095";
public PulsarProducer(PulsarClient pulsarClient, Producer<byte[]> producer){
this.pulsarClient = pulsarClient;口罩多久换一次
this.producer = producer;
}
public PulsarProducer(){
}
public PulsarProducer(String topicName){
initPulsarClientConnection(topicName);
}
/**
* 初始化pulsar的客户端连接
*
* @param topic
*/
private void initPulsarClientConnection(String topic){
try{
pulsarClient = PulsarClient
.builder()
.rviceUrl(localClusterUrl)
.build();
//创建producer
producer = wProducer()
.topic(topic)
/
/ 是否开启批量处理消息,默认true,需要注意的是enableBatching只在异步发送ndAsync⽣效,同步发送nd失效。因此建议⽣产环境若想使⽤批处理,则需使⽤异步发送,或者多线程同步发送
.enableBatching(true)
// 消息压缩(四种压缩⽅式:LZ4,ZLIB,ZSTD,SNAPPY),consumer端不⽤做改动就能消费,开启后⼤约可以降低3/4带宽消耗和存储(官⽅测试)
.compressionType(CompressionType.LZ4)
// 设置将对发送的消息进⾏批处理的时间段,10ms;可以理解为若该时间段内批处理成功,则⼀个batch中的消息数量不会被该参数所影响。
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
// 设置发送超时0s;如果在ndTimeout过期之前服务器没有确认消息,则会发⽣错误。默认30s,设置为0代表⽆限制,建议配置为0
.ndTimeout(0, TimeUnit.SECONDS)
// 批处理中允许的最⼤消息数。默认1000
// 批处理中允许的最⼤消息数。默认1000
.batchingMaxMessages(1000)
// 设置等待接受来⾃broker确认消息的队列的最⼤⼤⼩,默认1000
.maxPendingMessages(1000)
// 设置当消息队列中等待的消息已满时,Producer.nd 和 Producer.ndAsync 是否应该block阻塞。默认为fal,达到maxPendingMessages 后nd操作会报错,设置为true后,nd操作阻塞但是不报错。建议设置为true
.blockIfQueueFull(true)
张玉奇// 向不同partition分发消息的切换频率,默认10ms,可根据batch情况灵活调整
.roundRobinRouterBatchingPartitionSwitchFrequency(10)
// key_Shared模式要⽤KEY_BASED,才能保证同⼀个key的message在⼀个batch⾥
.batcherBuilder(BatcherBuilder.DEFAULT)
.create();
}catch(PulsarClientException e){
log.info("failed to create pulsar connection :{}", e);
}
}
/**
* 消息发送到pulsar
*
* @param data 数据
* @throws PulsarClientException
*/
public void ndMessage(String data)throws PulsarClientException {
// 同步消息发送
syncSend(producer, data);
// 异步消息发送
// asyncSend(producer, data);
}馄饨馅的最佳配方
/**
* 异步发送消息
*
* @param producer ⽣产者
* @param data 数据
*/
public void asyncSend(Producer producer, String data){
// 异步发送
CompletableFuture<MessageId> future = producer.Bytes());
// future 执⾏完成后会将执⾏结果和执⾏过程中抛出的异常传⼊回调⽅法,如果是正常执⾏的则传⼊的异常为null
future.handle((v, exception)->{
if(exception ==null){
log.info("asynchronous push message is successful");
}el{
log.info("asynchronous push failed, error message is "+ exception);
}
return null;
});
}
/**
* 同步发送消息
*
* @param producer ⽣产者
* @param data 要传输的数据
* @throws PulsarClientException
*/
public void syncSend(Producer producer, String data)throws PulsarClientException {
/
/ 同步发送
producer.Bytes());
}
/**
狮子座男人
* 关闭链接
*/
public void cloConnection(){
try{
// 关闭producer
producer.clo();
// 关闭client
pulsarClient.clo();
}catch(PulsarClientException e){
throw new RuntimeException(e);
}
}
}
2.消费者
平板排行榜import Consumer;
import Message;
import PulsarClient;
import SubscriptionType;
import TimeUnit;
public class PulsarConsumer {
public static void main(String[] args)throws Exception {
String localClientUrl ="pulsar://:29095";
// String localClientUrl = "pulsar://10.26.114.120:6650";
// 需要订阅的topic name
String topicName ="persistent://public/default/hsh5-topic";
// 订阅名
String subscriptionName ="my-sub";
consumerPulsarInfo(localClientUrl, topicName, subscriptionName);
}
/
**
* 消费数据
*
* @param localClientUrl 消费的主机地址
* @param topicName 主题
* @param subscriptionName 订阅名称
* @throws Exception
*/
public static void consumerPulsarInfo(String localClientUrl, String topicName, String subscriptionName)throws Exception { // 构造Pulsar client
PulsarClient pulsarClient = PulsarClient.builder().rviceUrl(localClientUrl).build();
//创建consumer
十九八七Consumer consumer = wConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
// 指定消费模式,包含:Exclusive,Failover,Shared,Key_Shared。默认Exclusive模式
.subscriptionType(SubscriptionType.Exclusive)
// 指定从哪⾥开始消费还有Latest,valueof可选,默认Latest
// .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
// 指定消费失败后延迟多久broker重新发送消息给consumer,默认60s
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
.subscribe();
//消费消息
while(true){
Message message = ive();
try{
System.out.printf("Message received: %s%n",new Data()));
consumer.acknowledge(message);
}catch(Exception e){
e.printStackTrace();
}
}
}
}