resttemplate默认超时时间_深⼊理解Kafka客户端之超时批次
的处理
⼀、场景分析
豌豆苗的做法前⾯提到,消息封装成批次对象ProducerBatch后,会放到RecordAccumulator对应的Deque队列,等待Sender线程去发送。但是
封装好的批次会有⼀个时间限制,如果超过这个时间限制还未发送成功,那么就会将该批次标记为超时批次,从⽽执⾏相应的处理。那么客
户端如何处理这种超时的批次呢?为什么超时批次会导致数据重复?这篇进⾏详细的分析。 ⼆、图⽰说明
⾸先更正⼀下《深⼊理解Kafka客户端之服务端响应及超时请求的处理》中的流程图,图中少画了⼀个数据结构:InFlightBatches,这
个结构保存的是正在发送的批次。当Sender线程将缓存中的批次拿出来封装成请求的同时,会将这个批次放到InFlightBatches结构中,即
标记这些批次正在发送,当发送成功,会将这个批次从InFlightBatches中移除,同样,如果批次超时,也会将该批次移除。
超时批次的处理主要在右下⾓Sender线程这块:
三、过程源码分析
交通安全提示语还是从Sender的runOnce⽅法看起,主要看下⾯的代码:
long pollTimeout = ndProducerData(currentTimeMs);
private long ndProducerData(long now) { ... //TODO 步骤六:对超时批次对处理(由于没有建⽴⽹络连接,第⼀次这⾥的代码也不执⾏)这⾥的超时指⽣成的批次
1. 在ndProducerData⽅法中,⾸先获取inFlightBatches中超时的批次:
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);无法忍受秋瓷炫
private ListgetExpiredInflightBatches(long now) { List expiredBatches = new ArrayList<>(); for (Iterator>> batchIt = Set().iterator(); batchIt 判断批次是否超时:
batch.DeliveryTimeoutMs(), now))
boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long now) { return deliveryTimeoutMs <= now - atedMs;}
这⾥判断批次是否超时调⽤的是ProducerBatch.hasReachedDeliveryTimeout⽅法。如果当前时间-批次的创建时间
>deliveryTimeoutMs,则超时。deliveryTimeoutMs的默认值为120秒。
如果批次超时,则从inFlightBatches数据结构中将该批次移除,同时将该批次放⼊expiredBatches集合。
如果批次未超时,则更新批次超时的具体时间nextBatchExpiryTimeMs
if (batch.DeliveryTimeoutMs(), now)) { //如果该批次已经超时了,就从inFlightBatches集合中移除 ve();
这⾥注意batch.isDone()⽅法,该⽅法判断批次是否执⾏了done()⽅法,即是否已经标记了批次的状态(FAILED、ABORTED或者SUCCEEDED),关于done()⽅法,后⾯进⾏具体的分析。
2. 获取缓存中超时的批次,这⾥指未进⾏发送,还在RecordAccumulator中每个分区对应的Deque队列中的批次
List expiredBatches = piredBatches(now);
public List expiredBatches(long now) { List expiredBatches = new ArrayList<>(); for (Map.Entry> entry : Set()) { //获取每个分区对应的批次
这⾥的逻辑⽐较简单,就是判断所有Deque队列中的批次是否超时,判断标准和上⾯⼀样,即批次创建的时间超过了
deliveryTimeoutMs(默认120秒)。如果超时,则将这个批次从Deque中移除,同时终⽌该批次的写⼊(有时候该批次还未写满),然后将该
批次放⼊过期批次的集合。
3. 将inFlightBatches和缓存中过期的批次合并到同⼀个集合中:
expiredBatches.addAll(expiredInflightBatches);
4. 遍历过期的批次进⾏处理:
for (ProducerBatch expiredBatch : expiredBatches) { String errorMessage = "Expiring " + dCount + " record(s) for " + picPartit 这⾥主要调⽤了failBatch⽅法进⾏,注意此时RuntimeException参数传⼊的是⼀个new TimeoutException(errorMessage):
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), fal);
failBatch⽅法具体代码如下:
private void failBatch(ProducerBatch batch, long baOfft, long logAppendTime, RuntimeException exception,
这⾥我们详细分析⼀下batch.done⽅法,⾸先看⼀下⽅法的注释:
/** * Finalize the state of a batch. Final state, once t, is immutable. This function may be called * once or twice on a batch. It may be called twice if * 1. An
翻译如下:
该⽅法⽤来标记批次的最终执⾏状态,这个状态⼀旦设置,就⽆法更改。这个⽅法可能被每个批次调⽤1次或者两次,调⽤2次的情况有:1。⼀个inFlight批次在服务端
注意:批次的最终状态⼀旦被设置,则⽆法更改 这个调⽤2次什么意思呢?
假设有批次正在发送,那么该批次保存在InFlightBatches结构中,如果此时该批次被判断为已超时,那么就会将该批次标记为
FAILED。但是,如果后⾯发现这个批次发送成功了(因为该批次已经封装了发送的请求,正在发送),服务端返回了正常的响应,那么
该⽅法还会被调⽤1次,但是批次的状态不会再修改。
假设发送了事务回滚或者⽣产者关闭,状态会先被标记为ABORTED,如果后续服务端针对该批次返回了成功的响应,那么还会调⽤该⽅法1次。同样,批次的状态不会再修改。
public boolean done(long baOfft, long logAppendTime, RuntimeException exception) { //如果有异常,状态暂时设为FAILED,否则设置为SUCCEEDED fina
草香附该⽅法的逻辑是:
当传⼊的exception对象为null时,tryFinalState 变量会被赋值为SUCCEEDED,否则赋值为FAILED。
然后判断this.finalState属性的值:
如果为null,说明是第⼀次设置批次的状态,那么就将this.finalState属性的值设置为tryFinalState 变量的值,然后遍历批次中的
消息,执⾏回调函数,并最终返回true。
如果不为null,说明之前已经设置过⼀次批次状态。知否知否主题曲
此时如果第⼀次设置状态不为SUCCEEDED,且本次准备将批次状态设置为SUCCEEDED,那么该⽅法最终返回fal。
如果第⼀次设置为SUCCEEDED,则抛出异常:SECCEEDED状态的批次不允许修改状态
这⾥设置了属性值后,通过completeFutureAndFireCallbacks⽅法会遍历批次中的消息,执⾏回调函数的逻辑,这⾥的回调函数就是⽣
产者⽣产消息时绑定的:幺宁
private void completeFutureAndFireCallbacks(long baOfft, long logAppendTime, RuntimeException exception) { produceFuture.t(baOfft, logAppend 整个超时批次的处理过程可以简化为下图的流程:
总结 超时批次的处理流程如下:
获取InFlightBatches中超时的批次
获取缓存队列Deque中超时的批次
合并超时批次
遍历批次进⾏处理迎新标语
标记批次状态为FAILED,遍历批次中的消息执⾏⽣产时绑定的回调函数
再次判断该批次是否在InFlightBatches中,如果在,则移除微信网页版
释放批次占⽤的内存
此时,难免会存在疑问:如果存在正在发送的批次超时了,被标记为FAILED,执⾏回调函数;但后续该批次发送成功,服务端正常写⼊
了⽇志。⽽恰好我们⽣产消息时,在绑定回调函数时对超时的消息进⾏重新发送,那么就会发⽣数据重复的现象。 因为Kafka⽣产者提
供的消息传输保障为at least once,所以确实会存在数据重复的现象。从0.11.0.0版本开始,Kafka引⼊了 幂等 和 事务 两个特性,以此
来实现exactly once mantics(精确⼀次处理语义)。对于这两个特性,后⾯再进⾏分析。