librdkafka介绍⽂档ntroduction to librdkafka - the Apache Kafka C/C++ client library librdkafka 是⼀个C实现的⾼性能 Apache Kafka 客户端,为⽣产环境提供了⼀个可靠和⾼性能的客户端。librdkafka 同样也提供了传统的 C++ 接⼝。
⽬录
怎样自学英语
以下⽬录适⽤于本⽂
性能
性能数据
⾼吞吐
低延时
压缩
牙盘
消息可靠性
⽤法
⽂档
初始化
配置
线程和回调
Brokers
Producer API
Consumer API
附录
测试详情
性能
librdkafka 是⼀个基于现代硬件设计的多线程库, 并且试图保持最少的内存拷贝。
如果应⽤程序愿意,⽣产和消费消息的载体可以不通过任何拷贝实现让消息⼤⼩不受限制。
librdkafka 同样适⽤于⾼吞吐还是低延时的场景,都可以通过属性配置接⼝来满⾜。
下⾯是两个对于性能调节⾮常重要的属性:
ssages - 发送消息集前,本地队列等待累计的最⼩消息数量。
queue.buffering.max.ms - 等待 ssages 数量消息填充本地队列的最长等待时间。
性能数据
接下来的性能测试数据受限于如下配置:
Intel Quad Core i7 at 3.4GHz, 8GB of memory
影响磁盘性能的 brokers 刷新参数配置如下:
log.ssages=10000000
log.flush.interval.ms=100000
两个 brokers 运⾏在和 librdkafka 的同⼀台机器上。
每个 topic 有两个 partition。
中国十大王朝
每个 broker 被⼀个 partition 领导。
使⽤ examples ⼦⽬录下 rdkafka_performance 程序。
测试结果
Test1: 2 brokers, 2 partitions, required.acks=2, 100 byte messages:
850000 messages/cond, 85 MB/cond
Test2: 1 broker, 1 partition, required.acks=0, 100 byte messages:
710000 messages/cond, 71 MB/cond
Test3: 2 broker2, 2 partitions, required.acks=2, 100 byte messages,
snappy compression:
潘维廉
300000 messages/cond, 30 MB/cond
Test4: 2 broker2, 2 partitions, required.acks=2, 100 byte messages,
gzip compression:
230000 messages/cond, 23 MB/cond
提⽰: 要了解命令的执⾏等,请参考本⽂最后的 测试详情 章节。
提⽰: 消费者的性能测试将会尽快公布。
⾼吞吐
⾼吞吐的关键是消息的批量处理,等待本地队列累计⼀定数量的消息,然后⽤⼀个⼤的消息集或批量发送到对端。
通过这种⽅式补偿通讯开销和消除往返时延(RTT)的不利影响。
默认配置 ssages=1000 和 queue.buffering.max.ms=1000 适⽤于⾼吞吐量场景。
这样配置允许 librdkafka 等待 1000 ms 让本地队列累计 1000 条消息,然后发送累计的消息到 broker。
这些配置是全局的 (rd_kafka_conf_t),但是基于每⼀个 topic+partition 上应⽤。
低延时
当要求发送低延时,“queue.buffering.max.ms”应该调整到尽可能适⽤于⽣产侧低延时。
设置“queue.buffering.max.ms” 为 1 来确保消息尽可能快的发送。
你可以查看来获取更多细节。
压缩
⽣产者消息压缩通过配置“dec”属性⽣效。
压缩通过本地队列批量处理消息实现,批量越⼤越可能获得更⾼的压缩率。
本地批量队列⼤⼩通过“ssages”和“queue.buffering.max.ms”配置属性控制,已经在前⾯的⾼吞吐章节描述过了。消息可靠性
消息可靠性是 librdkafka 的⼀个重要因素。
通过指定的配置(“quired.acks”和“message.ies”,等)应⽤程序完全可以信赖 librdkafka 来分发消息。
如果 topic 配置属性“quired.acks”设置为等待来⾃ broker 的消息提交确认(⾮0,详见),librdkafka 将会等待消直到所有期望的 ack 都收到,并优雅的处理下列事件:
Broker 连接失败
Topic 领导变更
来⾃ broker 的⽣产者错误信号
这些事件都是 librdkafka ⾃动处理的。对与上⾯的事件,应⽤程序⽆须做任何处理。
失败消息将会重新发送“message.ies”次,然后返回失败报告给应⽤程序。
发送失败报告回调函数⽤于 librdkafka 给应⽤程序发送消息返回的状态信号,每个消息发送后的消息状态报告都会调⽤⼀次回调函数:如果error_code是⾮零,那么消息发送失败,error_code 表⽰失败类型(rd_kafka_resp_err_t枚举)。
如果error_code是零,那么消息发送成功。
发送报告回调函数的更多使⽤详情查看⽣产者 API。
发送报告回调函数是可配置的。
⽤法
⽂档
librdkafka API 记录在
头⽂件中,配置属性记录在中。
初始化
应⽤程序需要初始化⼀个顶层对象(rd_kafka_t)的基础容器,⽤于全局配置和共享状态。
通过调⽤rd_kafka_new()创建。
还需要实例化⼀个或多个 topic(rd_kafka_topic_t)⽤于⽣产或消费。
topic 对象保存 topic 级别的属性,并且维护⼀个映射,
该映射保存所有可⽤ partition 和他们的领导 broker 。
珠宝品牌排行榜前十名通过调⽤rd_kafka_topic_new()创建。
rd_kafka_t 和 rd_kafka_topic_t都源于可选的配置 API。
不使⽤该 API 将导致 librdkafka 使⽤列在⽂档CONFIGURATION.md中的默认配置。
提⽰:⼀个应⽤程序可以创建多个rd_kafka_t对象,它们不共享状态。
提⽰:⼀个rd_kafka_topic_t对象只能⽤于⼀个rd_kafka_t对象的创建。
配置
为了与官⽅的 Apache Kafka 软件⼀致和降低学习门槛,
librdkafka 使⽤了和 Apache Kafka 官⽅客户端完全⼀致的配置属性。
在创建对象之前,通过rd_kafka_conf_t() 和 rd_kafka_topic_conf_t() API
来应⽤配置。
提⽰:⼀旦通过rd_kafka.._new()使⽤过,rd_kafka.._conf_t 对象不能再重复使⽤。
调⽤rd_kafka.._new()后,应⽤程序不需要释放任何配置资源。
⽰例
rd_kafka_conf_t *conf;
char errstr[512];
conf = rd_kafka_conf_new();
rd_kafka_conf_t(conf, "dec", "snappy", errstr, sizeof(errstr));
rd_kafka_conf_t(conf, "ssages", "100", errstr, sizeof(errstr));
rd_kafka_new(RD_KAFKA_PRODUCER, conf);
1
养生按摩
2
3
4
5
6
7
8
线程和回调
为了完全利⽤的现代硬件,librdkafka 本⾝⽀持多线程。
API 是完全线程安全。在任何时间和其任何线程中,应⽤程序都可以调⽤任何 API 函数。
⼀个基于调查的 API 被⽤于向应⽤程序返回信号。
应⽤程序需要定期调⽤rd_kafka_poll()。
这个调查 API 将调⽤以下配置的回调函数(可选):
消息发送报告回调函数 - 标⽰⼀个消息被发送或发送失败,应⽤程序可以做出处理货释放消息内使⽤应⽤资源。
错误回调函数 - 标⽰⼀个错误。这些错误通常是⼀个信息类别,⽐如连接 broker 错误,
应⽤程序通常不需要做任何处理。错误的类型在 rd_kafka_resp_err_t 枚举中指定,
包括远程 broker 错误和本地错误。
可选的回调函数不是通过调查出发,它们可能被任何线程调⽤:
⽇志回调函数 - 允许应⽤程序输出 librdkafka ⽣成的⽇志消息。
分区回调函数 - 应⽤程序提供消息分区的⽅法。在任何时候任何线程中,分区函数都可能被调⽤,
并可能被使⽤同⼀个键值调⽤多次。
分区函数约束:
不得调⽤任何 rd_kafka_*() 类型函数
不得阻塞或执⾏长时间的函数
必须返回⼀个 0 到 partition_cnt-1 之间的值,
或分区⽆法执⾏时返回特殊的 RD_KAFKA_PARTITION_UA 值
Brokers
librdkafka 只需要初始化⼀个 broker 列表(⾄少⼀个)来调⽤ broker 引导。
librdkafka 会连接所有列在“metadata.broker.list”属性中或调⽤rd_kafka_brokers_add()添加的 broker 引导,并且查询列表中每⼀的元数据信息,包括 broker、topic、partition 和 kafka 集群的领导者。
Broker 名字类似于“host[:port]”,其中端⼝是可选的(默认 9092),host是可⽤的主机名或IPv4、IPv6地址。
如果是⼀个复杂地址,librdkafka会循环地址尝试连接。
⼀个 DNS 记录⽤于包含所有可⽤于引导的 broker 地址。
新特性
Apache Kafka broker 版本 0.10.0 新增了⼀个 ApiVersionRequest API,允许客户端查询 broker ⽀持的 API 版本。
librdkafka ⽀持这个特性,会查询每⼀个 broker 获取该信息(如果quest=true),根据该信息⽣效或失效各种特性,如MessageVersion 1 (timestamps), KafkaConsumer等。
如果 broker 没有对 librdkafka 的 ApiVersionRequest 请求作出正确响应,会认为 broker 版本太⽼不⽀持该 API,并回退到⽼版本broker 的 API。回退的版本可配置到 librdkafka 中,通过broker.version.fallback属性控制。
Producer API
通过RD_KAFKA_PRODUCER类型设置好rd_kafka_t对象后,⼀个或多个rd_kafka_topic_t对象就准备好接收消息,并组装和发送到broker。
rd_kafka_produce()函数接受如下参数:
rkt - ⽣产的 topic,之前通过rd_kafka_topic_new()⽣成
partition - ⽣产的 partition。如果设置为RD_KAFKA_PARTITION_UA(未赋值的),需要通过配置分区函数去选择⼀个确定partition。
msgflags - 0 或下⾯的值:
RD_KAFKA_MSG_F_COPY - librdkafka 将⽴即从 payload 做⼀份拷贝。如果 payload 是不稳定存储,如栈,需要使⽤这个参数。
RD_KAFKA_MSG_F_FREE - 当 payload 使⽤完后,让 librdkafka 使⽤free(3)释放。
这两个标志互斥,如果都不设置,payload 既不会被拷贝也不会被 librdkafka 释放。
对不起我爱你歌词
如果RD_KAFKA_MSG_F_COPY标志不设置,就会有数据拷贝,librdkafka 将占⽤ payload 指针直到消息被发送或失败。
librdkafka 处理完消息后,会调⽤发送报告回调函数,让应⽤程序重新获取 payload 的所有权。
如果设置了RD_KAFKA_MSG_F_FREE,应⽤程序就不要在发送报告回调函数中释放 payload。
payload,len - 消息 payload
key,keylen - 可选的消息键,⽤于分区。将会⽤于 topic 分区回调函数,如果有,会附加到消息中发送给 broker。
msg_opaque - 可选的,应⽤程序为每个消息提供的⽆类型指针,提供给消息发送回调函数,⽤于应⽤程序引⽤。
rd_kafka_produce() 是⼀个⾮阻塞 API,该函数会将消息塞⼊⼀个内部队列并⽴即返回。
如果队列中的消息数超过queue.ssages属性配置的值,rd_kafka_produce()通过返回 -1,并在ENOBUFS中设置错误码来反馈错误。
提⽰: 见 examples/rdkafka_performance.c 获取⽣产者的使⽤。
Simple Consumer API (legacy)
提⽰:要获取 high-level KafkaConsumer 接⼝,查看 rd_kafka_subscribe (rdkafka.h) 或 KafkaConsumer (rdkafkacpp.h)
消费者 API ⽐⽣产者 API 更复杂。红烧猪蹄做法
通过RD_KAFKA_CONSUMER类型创建rd_kafka_t对象并实例化rd_kafka_topic_t后,应⽤程序还需要调⽤rd_kafka_consume_start()指定partition。
rd_kafka_consume_start()参数:
rkt - 要消费的 topic ,之前通过rd_kafka_topic_new()创建。
partition - 要消费的 partition。
offt - 消费开始的消息偏移量。可以是绝对的值或两中特殊的偏移量:
RD_KAFKA_OFFSET_BEGINNING 从该 partition 的队列的最开始消费(最⽼的消息)。
RD_KAFKA_OFFSET_END 从该 partition 产⽣的下⼀个消息开始消费。
RD_KAFKA_OFFSET_STORED 使⽤偏移量存储。
当⼀个 topic+partition 消费者被启动,librdkafka 不断尝试从 broker 批量获取消息来保持本地队列有ssages数量的消息。
本地消息队列通过 3 个不同的消费 API 向应⽤程序提供服务:
rd_kafka_consume() - 消费⼀个消息
rd_kafka_consume_batch() - 消费⼀个或多个消息
rd_kafka_consume_callback() - 消费本地队列中的所有消息,且每⼀个都调⽤回调函数
这三个 API 的性能按照升序排列,rd_kafka_consume()最慢,rd_kafka_consume_callback()最快。不同的类型满⾜不同的应⽤需要。
被上述函数消费的消息通过rd_kafka_message_t类型返回。