深入理解kafka:核心设计与实践原理_Kafka-核心设计和实现原理,生产者和消费者详述...

更新时间:2023-05-09 20:52:43 阅读: 评论:0

深⼊理解kafka:核⼼设计与实践原理_Kafka-核⼼设计和实现
原理,⽣产者和消费者详述...
1.体系架构
Producer:⽣产者
Consumber:消费者
Broker:服务代理节点(kafka实例)
2.消息存储
主题(Topic):kafka消息以topic为单位进⾏归类,逻辑概念
分区(Partition):
Topic-Partition为⼀对多
分区在存储层⾯可看做是⼀个可追加的⽇志⽂件
消息在追加到分区时会分配⼀个特定的偏移量(offt)作为在此分区的唯⼀标⽰
kafka通过offt保证消息在分区内的顺序性,但只保证分区有序⽽不保证主题有序
每条消息发送到broker前,会根据分区规则分配到具体的哪个分区
3.容灾设计
多副本机制(Replica):
⼀个分区会在多个副本中保存相同的消息
副本之间是⼀主多从关系
leader副本负责读写操作,follower副本只负责同步消息(主动拉取)
leader副本故障时,从follower副本重新选举新leader
同步状态
分区中所有副本统称为 AR(Assigned Replicas)
所有与leader副本保持⼀定程度同步的副本(包括leader)组成 ISR(In-Sync Replicas)
同步之后过多的副本组成 OSR(Out-of-Sync Replicas)
特殊偏移量
LEO(Log End Offt):标识当前分区下⼀条代写⼊消息的offt
HW(High Watermark):⾼⽔位,标识了⼀个特定的offt,消费者只能拉渠道这个offt之前的消息(不含HW)所有副本都同步了的消息才能被消费,HW的位置取决于所有follower中同步最慢的分区的offt
⼆、⽣产者
1.客户端开发
消息发送步骤
配置⽣产者客户端参数及创建相应的⽣产者实例
Properties
KafkaProducer
构建待发送的消息:ProducerRecord
发送消息:nd( ),flush( )
关闭⽣产者实例:clo( )
必要参数配置:
bootstrap.rvers:设置kafka集群地址,并⾮需要所有broker地址,因为⽣产者会从给定的broker中获取其他broker信息key.rializer、value.rializer:转换字节数组到所需对象的序列化器,填写全限类名
发送模式
发后即忘(fire-and-forget):只管往kafka发送⽽不关⼼消息是否正确到达,不对发送结果进⾏判断处理;
同步(sync):KafkaProducer.nd()返回的是⼀个Future对象,使⽤()来阻塞获取任务发送的结果,来对发送结果进⾏相应的处理;
异步(async):向nd()返回的Future对象注册⼀个Callback回调函数,来实现异步的发送确认逻辑。
拦截器
实现ProducerInterceptor接⼝,在消息发送的不同阶段调⽤
configure():完成⽣产者配置时
onSend():调⽤nd()后,消息序列化和计算分区之前
onAcknowledgement():消息被应答之前或消息发送失败时
clo():⽣产者关闭时
通过 interceptor.class 配置指定
序列化
⾃定义序列化器:实现Serializer接⼝
分区器
在消息发送到kafka前,需要先计算出分区号,默认使⽤DefaultPartitioner(采⽤MurmurHash2算法)
⾃定义分区器:实现Partitioner接⼝
通过partitioner.class配置指定
2.原理分析
整体架构
主线程KafkaProducer创建消息,通过可能的拦截器、序列化器和分区器之后缓存到消息累加器(Reco
rdAccumulatro)
消息在RecordAccumulator被包装成ProducerBatch,以便Sender线程可以批量发送,缓存的消息发送过慢时,nd()⽅法会被阻塞或抛异常
缓存的⼤⼩通过配置,阻塞时间通过max.block.ms配置
Kafka⽣产者客户端中,通过ByteBuffer实现消息内存的创建和释放,⽽RecordAccumulator内部有⼀个BufferPool⽤来实现ByteBuffer的复⽤
Sender从RecordAccumulator中获取缓存的消息后,将ProducerBatch按Node分组,Node代表broker节点。也就是说nder只向具体broker节点发送消息,⽽不关注属于哪个分区,这⾥是应⽤逻辑层⾯到⽹络层⾯的转换
Sender发往Kafka前,还会保存到InFlightRequests中,其主要作⽤是缓存已经发出去但还没收到相应的请求,也是以Node分组。
每个连接最⼤缓存未响应的请求数通过max.tion配置(默认5)
元数据的更新
InFlightRequests可以获得leastLoadedNode,即所有Node中负载最⼩的。leastLoadedNode⼀般⽤于元数据请求、消费者组播协议等交互。
当客户端中没有需要使⽤的元数据信息或唱过metadata.max.age.ms没有更新元数据时,就会引起元数据更新操作。
3.重要的⽣产者参数
acks:⽤来指定分区中有多少个副本收到这条消息,⽣产者才认为写⼊成功(默认”1")
“1":leader写⼊即成功、“0”:不需要等待服务端相应、”-1”/“all":ISR所有副本都写⼊才收到响应
retries、retry.backoff.ms:⽣产者重试次数(默认0)和两次重试之间的间隔(默认100)
connections.max.idle.ms:多久后关闭闲置的连接(默认540000,9分钟)
linger.ms:⽣产者发送ProducerBatch等待更多消息加⼊的时间(默认为0)
receive.buffer.bytes:Socket接收消息缓冲区的⼤⼩(默认32768,32k)
nd.buffer.bytes:Socket发送消息缓冲区的⼤⼩(默认131072,128k)
request.timeout.ms:Producer等待请求响应的最长时间(默认30000ms),这个值需要⽐broker参数replica.lag.time.max.ms⼤三、消费者
1.消费者与消费组
每个分区只能被⼀个消费组的⼀个消费者消费
消费者数⼤于分区数时,会有消费者分配不到分区⽽⽆法消费任何消息
消费者并⾮逻辑上的概念,它是实际的应⽤实例,它可以是⼀个钱程,也可以是⼀个进程。
2.客户端开发
消费步骤
配置消费者客户端参数及创建KafkaConsumer实例
订阅主题
拉取消息并消费
提交消费位移
关闭实例
必要的参数配置
bootstrap.rvers:集群broker地址清单
group.id:消费组名称
key.derializer、value.derializer`:反序列化器
订阅主题和分区
subscribe():订阅主题
assign():订阅指定主题分区
通过partitionFor()⽅法先获取分区列表
unsubscribe():取消订阅
消息消费
poll():返回的是所订阅的主题(分区)上的⼀组消息,可设定timeout参数来控制阻塞时间位移提交
提交的offt为 lastConsumedOfft + 1
lastConsumedOfft:上⼀次poll拉取到的最后⼀条消息的offt
控制或关闭消费
pau()、resume():暂停和恢复某分区的消费
指定位移消费
ek():指定offt消费
beginingOffts(),endOfftes(),offstesForTimes():获取开头、末尾或指定时间的offt ekToBeginning、ekToEnd():从开头、末尾开始消费
再均衡
在subcribe()时,可以注册⼀个实现ConsumerRebalanceListener接⼝的监听器
onPartionsRevoked():消费者停⽌读取消息之后,再均衡开始之前
onPartitionsAssigned():重新分配分区后,开始读取消费前
拦截器
实现ConsumerInterceptor接⼝
poll()返回之前,会调⽤onConsume()⽅法,提交完offt后会调⽤onCommit()⽅法

本文发布于:2023-05-09 20:52:43,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/875907.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   分区   发送   副本   产者   消费   消费者
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图