ek

更新时间:2022-12-27 19:59:58 阅读: 评论:0


2022年12月27日发(作者:广东2b院校)

kafka消费者之ek⽅法

继续kafka学习之旅。今天学习的还是<深⼊理解kafka-核⼼设计与实践原理>⼀书。上⼀篇博客,学习了消费者,今天继续学习消费者。

。这个参数的意思是:当kafka消费者在_consumer_offt主题中找不到所属分区的offt时,该参数就派上⽤场了,

改参数有三个可选值,latest、earilst、none。第⼀个取值是说,当消费者找不到偏移量时,就从⽇志尾部开始消费,earilst是从⽇志头部

开始消费。none是既不从头也不从尾,⽽是抛出NoOfftForPartitionException异常。

除了消费者找不到位移,会使⽤该参数外。如果消费者能找到偏移量,但是对应偏移量上没有消息,也会使⽤该参数。

但是,⼤家也能看出来,这个参数的⼒度太⼤了。不是从头,就是从尾。有没有⼀种⽅法能我们⾃⼰选择消费的位置呢?

有。

kafka提供了ek⽅法,可以让我们从分区的固定位置开始消费。

⼊参为ek(TopicPartitiontopicPartition,offtofft)。前⾯我们讲过TopicPartition这个对象⾥有2个成员变量。⼀个是Topic,⼀

个是partition。再结合offt,完全就可以定位到某个主题、某个分区的某个leader副本的active⽇志⽂件的某个位置。

offt是指分区的消息偏移量

下⾯看⼀下ek⽅法的使⽤例⼦:

rotectedstaticPropertiesinitConfig(){

Propertiesproperties=newProperties();

(RAP_SERVERS_CONFIG,brokerList);

(_DESERIALIZER_CLASS_CONFIG,e());

(_DESERIALIZER_CLASS_CONFIG,e());

(_AUTO_COMMIT_CONFIG,"fal");

(_ID_CONFIG,groupId);

(_ID_CONFIG,clientId);

returnproperties;

}

publicstaticvoidmain(String[]args){

KafkaConsumerkafkaConsumer=newKafkaConsumer(initConfig());

ibe((topic));

//kafka的分区逻辑是在poll⽅法⾥执⾏的,所以执⾏ek⽅法之前先执⾏⼀次poll⽅法

//获取当前消费者消费分区的情况

Setassignment=newHashSet<>();

while(()==0){

//如果没有分配到分区,就⼀直循环下去

(100L);

assignment=ment();

}

for(TopicPartitiontp:assignment){

//消费第当前分区的offt为10的消息

(tp,10);

}

while(()){

ConsumerRecordsconsumerRecords=(2000L);

n("本次拉取的消息数量:"+());

n("消息集合是否为空:"+y());

for(ConsumerRecordconsumerRecord:consumerRecords){

n("消费到的消息key:"+()+",value:"+()+",offt:"+());

}

}

}

上⾯的情形是我们知道具体的消费位置,如果我们不知道具体的消费位置呢?⽇常开发过程中,我们可能有从某⼀个时间段开始消费的场

景。⽐如:从昨天的某个时间点开始消费

kafka提供了⼀个offtForTimes⽅法获取某⼀个时间的消息的偏移量和时间戳,我们获取到偏移量,就可以使⽤ek⽅法从某个时间段

开始消费了,⽰例如下:

publicstaticvoidmain(String[]args){

KafkaConsumerkafkaConsumer=newKafkaConsumer<>(initConfig());

ibe((topic));

Setassignment=newHashSet<>();

while(()==0){

(100L);

assignment=ment();

}

Mapmap=newHashMap<>();

for(TopicPartitiontp:assignment){

(tp,tTimeMillis()-1*24*3600*1000);

}

Mapoffts=sForTimes(map);

for(TopicPartitiontopicPartition:()){

OfftAndTimestampofftAndTimestamp=(topicPartition);

if(offtAndTimestamp!=null){

(topicPartition,());

}

}

while(()){

ConsumerRecordsconsumerRecords=(1000L);

n("本次拉取的消息数量:"+());

n("消息集合是否为空:"+y());

for(ConsumerRecordconsumerRecord:consumerRecords){

n("消费到的消息key:"+()+",value:"+()+",offt:"+());

}

}

}

既然ek⽅法只认partition和offt,那么我们完全可以将partiton和下⼀次要消费的offt存⼊数据库,操作如下:

publicstaticvoidmain(String[]args){

KafkaConsumerkafkaConsumer=newKafkaConsumer<>(initConfig());

ibe((topic));

Setassignment=newHashSet<>();

while(()==0){

(100L);

assignment=ment();

}

for(TopicPartitiontp:assignment){

Longofft=getOfftFromDB(tp);

(tp,offt);

}

while(()){

ConsumerRecordsconsumerRecords=(1000L);

Setpartitions=ions();

for(TopicPartitiontp:partitions){

List>records=s(tp);

for(ConsumerRecordrecord:records){

//processrecord

}

longlastConsumedOfft=(()-1).offt();

//保存位移

storeOfftToDB(tp,lastConsumedOfft+1);

}

}

}

privatestaticvoidstoreOfftToDB(TopicPartitiontp,Longofft){

}

privatestaticLonggetOfftFromDB(TopicPartitiontp){

returnnull;

}

本文发布于:2022-12-27 19:59:58,感谢您对本站的认可!

本文链接:http://www.wtabcd.cn/fanwen/fan/90/42326.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

上一篇:break down
下一篇:neitherof
标签:seek
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图