1、引入依赖
<dependency> <groupid>org.springframework.kafka</groupid> <artifactid>spring-kafka</artifactid> </dependency>
2、设置yml文件
spring: application: name: demo kafka: bootstrap-rvers: 52.82.98.209:10903,52.82.98.209:10904 producer: # producer 生产者 retries: 0 # 重试次数 acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) batch-size: 16384 # 批量大小 buffer-memory: 33554432 # 生产端缓冲区大小 key-rializer: org.apache.kafka.common.rialization.stringrializer# value-rializer: com.itheima.demo.config.myrializer value-rializer: org.apache.kafka.common.rialization.stringrializer consumer: # consumer消费者 group-id: javagroup # 默认的消费组id enable-auto-commit: true # 是否自动提交offt auto-commit-interval: 100 # 提交offt延时(接收到消息后多久提交offt) # earliest:当各分区下有已提交的offt时,从提交的offt开始消费;无提交的offt时,从头开始消费 # latest:当各分区下有已提交的offt时,从提交的offt开始消费;无提交的offt时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offt时,从offt后开始消费;只要有一个分区不存在已提交的offt,则抛出异常 auto-offt-ret: latest key-derializer: org.apache.kafka.common.rialization.stringderializer# value-derializer: com.itheima.demo.config.myderializer value-derializer: org.apache.kafka.common.rialization.stringderializer
3、启动项目
kafkatemplate调用nd时默认采用异步发送,如果需要同步获取发送结果,调用get方法
异步发送生产者:
@restcontrollerpublic class kafkaproducer { @resource private kafkatemplate<string, object> kafkatemplate; @getmapping("/kafka/test/{msg}") public void ndmessage(@pathvariable("msg") string msg) { message message = new message(); message.tmessage(msg); kafkatemplate.nd("test", json.tojsonstring(message)); }}
同步发送生产者:
//测试同步发送与监听@restcontrollerpublic class asyncproducer { private final static logger logger = loggerfactory.getlogger(asyncproducer.class); @resource private kafkatemplate<string, object> kafkatemplate; //同步发送 @getmapping("/kafka/sync/{msg}") public void sync(@pathvariable("msg") string msg) throws exception { message message = new message(); message.tmessage(msg); listenablefuture<ndresult<string, object>> future = kafkatemplate.nd("test", json.tojsonstring(message)); //注意,可以设置等待时间,超出后,不再等候结果 ndresult<string, object> result = future.get(3,timeunit.conds); logger.info("nd result:{}",result.getproducerrecord().value()); }}
消费者:
@componentpublic class kafkaconsumer { private final logger logger = loggerfactory.getlogger(kafkaconsumer.class); //不指定group,默认取yml里配置的 @kafkalistener(topics = {"test"}) public void onmessage1(consumerrecord<?, ?> consumerrecord) { optional<?> optional = optional.ofnullable(consumerrecord.value()); if (optional.isprent()) { object msg = optional.get(); logger.info("message:{}", msg); } }}
那么我们怎么看出来同步发送和异步发送的区别呢?
①首先在服务器上,将kafka暂停服务。
②在swagger发送消息
调同步发送:请求被阻断,一直等待,超时后返回错误
而调异步发送的(默认发送接口),请求立刻返回。
那么,异步发送的消息怎么确认发送情况呢?
我们使用注册监听
即新建一个类:kafkalistener.java
@configurationpublic class kafkalistener { private final static logger logger = loggerfactory.getlogger(kafkalistener.class); @autowired kafkatemplate kafkatemplate; //配置监听 @postconstruct private void listener() { kafkatemplate.tproducerlistener(new producerlistener<string, object>() { @override public void onsuccess(producerrecord<string, object> producerrecord, recordmetadata recordmetadata) { logger.info("ok,message={}", producerrecord.value()); } public void onerror(producerrecord<string, object> producerrecord, exception exception) { logger.error("error!message={}", producerrecord.value()); }); }}
查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener
如果是正常发送异步消息,则会获得该消息。可以看到,在内部类 kafkalistener$1 中,即注册的listener的消息。
消费者使用:kafkaconsumer.java
@componentpublic class kafkaconsumer { private final logger logger = loggerfactory.getlogger(kafkaconsumer.class); //不指定group,默认取yml里配置的 @kafkalistener(topics = {"test"}) public void onmessage1(consumerrecord<?, ?> consumerrecord) { optional<?> optional = optional.ofnullable(consumerrecord.value()); if (optional.isprent()) { object msg = optional.get(); logger.info("message:{}", msg); } }}
1)序列化详解
前面用到的是kafka自带的字符串序列化器(org.apache.kafka.common.rialization.stringrializer
)除此之外还有:bytearray、bytebuffer、bytes、double、integer、long 等这些序列化器都实现了接口(org.apache.kafka.common.rialization.rializer
)基本上,可以满足绝大多数场景2)自定义序列化
自己实现,实现对应的接口即可,有以下方法:
public interface rializer<t> extends cloable {default void configure(map<string, ?> configs, boolean iskey) {}//理论上,只实现这个即可正常运行byte[] rialize(string var1, t var2);//默认调上面的方法default byte[] rialize(string topic, headers headers, t data) {return this.rialize(topic, data);}default void clo() {}}
我们来自己实现一个序列化器:myrializer.java
public class myrializer implements rializer { @override public byte[] rialize(string s, object o) { string json = json.tojsonstring(o); return json.getbytes(); }}
3)解码myderializer.java
,实现方式与编码器几乎一样.
public class myderializer implements derializer { private final static logger logger = loggerfactory.getlogger(myderializer.class); @override public object derialize(string s, byte[] bytes) { try { string json = new string(bytes,"utf-8"); return json.par(json); } catch (unsupportedencodingexception e) 生日祝福语格式{ e.printstacktrace(); } return null; }}
4)在yaml中配置自己的编码器、解码器
再次收发,消息正常
分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。
给定了分区号,直接将数据发送到指定的分区里面去没有给定分区号,给定数据的key值,通过key取上hashcode进行分区既没有给定分区号,也没有给定key值,直接轮循进行分区(默认)自定义分区,你想怎么做就怎么做1)验证默认分区规则
发送者代码参考:partitionproducer.java
//测试分区发送@restcont拼音翻译rollerpublic class partitionproducer { @resource private kafkatemplate<string, object> kafkatemplate; // 指定分区发送// 不管你key是什么,到同一个分区 @getmapping("/kafka/partitionnd/{key}") public void tpartition(@pathvariable("key") string key) { kafkatemplate.nd("test", 0, key, "key=" + key + ",msg=指定0号分区"); } // 指定key发送,不指定分区// 根据key做hash,相同的key到同一个分区 @getmapping("/kafka/keynd/{key}") public void tkey(@pathvariable("key") string key) { kafkatemplate.nd("test", key, "key=" + key + ",msg=不指定分区");}
消费者代码使用:partitionconsumer.java
@componentpublic class partitionconsumer { private final logger logger = loggerfactory.getlogger(partitionconsumer.class); //分区消费 @kafkalistener(topics = {"test"},topicpattern = "0") public void onmessage(consumerrecord<?, ?> consumerrecord) { optional<?> optional = optional.ofnullable(consumerrecord.value()); if (optional.isprent()) { object msg = optional.get(); logger.info("partition=0,message:[{}]", msg); } } @kafkalistener(topics = {"test"},topicpattern = "1") public void onmessage1(consumerrecord<?, ?> consumerrecord) { optional<?> optional = optional.ofnullable(consumerrecord.value()); if (optional.isprent()) { object msg = optional.get(); logger.info("partition=1,message:[{}]", msg); } }}
通过swagger访问tkey(也就是只给了key的方法):
可以看到key相同的被hash到了同一个分区
再访问tpartition来设置分区号0来发送:
可以看到无论key是什么,都是分区0来消费
2)自定义分区
参考代码:mypartitioner.java , mypartitiontemplate.java。
发送使用:mypartitionproducer.java。
public class mypartitioner implements partitioner { @override public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {// 定义自己的分区策略// 如果key以0开头,发到0号分区// 其他都扔到1号分区 string keystr = key+""; if (keystr.startswith("0")){ return 0; }el { return 1; } } public void clo() { public void configure(map<string, ?> map) {}
@configurationpublic class mypartitiontemplate { private final logger logger = loggerfactory.getlogger(this.getclass()); @value("${spring.kafka.bootstrap-rvers}") private string bootstraprvers; kafkatemplate kafkatemplate; @postconstruct public void tkafkatemplate() { map<string, object> props = new hashmap<>(); props.put(producerconfig.bootstrap_rvers_config, bootstraprvers); props.put(producerconfig.key_rializer_class_config, stringrializer.class); props.put(producerconfig.value_rializer_class_config, stringrializer.class); //注意分区器在这里!!! props.put(producerconfig.partitioner_class_config, mypartitioner.class); this.kafkatemplate = new kafkatemplate<string, string>(new defaultkafkaproducerfactory<>(props)); } public kafkatemplate getkafkatemplate(){ return kafkatemplate;}
//测试自定义分区发送@restcontrollerpublic class mypartitionproducer { @autowired mypartitiontemplate template;// 使用0开头和其他任意字母开头的key发送消息// 看控制台的输出,在哪个分区里? @getmapping("/kafka/mypartitionnd/{key}") public void tpartition(@pathvariable("key") string key) { template.getkafkatemplate().nd("test", key,"key="+key+",msg=自定义分区策略"); }}
使用swagger,发送0开头和非0开头两种key
发送者使用:kafkaproducer.java
@restcontrollerpublic class kafkaproducer { @resource private kafkatemplate<string, object> kafkatemplate; @getmapping("/kafka/test/{msg}") public void ndmessage(@pathvariable("msg") string msg) { message message = new message(); message.tmessage(msg); kafkatemplate.nd("test", json.tojsonstring(message)); }}
1)代码参考:groupconsumer.java,listener拷贝3份,分别赋予两组group,验证分组消费:
//测试组消费@componentpublic class groupconsumer { private final logger logger = loggerfactory.getlogger(groupconsumer.class); //组1,消费者1 @kafkalistener(topics = {"test"},groupid = "group1") public void onmessage1(consumerrecord<?, ?顾漫小说> consumerrecord) { optional<?> optional = optional.ofnullable(consumerrecord.value()); if (optional.isprent()) { object msg = optional.get(); logger.info("group:group1-1 , message:{}", msg); } } //组1,消费者2 public void onmessage2(consumerrecord<?, ?> consumerrecord) { logger.info("group:group1-2 , message:{}", msg); //组2,只有一个消费者 @kafkalistener(topics = {"test"},groupid = "group2") public void onmessage3(consumerrecord<?, ?> consumerrecord) { logger.info("group:group2 , message:{}", msg);}
2)启动
3)通过swagger发送2条消息
同一group下的两个消费者,在group1均分消息group2下只有一个消费者,得到全部消息
4)消费端闲置
注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置(因为一个分区只能分配给一个消费者),浪费资源!
验证方式:
停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。
重新发送两条消息,试一试
group2可以消费到1、2两条消息group1下有两个消费者,但是只分配给了 1 , 2这个进程被闲置
1)自动提交
前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交
enable-auto-commit: true # 是否自动提交offtauto-commit-interval: 100 # 提交offt延时(接收到消息后多久提交offt,默认单位为ms)
2)手动提交
有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。
下面我们自己定义配置,覆盖上面的参数
代码参考:myofftconfig.java
@configurationpublic class myofftconfig { private final logger logger = loggerfactory.getlogger(this.getclass()); @value("${spring.kafka.bootstrap-rvers}") private string bootstraprvers; @bean public kafkalistenercontainerfactory<?> manualkafkalistenercontainerfactory() { map<string, object> configprops = new hashmap<>(); configprops.put(consumerconfig.bootstrap_rvers_config, bootstraprvers); configprops.put(consumerconfig.key_derializer_class_config, stringderializer.class); configprops.put(consumerconfig.value_derializer_class_config, stringderializer.class); // 注意这里!!!设置手动提交 configprops.put(consumerconfig.enable_auto_commit_config, "fal"); concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>(); factory.tconsumerfactory(new defaultkafkaconsumerfactory<>(configprops)); // ack模式: // ackmode针对enable_auto_commit_config=fal时生腐生效,有以下几种: // // record // 每处理一条commit一次 // batch(默认) // 每次poll的时候批量提交一次,频率取决于每次poll的调用频率 // time // 每次间隔acktime的时间去commit(跟auto commit interval有什么区别呢?) // count // 累积达到ackcount次的ack去commit // count_time // acktime或ackcount哪个条件先满足,就commit // manual // listener负责ack,但是背后也是批量上去 // manual_immediate // listner负责ack,每调用一次,就立即commit factory.getcontainerproperties().tackmode(containerproperties.ackmode.manual_immediate); return factory; }}
然后通过在消费端的consumer来提交偏移量
myofftconsumer:
@componentpublic class myofftconsumer { private final logger logger = loggerfactory.getlogger(this.getclass()); @kafkalistener(topics = "test", groupid = "myofft-group-1", containerfactory = "manualkafkalistenercontainerfactory") public void manualcommit(@payload string message, @header(kafkaheaders.received_partition_id) int partition, @header(kafkaheaders.received_topic) string topic, consumer consumer, acknowledgment ack) { logger.info("手动提交偏移量 , partition={}, msg={}", partition, message); // 同步提交 consumer.commitsync(); //异步提交 //consumer.commitasync(); // ack提交也可以,会按设置的ack策略走(参考myofftconfig.java里的ack模式) // ack.acknowledge(); } @kafkalistener(topics = "test", groupid = "myofft-group-2", containerfactory = "manualkafkalistenercontainerfactory") public void nocommit(@payload string message, @header(kafkaheaders.received_partition_id) int partition, @header(kafkaheaders.received_topic) string topic, consumer consumer, acknowledgment ack) { logger.info("忘记提交偏移量, partition={}, msg={}", partition, message); // 不做commit! /** * 现实状况: * commitsync和commitasync组合使用 * <p> * 手工提交异步 consumer.commitasync(); * 手工同步提交 consumer.commitsync() * commitsync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前, * commitsync()会一直重试,但是commitasync()不会。 * 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题 * 因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。 * 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费 * 因此,在消费者关闭前一般会组合使用commitasync()和commitsync()。 */// @kafkalistener(topics = "test", groupid = "myofft-group-3",containerfactory = "manualkafkalistenercontainerfactory") public void manualofft(@payload string message, try { logger.info("同步异步搭配 , partition={}, msg={}", partition, message); //先异步提交 consumer.commitasync(); //继续做别的事 } catch (exception e) { system.out.println("commit failed"); } finally { try { consumer.commitsync(); } finally { consumer.clo(); } } * 甚至可以手动提交,指定任意位置的偏移量 * 不推荐日常使用!!!// @kafkalistener(topics = "test", groupid = "myofft-group-4",containerfactory = "manualkafkalistenercontainerfactory") public void offt(consumerrecord record, consumer consumer) { logger.info("手动指定任意偏移量, partition={}, msg={}", record.partition(), record); map<topicpartition, offtandmetadata> currentofft = new hashmap<>(); currentofft.put(new topicpartition(record.topic(), record.partition()), new offtandmetadata(爱的奉献作文record.offt() + 1)); consumer.commitsync(currentofft);}
3)重复消费问题
如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!
用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他consumer的component注解,只保留当前myofftconsumer.java 启动项目,使用swagger的kafkaproducer发送连续几条消息 留心控制台,都能消费,没问题:
但是!重启项目:
无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!
再通过命令行查询偏移量
4)经验与总结
commitsync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitsync()会一直重试,但是commitasync()不会。
这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。
但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!
因此,在消费者关闭前一般会组合使用commitasync()和commitsync()。
详细代码参考:myofftconsumer.manualofft()
到此这篇关于springboot整合kafka的文章就介绍到这了,更多相关springboot整合kafka内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!
本文发布于:2023-04-05 00:01:45,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/b9be613fec162da90cf564a14593041b.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:Springboot整合kafka的示例代码.doc
本文 PDF 下载地址:Springboot整合kafka的示例代码.pdf
留言与评论(共有 0 条评论) |