Pulsar简介及Pulsar部署、原理和使⽤介绍
Pulsar简介及Pulsar部署、原理和使⽤介绍
Pulsar简介
诞⽣背景
Apache Pulsar 是⼀个企业级的分布式消息系统,最初由 Yahoo 开发,在 2016 年开源,并于2018年9⽉毕业成为 Apache 基⾦会的顶级项⽬。Pulsar 已经在 Yahoo 的⽣产环境使⽤了三年多,主要服务于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 存储)。
Pulsar是⼀种⽤于服务器到服务器消息传递的多租户,⾼性能解决⽅案。Pulsar最初由雅虎公司开发,由负责管理。
Pulsar特点
对Pulsar实例中的多个群集的本机⽀持,跨群集的消息的⽆缝地理复制。
新西兰奥塔哥大学
极低的发布和端到端延迟。
⽆缝扩展到超过⼀百万个主题。
⼀个简单的客户端API,包含Java,Go,Python和C ++的绑定。
主题的多种订阅模式(独占,共享和故障转移)。
主持稿串词
Apache BookKeeper提供的持久消息存储保证消息传递。
⽆服务器轻量级计算框架Pulsar Functions提供流本地数据处理功能。
基于Pulsar函数构建的⽆服务器连接器框架Pulsar IO可以更轻松地将数据移⼊和移出Apache Pulsar。
当数据⽼化时,分层存储将数据从热/暖存储卸载到冷/长期存储(例如S3和GCS)。
消息传递
Pulsar基于publish-subscribe(pub-sub)⽣产订阅模式,⽣产者将消息发布到Topic,消费者可以订阅这些主题来处理消息,并在处理完成后发送确认消息
关键词词解
背包简笔画value Pulsar中的数据存储为byte
key消息可以被Key打标签。这对诸如topic压缩之类的事情有作⽤
properties⼀个可以选择的map,⽤于⽤户配置参数
Sequence
ID
每条消息都放在对应Topic的有序序列中,这个字段记录的是消息所在的序列顺序
publish
time
消息的发布时间,消息发布的时间戳(producer⾃动附上)
event time 应⽤程序可以附加到消息上的时间戳,⽤户业务需求的时间标注,可选的时间戳,应⽤可以附在消息上,代表某个事件发⽣的时间,例如,消
西藏歌曲息被处理时间。如果没有明确的设置,那么事件时间为0。
消息⽣产者
发送模式
⽣产者将消息发布到Topic上,发送消息分可为同步发送和异步发送两种模式:
Mode说明:
同步发
送
产者每次发送完消息后需要等到broker的ack确认,如果没有接收到确认信息⽣产者就认为发送消息失败
异步发送⽣产者将消息放⼊到阻塞队列中就直接返回。Pulsar的客户端通过后台线程将消息发送给broker,如果队列满了,⽣产者再放⼊消息时会被告知
推送失败
Mode说明:
消息压缩
⽣产者发布消息在传输过程中会对数据进⾏压缩,⽬前Pulsar⽀持的压缩⽅式有LZ4,ZLIB, ZSTD, SNAPPY。如果启⽤了批处理,⽣产者将在单个请求中累积⼀批消息进⾏发送,批处理⼤⼩可以由最⼤消息数和最⼤发布延迟定义
批处理发送(Batching)
如果批处理开启,producer将会累积⼀批消息,然后通过⼀次请求发送出去。批处理的⼤⼩取决于最⼤的消息数量及最⼤的发布延迟。
消息消费者
消费模式
消费者从Topic上接收消息进⾏数据处理,同样,消息接收也分为同步接收和异步接收两种模式:
Mode说明:
同步接收同步接收⼀直处于阻塞状态,直到有消息传⼊
异步接收异步接收⽴即返回⼀个未来future值,⼀旦有新消息,它就直接完成,例如java中的
消费确认(ack)
1. 消费者成功接收到消息时:
当消费者成功处理完⼀条消息后,会发送⼀个确认请求给broker,告诉broker可以删除这条消息了,否则broker会⼀直存储这条消息。消息可以逐个确认也可以累积确认,消费者只需要确认收到的最后⼀条消息,这个流中所涉及到的所有消息都不会再重新传递给这个消费者。
2. 消费者不成功消费时
当消费者处理消息失败时,会给broker发送⼀个失败确认,这个时候broker就会给消费者重新发送这条消息,失败确认可以逐条发送,也可以累积发送,这取决于消费订阅模式。在exclusive和failover订阅模式中,消费者只会对收到的最后⼀条消息进⾏失败确认。在Pulsar客户端可以通过设置timeout的⽅式触发broker⾃动重新传递消息,如果在timeout范围内消费者都没有发送确认请求,那么broker就会⾃动重新发送这条消息给消费者。
3. 确认超时
如果某条消息⼀直处理失败就会触发broker⼀直重发这条消息给消费者,使消费者⽆法处理其他消息,Dead letter topic机制可以让消费者在⽆法成功消费某些消息时接收新的消息进⾏消费,在这种机图书角标语
制下,⽆法消费的消息存储在⼀个单独的topic中(Dead letter topic),⽤户可以决定如何处理这个topic中的消息。
消息的持久化
消息的持久化是通过BookKeeper实现的,⼀旦创建了订阅关系,Pulsar将保留所有的消息(即使消费者断开了链接),只有当消费者确认已经成功处理保留的消息时,才会将这些消息丢弃消息。
消息的保留分为两种:
1.在保留策略内的消息即使消费者已发送了确认也可以持久地存储在Pulsar中,保留策略未涵盖的已确认消息将被删除,如果没有保留策略所有已确认的消息都将被删除;
2.设置消息到期时间,会根据应⽤于namespace的TTL过期时间,如果到期了,即使消息没有被确认也会被删除
当有某条消息被重复发送时,可以选择两种持久化策略:
1.是将重复的消息也持久化到BookKeeper中
2.是判断如果是重复消息,则不再进⾏持久化操作
租户(tenant)
Pulsar 从⼀开始就⽀持多租户,topic 的名称是层级化的,最上层是租户(tenant)
命名空间(namespace)
命名空间是租户内部逻辑上的命名术语。⼀个租户可以通过admin API创建多个命名空间。例如,⼀个对接多个应⽤的租户,可以为每个应⽤创建不同的namespace。
Topic
与其他pub-sub系统⼀样,Pulsar中的topic被命名为从⽣产者向消费者传输消息的通道:
{persistent|non-persistent}://tenant/namespace/topic
关键词词解
persistent|non-persistent
标识topic类型:
持久:所有消息都持久保存在磁盘上(BookKeeper节点)⾮持久:数据只存在内存中,当broker重启后会造成消息丢失
tenant租户;实例中的topic的租户。租户是pulsar对多租户⽀持的重要组成,可以分散在集群中
namespace⽤作topic的分组机制,⼤多数topic配置是在命名空间级别执⾏的,每个tenant租户可以有多个namespace
topic可以⽤户⾃定义,topic的名字是⾃由格式的,在Pulsar实例中没有特殊含义
producer写⼊不存在的主题时 了会在提供的命名空间下⾃动创建该主题
⽤户不需要在Pulsar中明确地创建主题,如果客户端尝试往不存在的主题中写⼊/接收信息,Pulsar将在topic提供的namespace下⾃动创建该主题
消费者可以订阅多个topic:
通过名称配置:persistent://public/default/finance-.*
配置topic订阅列表
常规topic只能由单个broker提供,这限制了topic的最⼤吞吐量,分区topic是由多个broker处理的⼀种特殊类型的topic,它允许更⾼的吞吐量。分区topic和普通topic在订阅模式的⼯作⽅式上没有区别,在创建主题时可以指定分区数。
消息路由模式
发布到分布分区topic主题时,必须指定路由模式。默认三个路由模式,默认轮询-和Kafka类似。
模式描述
RoundRobinPartition如果未提供key,则以轮询⽅式往各分区上⾯发布消息,以实现最⼤吞吐量 --默认模式
SinglePartition 如果未提供key,则⽣产者随机选择⼀个分区并将所有消息发布到此分区。如果指定了key,将对key进⾏散列(默认javaStringHash=多客户端推荐Murmur3_32Hash)并将消息分配给特定的分区
CustomPartition 使⽤将调⽤的⾃定义消息路由器实现来特定消息的分区。⽤户在在java clent端实现MessageRouter接⼝来实现⾃定义路由模
式。
消息订阅模式(subscription)
Pulsar具有exclusive,shared,failover三种订阅模式
独占(exclusive)
exclusive模式:⼀个topic只允许⼀个消费者订阅,否者会报错
在 exclusive 模式下,⼀个 subscription 只允许被⼀个 consumer ⽤于订阅 topic ,如果多个 consumer 使⽤相同的 subscription 去订阅同⼀个 topic,则会发⽣错误。exclusive 是默认的订阅模式。如下图所⽰,Consumer A-0 和 Consumer A-1 都使⽤了相同的subscription(相同的消费组),只有 Consumer A-0 被允许消费消息。
故障转移|灾备(failover)
failover模式:多个消费者订阅同⼀个topic,按照消费者名称进⾏排序,第⼀个消费者时唯⼀接收到消息的消费者(主消费者),当主消费者断开连接时,所有的后续消息都将发给下⼀个消费者
在 failover 模式下,多个 consumer 允许使⽤同⼀个 subscription 去订阅 topic。但是对于给定的 topic,broker 将选择⼀个consumer 作为该 topic 的主 consumer ,其他 consumer 将被指定为故障转移 consumer 。当主 consumer 失去连接时,topic 将被重新分配给其中⼀个故障转移 consumer ,⽽新分配的 consumer 将成为新的主 consumer 。发⽣这种情况时,所有未确认的消息都将传递给新的主 consumer ,这个过程类似于 Kafka 中的 consumer 组重平衡(rebalance)。
如下图所⽰,Consumer B-0 是 topic 的主 consumer ,当 Consumer B-0 失去连接时,Consumer B-1 才能成为新的主 consumer 去消费 topic。
共享(shared)
shared模式:多个消费者订阅同⼀个topic,消息在消费者之间以循环的⽅式发送,并且给定的某条消息只能发送给⼀个消费者,当消费者断开连接时,所有发送给它但没有确认的消息将重新安排发送给其他消费者
在 shared 模式下,多个 consumer 可以使⽤同⼀个 subscription 去订阅 topic。消息以轮询的⽅式分发给 consumer ,并且每条消费仅发送给⼀个 consumer 。当有 consumer 失去连接时,所有发送给该 consumer 但未被确认的消息将被重新安排,以便发送给该subscription 上剩余的 consumer 。
但是消息不能保证有序以及不⽀持批量ack
如下图所⽰,Consumer C-1,Consumer C-2,Consumer C-3 以轮询的⽅式接受消息。
共享键(key_shared)
key_shared模式:多个消费者订阅同⼀个topic,消息以分布⽅式在消费者之间传递(<key, value>),具有相同key的消息传递给同⼀个消费者,当这个消费者断开连接时,将导致key对应的消费者更改
4s店销售在 shared 模式下,多个 consumer 可以使⽤同⼀个 subscription 去订阅 topic。消息按照 key 分发给 consumer ,含有相同 key 的消息只被发送给同⼀个 consumer 。
蜡笔小新怎么画
如下图所⽰,不同的 consumer 只接受到对应 key 的消息。
Pulsar原理架构
体系结构
在最⾼级别中,⼀个Pulsar实例有⼀个或多个Pulsar集群组成,实例中的集群可以彼此复制数据。在Pulsar集群中,⼀个或多个broker处理和加载来⾃⽣产者传⼊的消息,将消息发送给消费者,与Pulsar配置存储通信以处理各种协调任务,Pulsar集群架构如下所⽰,包括⼀个或多个broker,⽤于集群级配置和协调的Zookeeper,⽤于持久存储消息的BookKeeper,集群可以使⽤地理复制在集群间进⾏复制
Pulsar组件
Broker
Pulsar 的 broker 是⼀个⽆状态组件,本⾝不存储数据。主要负责处理 producer 和 consumer 的请求,消息的复制与分发,数据的计算。可以理解成Broker 是 Pulsar 的⾃⾝实例
主要有2部分组成:
1. HTTP服务器,向⽣产者和消费者公开,⽤于管理任务和topic查找端的REST API;
2. 调度程序,异步TCP服务器,通过⽤于所有数据传输的⾃定义⼆进制协议;
每个集群都有⾃⼰的本地Zookeeper⽤于存储集群特定的配置和协调,如所有权元数据、代理加载报告、簿记员分类帐元数据等等。Pulsar使⽤BookKeeper进⾏持久消息存储,BookKeeper是⼀个分布式预写⽇志(WAL)系统,它的优势为:
· 使Pulsar利⽤多个独⽴⽇志,成为ledgers,随着时间推移,可以为topic创建多个ledger
· 为处复制的顺序数据提供了⾮常有效的存储
· 保证在出现各种系统故障时ledger的读取⼀致性
· 提供多个Bookies的I/O分布
· 在容量和吞吐量⽅⾯都是⽔平扩展的,可以通过向集群中添加更多的bookies来增加容量
· Bookies⽤于处理数千个同事读写的ledger,通过使⽤多个磁盘设备(⼀个⽤于⽇志,⼀个⽤于存储),Bookies能够将读写操作的延迟隔离开
· 除了消息数据外,消费者的订阅位置cursor也可以持久地存储在BookKeeper中
每个 topic 的 partition 都会分配到某⼀个 borker 上,producer 和 consumer 则会连接到这个 broker,从⽽向该 topic 的 partition 发送和消费消息。broker 主要负责消息的复制与分发,数据的计算。
zookeeper
主要⽤于存储元数据、集群配置,任务的协调(例如哪个 broker 负责哪个 topic),服务的发现(例如 broker 发现 bookie 的地址)。
bookkeeper
主要⽤于数据的持久化存储。除了消息数据,cursors(游标) 也会被持久化到 Bookeeper,cursors 是消费端订阅消费的位移。Bookeeper 中每⼀个存储节点叫做 bookie。
BookKeeper 是⼀种优化实时⼯作负载的存储服务,具有可扩展、⾼容错、低延迟的特点。企业级的实时存储平台应符合以下⼏项要求:以极低的延迟(⼩于 5 毫秒)读写 entry 流金刚藤的功效与作用
能够持久、⼀致、容错地存储数据
在写数据时,能够进⾏流式传输或追尾传输
有效地存储、访问历史数据与实时数据
数据存储
数据分区
写⼊主题的数据可能只有⼏个MB,也有可能是⼏个TB。所以,在某些情况下主题的吞吐量很低,有时候⼜很⾼,完全取决于消费者的数量。那么碰到有些主题吞吐量很⾼⽽有些⼜很低的情况该怎么处理?为了解决这个问题,Pulsar将⼀个主题的数据分布到多台机器上,也就是所谓的分区。
在处理海量数据时,为了保证⾼吞吐量,分区是⼀种很常见的⼿段。默认情况下,Pulsar的主题是不进⾏分区的,但通过命令⾏⼯具或API 可以很容易地创建分区主题,并指定分区的数量。
在创建好分区主题之后,Pulsar可以⾃动对数据进⾏分区,不会影响到⽣产者和消费者。也就是说,⼀个应⽤程序向⼀个主题写⼊数据,对主题分区之后,不需要修改应⽤程序的代码。分区只是⼀个运维操作,应⽤程序不需要关⼼分区是如何进⾏的。
主题的分区操作由⼀个叫作broker的进程来处理,Pulsar集群⾥的每个节点都会运⾏⾃⼰的broker。
数据持久性