FlinkConsumer partition-to-subtask mapping, and an analysis of FlinkKafkaConsumerBase
The FlinkKafkaConsumerBase class
FlinkKafkaConsumerBase is the core class: FlinkKafkaConsumer08, FlinkKafkaConsumer09, FlinkKafkaConsumer010, and the other version-specific consumers all extend it. Let's start with its constructor.
First, note discoveryIntervalMillis, the automatic partition-discovery interval. Its default is PARTITION_DISCOVERY_DISABLED, declared as public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;, which means partitions are never discovered automatically. With this default, if partitions are added to the corresponding Kafka topic, the job must be restarted before they are picked up.
public FlinkKafkaConsumerBase(
        List<String> topics,
        Pattern topicPattern,
        KafkaDeserializationSchema<T> deserializer,
        long discoveryIntervalMillis,
        boolean useMetrics) {
    this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
    this.deserializer = checkNotNull(deserializer, "valueDeserializer");
    checkArgument(
        discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
        "Cannot define a negative value for the topic / partition discovery interval.");
    this.discoveryIntervalMillis = discoveryIntervalMillis;
    this.useMetrics = useMetrics;
}
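Since the default is PARTITION_DISCOVERY_DISABLED, new partitions are only picked up when a positive interval is configured in the consumer properties. A minimal sketch, assuming the universal FlinkKafkaConsumer connector; the broker address, group id, and topic name are placeholders:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
props.setProperty("group.id", "demo-group"); // placeholder consumer group
// probe for newly added partitions every 30 seconds instead of never (the default)
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);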
Next, look at the open method of FlinkKafkaConsumerBase. This is where all partitions are initialized and where the one-to-one mapping between subtasks and partitions is established; it is the key to how FlinkConsumer guarantees that one partition is handled by exactly one thread.
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
    // create the partition discoverer
    this.partitionDiscoverer = createPartitionDiscoverer(
            topicsDescriptor,
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
    this.partitionDiscoverer.open();
    subscribedPartitionsToStartOffsets = new HashMap<>();
enableCommitOnCheckpoints is true by default, so when checkpointing is enabled the offset commit mode defaults to ON_CHECKPOINTS. Flink currently has three ways of committing offsets back to Kafka (see the sketch after this list):
1. Checkpointing enabled: offsets are committed after each checkpoint completes.
2. Checkpointing enabled but checkpoint commits disabled: no consumer-group offsets are committed.
3. Checkpointing disabled: rely on the Kafka client's auto-commit.
A separate follow-up article will cover this in detail.
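To make the three modes concrete, here is a minimal sketch of how each one is selected from user code, assuming env, consumer, and props are the StreamExecutionEnvironment, the Kafka consumer, and its Properties from the sketch above:

// mode 1 (default): checkpointing on, offsets committed when a checkpoint completes
env.enableCheckpointing(60_000L);
consumer.setCommitOffsetsOnCheckpoints(true);
// mode 2: checkpointing on, but consumer-group offsets are never committed
consumer.setCommitOffsetsOnCheckpoints(false);
// mode 3: checkpointing off; fall back to the Kafka client's periodic auto-commit
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");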
Next we come to the discoverPartitions method of the PartitionDiscoverer. This is a key method, and it is the reason Flink can map each partition to exactly one subtask:
final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
if (restoredState != null) {
    for (KafkaTopicPartition partition : allPartitions) {
        if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
        }
    }
    for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
        // seed the partition discoverer with the union state while filtering out
        // restored partitions that should not be subscribed by this subtask
        if (KafkaTopicPartitionAssigner.assign(
                // getNumberOfParallelSubtasks: total number of parallel subtasks
                // getIndexOfThisSubtask: the index of this subtask
                restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                    == getRuntimeContext().getIndexOfThisSubtask()) {
            subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
        }
    }
    if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
        subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
            if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                LOG.warn(
                    "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                    entry.getKey());
                return true;
            }
            return false;
        });
    }
    LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
        getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
}
Let's step into the discoverPartitions() method:
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;
            // this only checks whether we were given fixed topic names or a topic
            // pattern to match; usually a concrete topic name is passed in
            // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
            if (topicsDescriptor.isFixedTopics()) {
                // fetch all Kafka partitions for the fixed topics
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                List<String> matchedTopics = getAllTopics();
                // retain topics that match the pattern
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }
                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    newDiscoveredPartitions = null;
                }
            }
            // newDiscoveredPartitions now holds all Kafka partitions, but they are not yet
            // mapped one-to-one onto subtasks; if it is null or empty the topic has no
            // partitions, and the job fails with "Unable to retrieve any partitions"
            // (2) eliminate partitions that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                // if there are no Kafka partitions, initialization fails here
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                // note: the main purpose of the code below is to tie subtasks and topic
                // partitions together; see setAndCheckDiscoveredPartition for the details
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }
            return newDiscoveredPartitions;
        } catch (WakeupException e) {
            // the actual topic / partition metadata fetching methods
            // may be woken up midway; reset the wakeup flag and rethrow
            wakeup = false;
            throw e;
        }
    } else if (!closed && wakeup) {
        // may have been woken up before the method call
        wakeup = false;
        throw new WakeupException();
    } else {
        throw new ClosedException();
    }
}
The comments above walk through the method; read them carefully. It ultimately returns a partition list containing only the partitions that belong to this one subtask. The core of the assignment is encapsulated in setAndCheckDiscoveredPartition(), so let's look inside:
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    // a newly discovered partition is added to this set
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);
        // the Kafka partition maps one-to-one to indexOfThisSubtask
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }
    return false;
}
This brings us to the actual computation, and to why Flink can guarantee that one partition maps to one thread.
The underlying arithmetic:
int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
return (startIndex + partition.getPartition()) % numParallelSubtasks;
numParallelSubtasks is the number of parallel subtasks, i.e. the parallelism configured in Flink.
For example, with 6 partitions and a parallelism of 3, the partitions are distributed evenly, two per subtask (see the sketch after the assigner class below).
But note: if the parallelism is set larger than the partition count, some subtask threads will receive no partitions and sit idle, wasting resources.
public class KafkaTopicPartitionAssigner {
/**
* Returns the index of the target subtask that a specific Kafka partition should be
* assigned to.
*
* <p>The resulting distribution of partitions of a single topic has the following contract:
* <ul>
* <li>1. Uniformly distributed across subtasks</li>
* <li>2. Partitions are round-robin distributed (strictly ascending
 * subtask indices) by using the partition id as the offset from a starting index
* (i.e., the index of the subtask which partition 0 of the topic will be assigned to,
* determined using the topic name).</li>
* </ul>
*
* <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
* contract to locally filter out partitions that it should not subscribe to, guaranteeing
 * that all partitions of a single topic will always be assigned to some subtask in a
* uniformly distributed manner.
*
* @param partition the Kafka partition
* @param numParallelSubtasks total number of parallel subtasks
*
* @return index of the target subtask that the Kafka partition should be assigned to.
*/
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
}
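To see the contract in action, here is a standalone sketch of the same arithmetic; the topic name "demo-topic" and the counts are made-up values:

public class AssignDemo {
    public static void main(String[] args) {
        String topic = "demo-topic"; // hypothetical topic name
        int numPartitions = 6; // partition count from the example above
        int numParallelSubtasks = 3; // Flink parallelism
        // same start-index computation as KafkaTopicPartitionAssigner.assign
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        for (int partition = 0; partition < numPartitions; partition++) {
            // each partition id is a round-robin offset from the start index
            int subtask = (startIndex + partition) % numParallelSubtasks;
            System.out.println("partition " + partition + " -> subtask " + subtask);
        }
        // with 6 partitions and parallelism 3, every subtask receives exactly 2 partitions;
        // with parallelism 8, two of the subtasks would receive nothing and sit idle
    }
}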
Through the steps above, the returned allPartitions ends up holding exactly the partitions that belong to this parallel subtask.
From here the logic splits into two cases: starting fresh without a checkpoint, and restoring from a checkpoint.
How Flink generates the tasks that consume Kafka partitions
Step one has already produced a List allPartitions containing the partition information for this subtask. Returning to the earlier code, there is an if (restoredState != null) check; restoredState is the state Flink restores from an intermediate snapshot. Let's first discuss the case without a checkpoint:
// use the partition discoverer to fetch the initial seed partitions,
// and set their initial offsets depending on the startup mode.
// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
// when the partition is actually read.
switch (startupMode) {
    case SPECIFIC_OFFSETS:
        if (specificStartupOffsets == null) {
            throw new IllegalStateException(
                "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                ", but no specific offsets were specified.");
        }
        for (KafkaTopicPartition seedPartition : allPartitions) {
            Long specificOffset = specificStartupOffsets.get(seedPartition);
            if (specificOffset != null) {
                // since the specified offsets represent the next record to read, we subtract
                // it by one so that the initial state of the consumer will be correct
                subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
            } else {
                // default to group offset behaviour if the user-provided specific offsets
                // do not contain a value for this partition
                subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
            }
        }
        break;
    case TIMESTAMP:
        if (startupOffsetsTimestamp == null) {
            throw new IllegalStateException(
                "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                ", but no startup timestamp was specified.");
        }
        for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
            subscribedPartitionsToStartOffsets.put(
                partitionToOffset.getKey(),
                (partitionToOffset.getValue() == null)
                    // if an offset cannot be retrieved for a partition with the given timestamp,
                    // we default to using the latest offset for the partition
                    ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                    // since the specified offsets represent the next record to read, we subtract
                    // it by one so that the initial state of the consumer will be correct
                    : partitionToOffset.getValue() - 1);
        }
        break;
    default:
        for (KafkaTopicPartition seedPartition : allPartitions) {
            subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
        }
}
Next comes the case with a checkpoint. It is largely similar, with some extra state-validation work in the middle.