首页 > 作文

Springboot整合kafka的示例代码

更新时间:2023-04-05 00:01:48 阅读: 评论:0

目录
1. 整合kafka2. 消息发送2.1 发送类型2.2 序列化2.3 分区策略3. 消息消费3.1 消息组别3.2 位移提交

1. 整合kafka

1、引入依赖

 <dependency>            <groupid>org.springframework.kafka</groupid>            <artifactid>spring-kafka</artifactid>        </dependency>

2、设置yml文件

spring:  application:    name: demo  kafka:    bootstrap-rvers: 52.82.98.209:10903,52.82.98.209:10904    producer: # producer 生产者      retries: 0 # 重试次数      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)      batch-size: 16384 # 批量大小      buffer-memory: 33554432 # 生产端缓冲区大小      key-rializer: org.apache.kafka.common.rialization.stringrializer#      value-rializer: com.itheima.demo.config.myrializer      value-rializer: org.apache.kafka.common.rialization.stringrializer    consumer: # consumer消费者      group-id: javagroup # 默认的消费组id      enable-auto-commit: true # 是否自动提交offt      auto-commit-interval: 100  # 提交offt延时(接收到消息后多久提交offt)      # earliest:当各分区下有已提交的offt时,从提交的offt开始消费;无提交的offt时,从头开始消费      # latest:当各分区下有已提交的offt时,从提交的offt开始消费;无提交的offt时,消费新产生的该分区下的数据      # none:topic各分区都存在已提交的offt时,从offt后开始消费;只要有一个分区不存在已提交的offt,则抛出异常      auto-offt-ret: latest      key-derializer: org.apache.kafka.common.rialization.stringderializer#      value-derializer: com.itheima.demo.config.myderializer      value-derializer: org.apache.kafka.common.rialization.stringderializer

3、启动项目

2. 消息发送

2.1 发送类型

kafkatemplate调用nd时默认采用异步发送,如果需要同步获取发送结果,调用get方法

异步发送生产者:

@restcontrollerpublic class kafkaproducer {    @resource    private kafkatemplate<string, object> kafkatemplate;    @getmapping("/kafka/test/{msg}")    public void ndmessage(@pathvariable("msg") string msg) {        message message = new message();        message.tmessage(msg);        kafkatemplate.nd("test", json.tojsonstring(message));    }}

同步发送生产者:

//测试同步发送与监听@restcontrollerpublic class asyncproducer {    private final static logger logger = loggerfactory.getlogger(asyncproducer.class);    @resource    private kafkatemplate<string, object> kafkatemplate;    //同步发送    @getmapping("/kafka/sync/{msg}")    public void sync(@pathvariable("msg") string msg) throws exception {        message message = new message();        message.tmessage(msg);        listenablefuture<ndresult<string, object>> future = kafkatemplate.nd("test", json.tojsonstring(message));        //注意,可以设置等待时间,超出后,不再等候结果        ndresult<string, object> result = future.get(3,timeunit.conds);        logger.info("nd result:{}",result.getproducerrecord().value());    }}

消费者:

@componentpublic class kafkaconsumer {    private final logger logger = loggerfactory.getlogger(kafkaconsumer.class);    //不指定group,默认取yml里配置的    @kafkalistener(topics = {"test"})    public void onmessage1(consumerrecord<?, ?> consumerrecord) {        optional<?> optional = optional.ofnullable(consumerrecord.value());        if (optional.isprent()) {            object msg = optional.get();            logger.info("message:{}", msg);        }    }}

那么我们怎么看出来同步发送和异步发送的区别呢?

①首先在服务器上,将kafka暂停服务。
②在swagger发送消息

调同步发送:请求被阻断,一直等待,超时后返回错误

而调异步发送的(默认发送接口),请求立刻返回。

那么,异步发送的消息怎么确认发送情况呢?
我们使用注册监听
即新建一个类:kafkalistener.java

@configurationpublic class kafkalistener {    private final static logger logger = loggerfactory.getlogger(kafkalistener.class);    @autowired    kafkatemplate kafkatemplate;    //配置监听    @postconstruct    private void listener() {        kafkatemplate.tproducerlistener(new producerlistener<string, object>() {            @override            public void onsuccess(producerrecord<string, object> producerrecord, recordmetadata recordmetadata) {                logger.info("ok,message={}", producerrecord.value());            }            public void onerror(producerrecord<string, object> producerrecord, exception exception) {                logger.error("error!message={}", producerrecord.value());        });    }}

查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener

如果是正常发送异步消息,则会获得该消息。可以看到,在内部类 kafkalistener$1 中,即注册的listener的消息。

2.2 序列化

消费者使用:kafkaconsumer.java

@componentpublic class kafkaconsumer {    private final logger logger = loggerfactory.getlogger(kafkaconsumer.class);    //不指定group,默认取yml里配置的    @kafkalistener(topics = {"test"})    public void onmessage1(consumerrecord<?, ?> consumerrecord) {        optional<?> optional = optional.ofnullable(consumerrecord.value());        if (optional.isprent()) {            object msg = optional.get();            logger.info("message:{}", msg);        }    }}

1)序列化详解

前面用到的是kafka自带的字符串序列化器(org.apache.kafka.common.rialization.stringrializer)除此之外还有:bytearray、bytebuffer、bytes、double、integer、long 等这些序列化器都实现了接口(org.apache.kafka.common.rialization.rializer)基本上,可以满足绝大多数场景

2)自定义序列化
自己实现,实现对应的接口即可,有以下方法:

public interface rializer<t> extends cloable {default void configure(map<string, ?> configs, boolean iskey) {}//理论上,只实现这个即可正常运行byte[] rialize(string var1, t var2);//默认调上面的方法default byte[] rialize(string topic, headers headers, t data) {return this.rialize(topic, data);}default void clo() {}}

我们来自己实现一个序列化器:myrializer.java

public class myrializer implements rializer {    @override    public byte[] rialize(string s, object o) {        string json = json.tojsonstring(o);        return json.getbytes();    }}

3)解码
myderializer.java,实现方式与编码器几乎一样.

public class myderializer implements derializer {    private final static logger logger = loggerfactory.getlogger(myderializer.class);    @override    public object derialize(string s, byte[] bytes) {        try {            string json = new string(bytes,"utf-8");            return json.par(json);        } catch (unsupportedencodingexception e) 生日祝福语格式{            e.printstacktrace();        }        return null;    }}

4)在yaml中配置自己的编码器、解码器

再次收发,消息正常

2.3 分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

给定了分区号,直接将数据发送到指定的分区里面去没有给定分区号,给定数据的key值,通过key取上hashcode进行分区既没有给定分区号,也没有给定key值,直接轮循进行分区(默认)自定义分区,你想怎么做就怎么做

1)验证默认分区规则
发送者代码参考:partitionproducer.java

//测试分区发送@restcont拼音翻译rollerpublic class partitionproducer {    @resource    private kafkatemplate<string, object> kafkatemplate;    //    指定分区发送//    不管你key是什么,到同一个分区    @getmapping("/kafka/partitionnd/{key}")    public void tpartition(@pathvariable("key") string key) {        kafkatemplate.nd("test", 0, key, "key=" + key + ",msg=指定0号分区");    }    //    指定key发送,不指定分区//    根据key做hash,相同的key到同一个分区    @getmapping("/kafka/keynd/{key}")    public void tkey(@pathvariable("key") string key) {        kafkatemplate.nd("test", key, "key=" + key + ",msg=不指定分区");}

消费者代码使用:partitionconsumer.java

@componentpublic class partitionconsumer {    private final logger logger = loggerfactory.getlogger(partitionconsumer.class);    //分区消费    @kafkalistener(topics = {"test"},topicpattern = "0")    public void onmessage(consumerrecord<?, ?> consumerrecord) {        optional<?> optional = optional.ofnullable(consumerrecord.value());        if (optional.isprent()) {            object msg = optional.get();            logger.info("partition=0,message:[{}]", msg);        }    }    @kafkalistener(topics = {"test"},topicpattern = "1")    public void onmessage1(consumerrecord<?, ?> consumerrecord) {        optional<?> optional = optional.ofnullable(consumerrecord.value());        if (optional.isprent()) {            object msg = optional.get();            logger.info("partition=1,message:[{}]", msg);        }    }}

通过swagger访问tkey(也就是只给了key的方法):

可以看到key相同的被hash到了同一个分区

再访问tpartition来设置分区号0来发送:

可以看到无论key是什么,都是分区0来消费

2)自定义分区
参考代码:mypartitioner.java , mypartitiontemplate.java。
发送使用:mypartitionproducer.java。

public class mypartitioner implements partitioner {    @override    public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {//        定义自己的分区策略//                如果key以0开头,发到0号分区//                其他都扔到1号分区        string keystr = key+"";        if (keystr.startswith("0")){            return 0;        }el {            return 1;        }    }    public void clo() {    public void configure(map<string, ?> map) {}
@configurationpublic class mypartitiontemplate {     private final logger logger = loggerfactory.getlogger(this.getclass());    @value("${spring.kafka.bootstrap-rvers}")    private string bootstraprvers;    kafkatemplate kafkatemplate;    @postconstruct    public void tkafkatemplate() {        map<string, object> props = new hashmap<>();        props.put(producerconfig.bootstrap_rvers_config, bootstraprvers);        props.put(producerconfig.key_rializer_class_config, stringrializer.class);        props.put(producerconfig.value_rializer_class_config, stringrializer.class);        //注意分区器在这里!!!        props.put(producerconfig.partitioner_class_config, mypartitioner.class);        this.kafkatemplate = new kafkatemplate<string, string>(new defaultkafkaproducerfactory<>(props));    }    public kafkatemplate getkafkatemplate(){        return kafkatemplate;}
//测试自定义分区发送@restcontrollerpublic class mypartitionproducer {    @autowired    mypartitiontemplate template;//    使用0开头和其他任意字母开头的key发送消息//    看控制台的输出,在哪个分区里?    @getmapping("/kafka/mypartitionnd/{key}")    public void tpartition(@pathvariable("key") string key) {        template.getkafkatemplate().nd("test", key,"key="+key+",msg=自定义分区策略");    }}

使用swagger,发送0开头和非0开头两种key

3. 消息消费

3.1 消息组别

发送者使用:kafkaproducer.java

@restcontrollerpublic class kafkaproducer {    @resource    private kafkatemplate<string, object> kafkatemplate;    @getmapping("/kafka/test/{msg}")    public void ndmessage(@pathvariable("msg") string msg) {        message message = new message();        message.tmessage(msg);        kafkatemplate.nd("test", json.tojsonstring(message));    }}

1)代码参考:groupconsumer.java,listener拷贝3份,分别赋予两组group,验证分组消费:

//测试组消费@componentpublic class groupconsumer {    private final logger logger = loggerfactory.getlogger(groupconsumer.class);    //组1,消费者1    @kafkalistener(topics = {"test"},groupid = "group1")    public void onmessage1(consumerrecord<?, ?顾漫小说> consumerrecord) {        optional<?> optional = optional.ofnullable(consumerrecord.value());        if (optional.isprent()) {            object msg = optional.get();            logger.info("group:group1-1 , message:{}", msg);        }    }    //组1,消费者2    public void onmessage2(consumerrecord<?, ?> consumerrecord) {            logger.info("group:group1-2 , message:{}", msg);    //组2,只有一个消费者    @kafkalistener(topics = {"test"},groupid = "group2")    public void onmessage3(consumerrecord<?, ?> consumerrecord) {            logger.info("group:group2 , message:{}", msg);}

2)启动

3)通过swagger发送2条消息

同一group下的两个消费者,在group1均分消息group2下只有一个消费者,得到全部消息

4)消费端闲置
注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置(因为一个分区只能分配给一个消费者),浪费资源!

验证方式:
停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。
重新发送两条消息,试一试

group2可以消费到1、2两条消息group1下有两个消费者,但是只分配给了 1 , 2这个进程被闲置

3.2 位移提交

1)自动提交
前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

enable-auto-commit: true # 是否自动提交offtauto-commit-interval: 100 # 提交offt延时(接收到消息后多久提交offt,默认单位为ms)

2)手动提交
有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。
下面我们自己定义配置,覆盖上面的参数
代码参考:myofftconfig.java

@configurationpublic class myofftconfig {    private final logger logger = loggerfactory.getlogger(this.getclass());    @value("${spring.kafka.bootstrap-rvers}")    private string bootstraprvers;    @bean    public kafkalistenercontainerfactory<?> manualkafkalistenercontainerfactory() {        map<string, object> configprops = new hashmap<>();        configprops.put(consumerconfig.bootstrap_rvers_config, bootstraprvers);        configprops.put(consumerconfig.key_derializer_class_config, stringderializer.class);        configprops.put(consumerconfig.value_derializer_class_config, stringderializer.class);        // 注意这里!!!设置手动提交        configprops.put(consumerconfig.enable_auto_commit_config, "fal");        concurrentkafkalistenercontainerfactory<string, string> factory =                new concurrentkafkalistenercontainerfactory<>();        factory.tconsumerfactory(new defaultkafkaconsumerfactory<>(configprops));        // ack模式:        //          ackmode针对enable_auto_commit_config=fal时生腐生效,有以下几种:        //        //          record        //          每处理一条commit一次        //          batch(默认)        //          每次poll的时候批量提交一次,频率取决于每次poll的调用频率        //          time        //          每次间隔acktime的时间去commit(跟auto commit interval有什么区别呢?)        //          count        //          累积达到ackcount次的ack去commit        //          count_time        //          acktime或ackcount哪个条件先满足,就commit        //          manual        //          listener负责ack,但是背后也是批量上去        //          manual_immediate        //          listner负责ack,每调用一次,就立即commit        factory.getcontainerproperties().tackmode(containerproperties.ackmode.manual_immediate);        return factory;    }}

然后通过在消费端的consumer来提交偏移量
myofftconsumer:

@componentpublic class myofftconsumer {    private final logger logger = loggerfactory.getlogger(this.getclass());    @kafkalistener(topics = "test", groupid = "myofft-group-1", containerfactory = "manualkafkalistenercontainerfactory")    public void manualcommit(@payload string message,                             @header(kafkaheaders.received_partition_id) int partition,                             @header(kafkaheaders.received_topic) string topic,                             consumer consumer,                             acknowledgment ack) {        logger.info("手动提交偏移量 , partition={}, msg={}", partition, message);        // 同步提交        consumer.commitsync();        //异步提交        //consumer.commitasync();        // ack提交也可以,会按设置的ack策略走(参考myofftconfig.java里的ack模式)        // ack.acknowledge();    }    @kafkalistener(topics = "test", groupid = "myofft-group-2", containerfactory = "manualkafkalistenercontainerfactory")    public void nocommit(@payload string message,                         @header(kafkaheaders.received_partition_id) int partition,                         @header(kafkaheaders.received_topic) string topic,                         consumer consumer,                         acknowledgment ack) {        logger.info("忘记提交偏移量, partition={}, msg={}", partition, message);        // 不做commit!    /**     * 现实状况:     * commitsync和commitasync组合使用     * <p>     * 手工提交异步 consumer.commitasync();     * 手工同步提交 consumer.commitsync()     * commitsync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,     * commitsync()会一直重试,但是commitasync()不会。     * 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题     * 因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。     * 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费     * 因此,在消费者关闭前一般会组合使用commitasync()和commitsync()。     *///   @kafkalistener(topics = "test", groupid = "myofft-group-3",containerfactory = "manualkafkalistenercontainerfactory")    public void manualofft(@payload string message,        try {            logger.info("同步异步搭配 , partition={}, msg={}", partition, message);            //先异步提交            consumer.commitasync();            //继续做别的事        } catch (exception e) {            system.out.println("commit failed");        } finally {            try {                consumer.commitsync();            } finally {                consumer.clo();            }        }     * 甚至可以手动提交,指定任意位置的偏移量     * 不推荐日常使用!!!//    @kafkalistener(topics = "test", groupid = "myofft-group-4",containerfactory = "manualkafkalistenercontainerfactory")    public void offt(consumerrecord record, consumer consumer) {        logger.info("手动指定任意偏移量, partition={}, msg={}", record.partition(), record);        map<topicpartition, offtandmetadata> currentofft = new hashmap<>();        currentofft.put(new topicpartition(record.topic(), record.partition()),                new offtandmetadata(爱的奉献作文record.offt() + 1));        consumer.commitsync(currentofft);}

3)重复消费问题
如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!

用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他consumer的component注解,只保留当前myofftconsumer.java 启动项目,使用swagger的kafkaproducer发送连续几条消息 留心控制台,都能消费,没问题:

但是!重启项目:

无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

再通过命令行查询偏移量

4)经验与总结
commitsync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitsync()会一直重试,但是commitasync()不会。

这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。

但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!

因此,在消费者关闭前一般会组合使用commitasync()和commitsync()。
详细代码参考:myofftconsumer.manualofft()

到此这篇关于springboot整合kafka的文章就介绍到这了,更多相关springboot整合kafka内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

本文发布于:2023-04-05 00:01:45,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/b9be613fec162da90cf564a14593041b.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:Springboot整合kafka的示例代码.doc

本文 PDF 下载地址:Springboot整合kafka的示例代码.pdf

标签:分区   消息   消费者   偏移量
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图