kafkaproducer的batch.size和linger.ms

更新时间:2023-06-14 03:53:18 阅读: 评论:0

kafkaproducer的batch.size和linger.ms
1.问题
b atch.size和linger.ms是对kafka producer性能影响⽐较⼤的两个参数。batch.size是p roducer批量发送的基本单位,默认是16384Bytes,即16kB;lingger.ms是nder线程在检查batch是否ready时候,判断有没有过期的参数,默认⼤⼩是0ms。
那么producer是按照batch.size⼤⼩批量发送消息呢,还是按照linger.ms的时间间隔批量发送消息呢?这⾥先说结论:其实满⾜batch.size和ling.ms之⼀,producer便开始发送消息。
2.源码分析
⾸先nder线程主要代码如下,我们主要关⼼nder线程阻塞的情况:
void run(long now) {
Cluster cluster = metadata.fetch();
// ReadyCheckDelayMs表⽰下次检查是否ready的时间,也是//lecotr会阻塞的时间
RecordAccumulator.ReadyCheckResult result = ady(cluster, now);
if (result.unknownLeadersExist)
Iterator<Node> iter = adyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = ();
if (!ady(node, now)) {
notReadyTimeout = Math.min(notReadyTimeout, tionDelay(node, now));
}
}
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
this.maxRequestSize,
now);
if (guaranteeMessageOrder) {
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.picPartition);
}
}
List<RecordBatch> expiredBatches = this.accumulator.questTimeout, now);
for (RecordBatch expiredBatch : expiredBatches)
pic(), dCount);
nsors.updateProduceRequestMetrics(batches);上升星座射手座
儿童咳嗽有痰List<ClientRequest> requests = createProduceRequests(batches, now);
// 暂且只关⼼ReadyCheckDelayMs毛利率的算法
long pollTimeout = Math.ReadyCheckDelayMs, notReadyTimeout);
if (adyNodes.size() > 0) {
维护网络安全
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.nd(request, now);
// poll最终会调⽤s elector,pollTimeout也就是lector阻塞的时间
this.client.poll(pollTimeout, now);
}
lector
private int lect(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (ms == 0L)
return this.nioSelector.lectNow();
el
return this.nioSelector.lect(ms);
}
我们可以从实例化⼀个新的KafkaProducer开始分析(还没有调⽤nd⽅法),在nder线程调⽤accumulator#ready(..)时候,会返红葡萄酒保质期多久
回r esult,其中包含lector可能要阻塞的时间。由于还没有调⽤nd⽅法,所以Deque<RecordBatch>为空,所以result中包含的nextReadyCheckDelayMs也是最⼤值,这个时候lector会⼀直阻塞。
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<Node>();
// 初始化为最⼤值
long nextReadyCheckDelayMs = Long.MAX_VALUE;
boolean unknownLeadersExist = fal;
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : Set()) {
TopicPartition part = Key();
Deque<RecordBatch> deque = Value();
Node leader = cluster.leaderFor(part);
法官工资
if (leader == null) {
unknownLeadersExist = true;
} el if (!ains(leader) && !ains(part)) {
synchronized (deque) {
RecordBatch batch = deque.peekFirst();
伞房决明if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
// 和linger.ms有关
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || ds.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
boolean ndable = full || expired || exhausted || clod || flushInProgress();
if (ndable && !backingOff) {
readyNodes.add(leader);
} el {
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
诗经经典}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist);
}
然后我们调⽤nd⽅法往内存中放⼊了⼀条数据,由于是新建的⼀个RecordBatch,所以会唤醒nder线程
KafkaProducer#doSend(...)
if (result.batchIsFull || wBatchCreated) {
this.nder.wakeup();
}
这个时候会唤醒阻塞在lector#lect(..)的nder线程,nder线程⼜运⾏到accumulator#ready(..),由于Deque<RecordBatch>有值,所以返回的result包含的nextReadyCheckDelayMs不再是最⼤值,⽽是和linger.ms有关的值。也就是时候lector会z最多阻塞lingger.ms后就返回,然后再次轮询。
也就是说当Deque<RecordBatch>不为空的时候,nder线程会最多阻塞linger.ms时间;Deque<RecordBatch>为空的时
候,nder线程会阻塞Long.MAX_VALUE时间;⼀旦调⽤了K afkaProduer#nd(..)将消息放到内存中,新建了个
RecordBatch,则会将nder线wakeup。
另外从上⾯的代码,即KafkaProducer#doSend(...)中也可以看到,如果有⼀个RecordBatch满了,也会调⽤Sender#wakeup(..),所以综上所述:只要满⾜linger.ms和b atch.size满了就会激活nder线程来发送消息。

本文发布于:2023-06-14 03:53:18,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/949685.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:线程   阻塞   发送   消息   时间
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图