4.深⼊理解kafka:核⼼设计与实战原理
⽬录
⽇志存储
⽂件⽬录布局
Kafka 中的消息是以主题为基本单位进⾏归类的,各个主题在逻辑上相互独⽴。每个主题⼜可以分为⼀个或多个分区,分区的数量可以在主题创建的时候指定,也可以在之后修改。每条消息在发送的时候会根据分区规则被追加到指定的分区中,分区中的每条消息都会被分配⼀个唯⼀的序列号,也就是通常所说的偏移量(offt),具有4个分区的主题的逻辑结构见图1-2。如果分区规则设置得合理,那么所有的消息可以均匀地分布到不同的分区中,这样就可以实现⽔平扩展。不考虑多副本的情况,⼀个分区对应⼀个⽇志(Log)。为了防⽌ Log 过⼤,Kafka⼜引⼊了⽇志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于⼀个巨型⽂件被平均分配为多个相对较⼩的⽂件,这样也便于消息的维护和清理。事实上,Log 和LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以⽂件夹的形式存储,⽽每个LogSegment 对应于磁盘上的⼀个⽇志⽂件和两个索引⽂件,以及可能的其他⽂件(⽐如以“.txnindex”为后缀的事务索引⽂件)。图4-1描绘了主题、分区与副本之间的关系,在图5-1中⼜补充了Log和LogSegment的关系。
Log对应了⼀个命名形式为<topic>-<partition>的⽂件夹 ,在4.1.1节中我们知道Log对应了⼀个命名形式为<topic>-<partition>的⽂件夹。举个例⼦,假设有⼀个名为“topic-log”的主题,此主题中具有 4 个分区,那么在实际物理存储上表现为“topic-log-
0”“topic-log-1”“topic-log-2”“topic-log-3”这4个⽂件夹:
向Log 中追加消息时是顺序写⼊的,只有最后⼀个 LogSegment 才能执⾏写⼊操作,在此之前所有的 LogSegment 都不能写⼊数据。为了⽅便描述,我们将最后⼀个 LogSegment 称为“activeSegment”,即表⽰当前活跃的⽇志分段。随着消息的不断写⼊,当activeSegment满⾜⼀定的条件时,就需要创
建新的activeSegment,之后追加的消息将写⼊新的activeSegment。为了便于消息的检索,每个LogSegment中的⽇志⽂件(以“.log”为⽂件后缀)都有对应的两个索引⽂件:偏移量索引⽂件(以“.index”为⽂件后缀)和时间戳索引⽂件(以“.timeindex”为⽂件后缀)
⽇志索引
每个⽇志分段⽂件对应了两个索引⽂件,主要⽤来提⾼查找消息的效率。偏移量索引⽂件⽤来建⽴消
息偏移量(offt)到物理地址之间的映射关系,⽅便快速定位消息所在的物理⽂件位置;时间戳索引⽂件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。
Kafka 中的索引⽂件以稀疏索引(spar index)的⽅式构造消息的索引,它并不保证每个消息在索引⽂件中都有对应的索引项。每当写⼊⼀定量(由 broker 端参数 log.index.interval.bytes指定,默认值为4096,即4KB)的消息时,偏移量索引⽂件和时间戳索引⽂件分别增加⼀个偏移量索引项和时间戳索引项,增⼤或减⼩log.index.interval.bytes的值,对应地可以增加或缩⼩索引项的密度。
⽇志分段⽂件达到⼀定的条件时需要进⾏切分,那么其对应的索引⽂件也需要进⾏切分。⽇志分段⽂件切分包含以下⼏个条件,满⾜其⼀即可
(1)当前⽇志分段⽂件的⼤⼩超过了 broker 端参数 bytes 配置的值。bytes参数的默认值为1073741824,即1GB。
(2)当前⽇志分段中消息的最⼤时间戳与当前系统的时间戳的差值⼤于 ll.ms或ll.hours参数配置的值。如果同时配置了ll.ms和ll.hours参数,那么ll.ms的优先级⾼。默认情况下,只配置了ll.hours参数,其值为168,即7天。(3)偏移量索引⽂件或时间戳索引⽂件的⼤⼩达到broker端参数log.index.size.max.bytes配置的值。log.index.size.max.bytes的默认值为10485760,即10MB。
yell是什么意思
(4)追加的消息的偏移量与当前⽇志分段的偏移量之间的差值⼤于Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量(offt-baOfft>Integer.MAX_VALUE)。
偏移量索引
偏移量索引项的格式如图5-8所⽰。每个索引项占⽤8个字节,分为两个部分。(1)relativeOfft:相对偏移量,表⽰消息相对于baOfft 的偏移量,占⽤4 个字节,当前索引⽂件的⽂件名即为baOfft的值。(2)position:物理地址,也就是消息在⽇志分段⽂件中对应的物理位置,占⽤4个字节。
会计制度设计
时间戳索引
声乐发声练习每个索引项占⽤12个字节,分为两个部分。(1)timestamp:当前⽇志分段最⼤的时间戳。(2)relativeOfft:时间戳所对应的消息的相对偏移量
⽇志清理
Kafka提供了两种⽇志清理策略。
(1)⽇志删除(Log Retention):按照⼀定的保留策略直接删除不符合条件的⽇志分段。
(2)⽇志压缩(Log Compaction):针对每个消息的key进⾏整合,对于有相同key的不同value值,只保留最后⼀个版本。
我们可以通过broker端参数log.cleanup.policy来设置⽇志清理策略,此参数的默认值为“delete”,即采⽤⽇志删除的清理策略。
给领导打电话的技巧如果要采⽤⽇志压缩的清理策略,就需要将log.cleanup.policy设置为“compact”,并且还需要将able(默认值为true)设定为true。通过将log.cleanup.policy参数设置为“delete,compact”,还可以同时⽀持⽇志删除和⽇志压缩两种策略。
⽇志清理的粒度可以控制到主题级别,⽐如与log.cleanup.policy对应的主题级别的参数为 cleanup.policy,
⽇志删除
在Kafka的⽇志管理器中会有⼀个专门的⽇志删除任务来周期性地检测和删除不符合保留条件的⽇志分段⽂件,这个周期可以通过broker端参数ion.check.interval.ms来配置,默认值为300000,即5分钟。当前⽇志分段的保留策略有3种:基于时间的保留策略、基于⽇志⼤⼩的保留策略和基于⽇志起始偏移量的保留策略。
①基于时间:⽇志删除任务会检查当前⽇志⽂件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的⽇志分段⽂件集合(deletableSegments).retentionMs可以通过broker端参数ion.hours、ion.minutes和ion.ms来配置,其中ion.ms 的优先级最⾼,ion.minutes 次之,ion.hours最低。默认情况下只配置了
②基于⽇志⼤⼩:⽇志删除任务会检查当前⽇志的⼤⼩是否超过设定的阈值(retentionSize)来寻找可删除的⽇志分段的⽂件集合(deletableSegments),如图5-14所⽰。retentionSize可以通过broker端参数ion.bytes来配置,默认值为-1,表⽰⽆穷⼤。注意ion.bytes配置的是Log中所有⽇志⽂件的总⼤⼩,⽽不是单个⽇志分段(确切地说应该为.log⽇志⽂件)的⼤⼩。单个⽇志分段的⼤⼩由 broker 端参数 bytes 来限制,默认值为1073741824,即1GB
③3.基于⽇志起始偏移量:
spotless⽇志压缩
ipo是啥意思为了防⽌⽇志不必要的频繁清理操作,Kafka 还使⽤了参数log.cleaner.min.cleanable.ratio(默认值为0.5)来限定可进⾏清理操作的最⼩污浊率。Kafka中⽤于保存消费者消费位移的主题__consumer_offts使⽤的就是Log Compaction策略。
磁盘存储
Kafka 依赖于⽂件系统(更底层地来说就是磁盘)来存储和缓存消息。在我们的印象中,对于各个存储介质的速度认知⼤体同图5-20所⽰的相同,层级越⾼代表速度越快。很显然,磁盘处于⼀个⽐较尴尬的位置,这不禁让我们怀疑Kafka 采⽤这种持久化形式能否提供有竞争⼒的性能。在传统的消息中间件 RabbitMQ 中,就使⽤内存作为默认的存储介质,⽽磁盘作为备选介质,以此实现⾼吞吐和低延迟的特性。然⽽,事实上磁盘可以⽐我们预想的要快,也可能⽐我们预想的要慢,这完全取决于我们如何使⽤它。
页缓存
页缓存是操作系统实现的⼀种主要的磁盘缓存,以此⽤来减少对磁盘 I/O 的操作。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问冬天服装搭配
零拷贝
除了消息顺序追加、页缓存等技术,Kafka还使⽤零拷贝(Zero-Copy)技术来进⼀步提升性能。所谓的零拷贝是指将数据直接从磁盘⽂件复制到⽹卡设备中,⽽不需要经由应⽤程序之⼿。零拷贝⼤⼤提⾼了应⽤程序的性能,减少了内核和⽤户模式之间的上下⽂切换。
misono深⼊服务端
协议设计
Kafka⾃定义了⼀组基于TCP的⼆进制协议,只要遵守这组协议的格式,就可以向Kafka发送消息,也可以从Kafka中拉取消息,或者做⼀些其他的事情,⽐如提交消费位移等。在⽬前的 Kafka 2.0.0 中,⼀共包含了 43 种协议类型,每种协议类型都有对应的请求(Request)和响应(Respon),它们都遵守特定的协议模式。每种类型的Request都包含相同结构的协议请求头(RequestHeader)和不同结构的协议请求体(RequestBody
时间轮
Kafka中存在⼤量的延时操作,⽐如延时⽣产、延时拉取和延时删除等。Kafka并没有使⽤JDK⾃带的Timer或DelayQueue来实现延时的功能,⽽是基于时间轮的概念⾃定义实现了⼀个⽤于延时功能的定时器(SystemTimer)
Kafka中的时间轮(TimingWheel)是⼀个存储定时任务的环形队列,底层采⽤数组实现,数组中的每个元素可以存放⼀个定时任务列表(TimerTaskList)。TimerTaskList是⼀个环形的双向链表,链表中的每⼀项表⽰的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)
延时操作
如果在使⽤⽣产者客户端发送消息的时候将 acks 参数设置为-1,那么就意味着需要等待ISR集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。如果在⼀定的时间内,follower1副本或follower2副本没能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。⽣产请求的超时时间由参数request.timeout.ms配置,默认值为30000,即30s。
延时操作创建之后会被加⼊延时操作管理器(DelayedOperationPurgatory)来做专门的处理。延时操作有可能会超时,每个延时操作管理器都会配备⼀个定时器(SystemTimer)来做超时管理,定时器的底层就是采⽤时间轮(TimingWheel)实现的
题外话:在Kafka中将延时操作管理器称为DelayedOperationPurgatory,这个名称⽐之前提及的ExpiredOperationReaper和SkimpyOfftMap的取名更有意思。Purgatory直译为“炼狱”,但丁的《神曲》中有炼狱的相关描述。炼狱共有9层,在⽣前犯有罪过但可以得到宽恕的灵魂,按照⼈类的七宗罪(傲慢、忌妒、愤怒、怠惰、贪财、贪⾷、贪⾊)分别在这⾥修炼洗涤,⽽后⼀层层升向光明和天堂。Kafka 中采⽤这⼀称谓,将延时操作看作需要被洗涤的灵魂,在炼狱中慢慢修炼,等待解脱升⼊天堂(即完成延时操作)。
下图描绘了客户端在请求写⼊消息到收到响应结果的过程中与延时⽣产操作相关的细节,在了解相关的概念之后应该⽐较容易理解:如果客户端设置的 acks 参数不为-1,或者没有成功的消息写⼊,那么就直接返回结果给客户端,否则就需要创建延时⽣产操作并存⼊延时操作管理器,最终要么由外部事件触发,要么由超时触发⽽执⾏。
有延时⽣产就有延时拉取txx
Kafka在处理拉取请求时,会先读取⼀次⽇志⽂件,如果收集不到⾜够多(fetchMinBytes,由参数fetch.min.bytes配置,默认值为1)的消息,那么就会创建⼀个延时拉取操作(DelayedFetch)以等待拉取到⾜够数量的消息。当延时拉取操作执⾏时,会再读取⼀次⽇志⽂件,然后将拉取结果返回给 follower 副本。延时拉取操作也会有⼀个专门的延时操作管理器负责管理,⼤体的脉络与延时⽣产操作相同,
控制器
摘要是什么意思在 Kafka 集群中会有⼀个或多个 broker,其中有⼀个 broker 会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发⽣变化时,由控制器负责通知所有broker更新其元数据信息。当使⽤kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
控制器的选举及异常恢复
Kafka中的控制器选举⼯作依赖于ZooKeeper,成功竞选为控制器的broker会在ZooKeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下: