kafka消息异步处理

更新时间:2023-07-16 22:10:14 阅读: 评论:0

kafka消息异步处理
实现平台间的数据联⽹,利⽤kafka传递消息,考虑到平台内可能有多个项⽬会进⾏数据推送,为了各项⽬间推送的消息进⾏数据处理不会受到彼此间的阻塞影响,同时保证消息的消费速度,因此需要各项⽬间独⽴异步批量的处理数据。
本地队列处理
将从kafka监听到的消息放⼊本地队列中,保证每个项⽬拥有⾃⼰的队列,细分还可以保证每个项⽬每个数据类型的消息拥有⾃⼰的队列,然后让对应的线程去取并异步处理该队列中的数据。
kafka消息监听:
//从kafka中获取消息
@KafkaListener(topics = {"#{'${pic}'.split(',')}"},containerFactory = "dataKafkaListenerContainerFactory")
public void handleData(List<String> messages){
for(String message:messages){
KafkaMessage kafkaMessage = JSONObject.parObject(message, KafkaMessage.class);
if(Objects.isNull(kafkaMessage)){
logger.info("kafka handler处理消息不存在"+message);
continue;
}
}
根据本地缓存的项⽬信息创建不同的队列,并将消息放⼊对应的线程中进⾏处理:
//从本地缓存中获取该项⽬队列
BlockingQueue<KafkaMessage> dataQueue = DATA_(ProjectId());
//缓存中没有则创建新的队列启动新的线程
if(dataQueue==null){
dataQueue = new LinkedBlockingQueue<>();
DATA_MESSAGE_MAP.ProjectId(),dataQueue);
MessageHander messageHander = (Bean("messageHander");
七佛ateDataBlockingQueue(dataQueue);
new Thread(messageHander,"data-hander-"+ProjectId()).start();
}
狐狸简笔画dataQueue.put(kafkaMessage);
线程处理类,如果单线程处理某类数据时,该数据量过⼤,还可以在线程内部启动多线程处理:
public class EventMessageHander implements Runnable {
//创建⼀个消息处理队列
public  BlockingQueue<KafkaMessage> EVENT_BLOCKING_QUEUE;
public void createEventBlockingQueue(BlockingQueue<KafkaMessage> queue) {
EVENT_BLOCKING_QUEUE = queue;
}
/** 是否结束线程 */方面的英文
private Boolean isClo = fal;
//线程专⽤的线程池
private ThreadPoolExecutor executor = new ThreadPoolExecutor(8, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000),
new ThreadFactory() {
final AtomicInteger threadSeq = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
String threadName = "hand-event-thread-" + AndIncrement();
Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, threadName);
t.tDaemon(true);
t.tUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
<("thread {} error", t.getName(), e);
}
});
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy());
//线程处理数据
@Override
public void run() {
while (!isClo) {
try {
Runnable runnable = new Runnable() {赤链华游蛇
@Override
public void run() {
KafkaMessage kafkaMessage;
try {
kafkaMessage = EVENT_BLOCKING_QUEUE.take();
什么是假分数//数据具体处理
} catch (InterruptedException e) {
<("内部线程出错",e);
}
}
};
executor.submit(runnable);
} catch (Exception e) {
<("event handler occur exception!",e);
}
}
logger.info( "process handler event thread end");
}
该实现机制在获取到kafka消息后,将消息存到本地阻塞队列ArrayBlockingQueue中,⼀类消息拥有⾃⼰的队列,让对应的线程去取并处理该阻塞队列中的消息;⼀⽅⾯可以尽快的消费kafka的消息,防⽌消费者⽆法跟上数据⽣成的速度;另⼀⽅⾯容易扩展,具体的消息消费类实现通⽤⽅法,实现⽅法的具体逻辑即可在新线程中异步执⾏消费,不需要在具体的消费类中关注是否开启新线程执⾏。不过本地队列存储有个缺陷,如果队列中堆积着数据,如果服务挂掉的话,队列中的数据就会丢失。
kafka队列处理
将kafka从监听到的消息继续分类放⼊kafka中,利⽤kafka的topic进⾏消息分类,同时kafka本⾝⽀持对消息的批量处理,来保证数据的快速处理。
吸油烟机怎么清洗
可根据消息类型创建不同的topic,再进⾏消息推送
//根据不同的消息类型将消息推往不同的topic
if(MessageType.Value().MessageType())){
KafkaSendUtil.nd("event_handle",JSONString(kafkaMessage));
}
kafka消息并发处理机制,kafka创建topick的时候,可通过分配多个分区(等同于多个线程)对同⼀个topic的消息进⾏并发处
理,topic创建⽅式:
@Bean //创建⼀个kafka管理类,没有此bean⽆法⾃定义的使⽤adminClient创建topic
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = new HashMap<>();
//配置Kafka实例的连接地址
//kafka的地址,不是zookeeper
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, rvers);
KafkaAdmin admin = new KafkaAdmin(props);
return admin;苏非
}
@Bean  //kafka客户端,在spring中创建这个bean之后可以注⼊并且创建topic,⽤于集群环境,创建多个副本
public AdminClient adminClient() {
ate(kafkaAdmin().getConfig());
}
@Bean//通过bean创建(bean的名字为initialTopic)
public NewTopic initialTopic() {
return new NewTopic("event_handle",40, (short) 1 );
}
同时在消费者配置⾥,⼯⼚配置的时候需要配置并发数据,且并发数量最好不要⼤于分区数
@Bean
大便黑绿色public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> eventKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.tConsumerFactory(consumerEventFactory());
//并发数量,不⼤于40,最好同于分区数
factory.tConcurrency(40);
//批量获取
factory.tBatchListener(true);
return factory;
}
监听消息,并发处理:
@KafkaListener(topics = "event_handle",containerFactory = "eventKafkaListenerContainerFactory")
public void handleEvent(List<String> messages){
//消息处理
}
利⽤kafka进⾏消息批量处理,同时还能有效保证数据不会因为服务挂断⽽丢失,但是就⽬前项⽬,⽆法动态根据项⽬进⾏消费,根据不同的项⽬可创建不同的topic进⾏消息推送,消息监听⽅法不使⽤注解动态创建的话,⽆法使⽤分区概念进⾏批量处理。

本文发布于:2023-07-16 22:10:14,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1100342.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   处理   队列   数据   线程   创建
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图