kafka消费者之ek⽅法
继续kafka学习之旅。今天学习的还是<深⼊理解kafka-核⼼设计与实践原理>⼀书。上⼀篇博客,学习了消费者,今天继续学习消费者。
。这个参数的意思是:当kafka消费者在_consumer_offt主题中找不到所属分区的offt时,该参数就派上⽤场了,
改参数有三个可选值,latest、earilst、none。第⼀个取值是说,当消费者找不到偏移量时,就从⽇志尾部开始消费,earilst是从⽇志头部
开始消费。none是既不从头也不从尾,⽽是抛出NoOfftForPartitionException异常。
除了消费者找不到位移,会使⽤该参数外。如果消费者能找到偏移量,但是对应偏移量上没有消息,也会使⽤该参数。
但是,⼤家也能看出来,这个参数的⼒度太⼤了。不是从头,就是从尾。有没有⼀种⽅法能我们⾃⼰选择消费的位置呢?
有。
kafka提供了ek⽅法,可以让我们从分区的固定位置开始消费。
⼊参为ek(TopicPartitiontopicPartition,offtofft)。前⾯我们讲过TopicPartition这个对象⾥有2个成员变量。⼀个是Topic,⼀
个是partition。再结合offt,完全就可以定位到某个主题、某个分区的某个leader副本的active⽇志⽂件的某个位置。
offt是指分区的消息偏移量
下⾯看⼀下ek⽅法的使⽤例⼦:
rotectedstaticPropertiesinitConfig(){
Propertiesproperties=newProperties();
(RAP_SERVERS_CONFIG,brokerList);
(_DESERIALIZER_CLASS_CONFIG,e());
(_DESERIALIZER_CLASS_CONFIG,e());
(_AUTO_COMMIT_CONFIG,"fal");
(_ID_CONFIG,groupId);
(_ID_CONFIG,clientId);
returnproperties;
}
publicstaticvoidmain(String[]args){
KafkaConsumer
ibe((topic));
//kafka的分区逻辑是在poll⽅法⾥执⾏的,所以执⾏ek⽅法之前先执⾏⼀次poll⽅法
//获取当前消费者消费分区的情况
Set
while(()==0){
//如果没有分配到分区,就⼀直循环下去
(100L);
assignment=ment();
}
for(TopicPartitiontp:assignment){
//消费第当前分区的offt为10的消息
(tp,10);
}
while(()){
ConsumerRecords
n("本次拉取的消息数量:"+());
n("消息集合是否为空:"+y());
for(ConsumerRecord
n("消费到的消息key:"+()+",value:"+()+",offt:"+());
}
}
}
上⾯的情形是我们知道具体的消费位置,如果我们不知道具体的消费位置呢?⽇常开发过程中,我们可能有从某⼀个时间段开始消费的场
景。⽐如:从昨天的某个时间点开始消费
kafka提供了⼀个offtForTimes⽅法获取某⼀个时间的消息的偏移量和时间戳,我们获取到偏移量,就可以使⽤ek⽅法从某个时间段
开始消费了,⽰例如下:
publicstaticvoidmain(String[]args){
KafkaConsumer
ibe((topic));
Set
while(()==0){
(100L);
assignment=ment();
}
Map
for(TopicPartitiontp:assignment){
(tp,tTimeMillis()-1*24*3600*1000);
}
Map
for(TopicPartitiontopicPartition:()){
OfftAndTimestampofftAndTimestamp=(topicPartition);
if(offtAndTimestamp!=null){
(topicPartition,());
}
}
while(()){
ConsumerRecords
n("本次拉取的消息数量:"+());
n("消息集合是否为空:"+y());
for(ConsumerRecord
n("消费到的消息key:"+()+",value:"+()+",offt:"+());
}
}
}
既然ek⽅法只认partition和offt,那么我们完全可以将partiton和下⼀次要消费的offt存⼊数据库,操作如下:
publicstaticvoidmain(String[]args){
KafkaConsumer
ibe((topic));
Set
while(()==0){
(100L);
assignment=ment();
}
for(TopicPartitiontp:assignment){
Longofft=getOfftFromDB(tp);
(tp,offt);
}
while(()){
ConsumerRecords
Set
for(TopicPartitiontp:partitions){
List
for(ConsumerRecord
//processrecord
}
longlastConsumedOfft=(()-1).offt();
//保存位移
storeOfftToDB(tp,lastConsumedOfft+1);
}
}
}
privatestaticvoidstoreOfftToDB(TopicPartitiontp,Longofft){
}
privatestaticLonggetOfftFromDB(TopicPartitiontp){
returnnull;
}
本文发布于:2022-12-27 19:59:58,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/90/42326.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |