kafka源码解析(5)消息发送

更新时间:2023-06-30 10:23:35 阅读: 评论:0

kafka源码解析(5)消息发送
Kafka的nder线程核⼼发送⽅法就是ndProducerData
ndProducerData读源数据缓存
发送前需要获取消息的⽬的地broker节点和RecordAccumulator中要发送的消息,分为两步
step1 先拉取本地缓存的元信息本初子午线
ady查看哪个队列可以发送
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to nd
RecordAccumulator.ReadyCheckResult result =ady(cluster, now);
ndProducerData调⽤ady⽅法得到信息
readyNodes储存可以发送的节点
chmm
unknownLeaderTopics 存储未知leader的topic
exhausted 标记了线程池的内存不⾜等待队列是否有东西,⾥⾯是condition,内存池那篇讲过。内存都不够⽤了,赶紧标记起来后⾯优先安排。
遍历batches中的队列dq,从元数据找leaderrangefrom
Node leader = cluster.leaderFor(part);
要查看dq⾥⾯的东西做判断了,锁上
public ReadyCheckResult ready(Cluster cluster,long nowMs){
Set<Node> readyNodes =new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics =new HashSet<>();
boolean exhausted =this.free.queued()>0;
for(Map.Entry<TopicPartition, Deque<ProducerBatch>> entry :Set()){
TopicPartition part = Key();
Deque<ProducerBatch> deque = Value();
Node leader = cluster.leaderFor(part);
synchronized(deque){
//做判断,下⾯展开
}
}
判断内容,leader是空,那得拉取服务端元数据看看了,unknownLeaderTopics加进去相应topics
dq拿出第⼀个batch后:
backingOff是重试相关,下⾯是true代表还在重试时间中,换句话说,还不能发送
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
samurai
expired判断没重试dq等待时间超过lingerMs(设置的dq接受信息后等多久,不为0的话多等⼀会,可能有更多内容⼀起⾛)
expired判断重试的dq等待是否超过预设
full判断dq的第⼀个batch是否满了,或者是dq中有新的batch
如何判断是否可发送呢,full,expired,exhausted,⽣产者 关了,flushInProgress(这块没明⽩)
可以发送的leader节点加⼊readyNodes
like to do 和like doing的区别
el分⽀中,batch是空,那就取所有batch为空dq的最⼩timeLeftMs,作为下⼀次查看是否准备好的时间
if(leader ==null&&!deque.isEmpty()){
少儿英语教学游戏unknownLeaderTopics.pic());
}el if(!ains(leader)&&!isMuted(part, nowMs)){
ProducerBatch batch = deque.peekFirst();
if(batch !=null){
long waitedTimeMs = batch.waitedTimeMs(nowMs);
boolean backingOff = batch.attempts()>0&& waitedTimeMs < retryBackoffMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
boolean full = deque.size()>1|| batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean ndable = full || expired || exhausted || clod ||flushInProgress();
if(ndable &&!backingOff){
readyNodes.add(leader);
}el{
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs,0);
relational
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
komu>untilyou
}
ndProducerData处理⽆信息的leader
还记得之前的⽂章吗,metadata add后会把请求更新的标志设为true,questUpdate(),这个⽅法更改了标志位但是并没更新
if(!result.unknownLeaderTopics.isEmpty()){
// The t of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to nd to the topic.
for(String topic : result.unknownLeaderTopics)
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
}
ndProducerData检查leader⽹络情况
Iterator<Node> iter = adyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while(iter.hasNext()){
Node node = ();
if(!ady(node, now)){
notReadyTimeout = Math.min(notReadyTimeout,this.client.pollDelayMs(node, now));
}
}
clementinendProducerData制作⽹络请求
⾸先让recordAccumulator 调⽤drain⼀下,把发往相同leader的batch放⼀起
Map<Integer, List<ProducerBatch>> batches =this.accumulator.drain(cluster, adyNodes,this.maxRequestSize, now); ndProducerData 使⽤addToInflightBatches⽅法将batches放⼊在途批次
addToInflightBatches(Map<Integer, List> batches) ⽅法,batches 是map,k是leader,因此每个leader都有个⾃⼰的inflightBatchList
private void addToInflightBatches(List<ProducerBatch> batches){
for(ProducerBatch batch : batches){
List<ProducerBatch> inflightBatchList = (picPartition);
if(inflightBatchList ==null){
inflightBatchList =new ArrayList<>();
inFlightBatches.picPartition, inflightBatchList);
}
inflightBatchList.add(batch);
}
}
最后是client.nd(clientRequest, now)发送出去

本文发布于:2023-06-30 10:23:35,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/162503.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:发送   是否   判断   信息
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图