springkafka 配置优化
⽣产者配置
下⾯⽤的是 -clients-2.0.1.jar 的原⽣配置# ⽤来初始化连接kafka (不⽤配置全部节点,会动态发现)bootstrap .rvers =172.28.15.138:9092,172.28.15.195:9092,172.28.15.145:9092# ⽣产者缓存发送记录,如果⽣产速度过快,buffer 占满,则⽣产者会阻塞 max .block .ms 直到抛出异常。并且不是全部⽤来缓存记录,⽐如压缩也会⽤到 默认 buffer .memory =# 每次间隔固定时间 元数据(节点 topic 分区等)主动更新,默认值 5 * 60 * 1000 (单位毫秒)metadata .max .age .ms =# 设置⼤于零的值将导致客户端重新发送其发送失败并带有潜在的瞬时错误的任何记录,请注意,此重试与客户端在收到错误后重新发送记录没有什么不同。如retries =0# ⽣产者要求领导者在确认请求完成之前已收到的确认数 默认=1## 0:这意味着⽣产者producer 不等待来⾃broker 同步完成的确认继续发送下⼀条(批)消息。此选项提供最低的延迟但最弱的耐久性保证(当服务器发⽣故障## 1:这意味着领导者会将记录写到其本地⽇志中,但是会在没有等待所有跟随者的完全确认的情况下做出响应。在这种情况下,如果领导者在确认记录后⽴即## all 或者-1:这意味着领导者将等待整套同步副本来确认记录。这保证了只要⾄少⼀个同步副本保持活动状态,记录就不会丢失。这是最有⼒的保证。这等效于acks =1#⽣产者⽣成的所有数据的压缩类型。默认值为⽆(即不压缩)。有效的值为<code > none </ code >,<code > gzip </ code >,<code > snappy </ code compression .type =none #每当将多个记录发送到同⼀分区时,⽣产者将尝
试将记录⼀起批处理成更少的请求。这有助于提⾼客户端和服务器的性能。此配置控制默认批处理⼤⼩(以字节batch .size =16384#⽣产者将在请求传输之间到达的所有记录归为⼀个批处理的请求。 通常,只有在记录到达速度快于记录发送速度时,这种情况才会发⽣。但是,在某些情况下linger .ms =0# 发出请求时传递给服务器的ID 字符串。其⽬的是通过允许将逻辑应⽤程序名称包含在服务器端请求⽇志中,从⽽能够跟踪IP /端⼝以外的请求源。默认client .id =# The size of the TCP nd buffer (SO_SNDBUF ) to u when nding data . If the value is -1, the OS default will be ud . 默认 128 * 1024nd .buffer .bytes =# The size of the TCP receive buffer (SO_RCVBUF ) to u when reading data . If the value is -1, the OS default will be ud . 默认32 * 1024receive .buffer .bytes =# 单个请求最⼤字节数. This tting will limit the number of records the producer will nd in a single request to avoid nding huge requests .This is also eff max .request .size = # 客户端对broker 发起重连的间隔 默认 50 毫秒。reconnect .backoff .ms =# 最⼤重连间隔 默认1000 。如果 每次链接都失败,客户端会指数的增加连接时间,直到达到最⼤值(reconnect .backoff .max .ms )reconnect .backoff .max .ms =# 尝试重试对给定topic 分区的失败请求之前要等待的时间。在某些故障情况下,这避免了在紧密循环中重复发送请求。默认 100retry .backoff .ms =# 该配置控制<code > KafkaProducer .nd ()</ code >和<code > KafkaProducer .partitionsFor ()</ code >的阻塞时间。这些⽅法可以被阻塞,因为缓冲区已max .block .ms =# 该配置控制客户机等待请求响应的最长时间。如果在超时之前仍未收到响应,则客户端将在必要时重新发送请求,或者在⽤尽重试后使请求失败。该值应⼤于
request .timeout .ms =# 每次间隔固定时间 元数据(节点 topic 分区等)主动更新,默认值 5 * 60 * 1000 (单位毫秒)metadata .max .age .ms =# 计算指标样本的时间窗⼝。默认 30000metrics .sample .window .ms =# 维护以计算指标的样本数 默认 2metrics .num .samples =....省略了其他 metrics 参数# The maximum number of unacknowledged requests the client will nd on a single connection before blocking . 默认 5max .in .flight .requests .per .connection =5# Serializer class for key or valie that implements the <code >org .apache .kafka .common .rialization .Serializer </code > interface .key .rializer =value .rializer =# Partitioner class that implements the <code >org .apache .kafka .clients .producer .Partitioner </code > interface . 默认 DefaultPartitioner .class partitioner .class # Clo idle connections after the number of milliconds specified by this config . 默认 9 * 60 * 1000 broker 默认超时是10分钟,这⾥设置⽐他⼩⼀点connections .max .idle .ms =# 配置⽣产者拦截器,在记录发送到kafka 之前可以拦截或者改变消息记录 Implementing the <code >org .apache .kafka .clients .producer .ProducerInterceptor interceptor .class =# 配置通信协议 默认 PLAINTEXT 。 PLAINTEXT (未认证 未加密)SSL (加密) SASL_PLAINTEXT (认证 未加密) SASL_SSL (认证,加密)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
trims26
27
28
benjamin disraeli29
30
31
32
33
34
35
36
37
38
39
40
41
aholic42
43
44
45
46
精巧的意思
47
48
49
50
51
52
53
54
55
56
57
58
消费者配置
下⾯⽤的是 kafka-clients-2.0.1.jar 的原⽣配置# 配置通信协议 默认 PLAINTEXT 。 PLAINTEXT (未认证 未加密)SSL (加密) SASL_PLAINTEXT (认证 未加密) SASL_SSL (认证,加密)curity .protocol =# 发送幂等性 默认 fal 。 true : the producer will ensure that exactly one copy of each message is written in the stream 。如果设置为true ,则需要满⾜ ma enable .idempotence =# 事务协调器在主动中⽌正在进⾏的事务之前等待⽣产者更新事务状态的最长时间(毫秒)。如果此值⼤于broker 中的transaction .max .timeout .ms 设置,该请求transaction .timeout .ms =60000# ⽤于事务传递的TransactionalId 。这使跨多个poducer 会话的可靠性语义成为可能,因为它允许客户端保证使⽤相同TransactionalId 的事务在开始任何新事务之
transactional .id =null 58
59
60
61
62
63
卡塔尔公主
64
65
66# ⽤来初始化连接kafka (不⽤配置全部节点,会动态发现)bootstrap .rvers =172.28.15.138:9092,172.28.15.195:9092,172.28.15.145:9092# 配置topic 的消费组,同⼀个组的多个消费者并发消费⼀个topic 。下⾯两种情需要设置 group management functionality by using <code >subscribe (topic group .id =# 使⽤Kafka 的组管理⼯具时,超时⽤于检测⽤户故障。消费者定期向代理发送⼼跳以指⽰其“状态。如果broker 在此会话超时之前未收到任何⼼跳信号,那么bro ssion .timeout .ms =10000# 使⽤Kafka 的群组管理⼯具时,预期的两次⼼跳到消费者协调员之间的时间。⼼跳⽤于确保消费者会话保持活动状态,并在新消费者加⼊或离开该群组时促进平heartbeat .interval .ms =3000# 分区设置策略类,客户端使⽤改配置类在多个消费者实例之间分发分区所有权 默认 RangeAssignor .class partition .assignment .strategy # 每次间隔固定时间 元数据(节点 topic 分区等)主动更新,默认值 5 * 60 * 1000 (单位毫秒)metadata .max .age .ms =# If true the consumer's offt will be periodically committed in the background . 默认true enable .auto .commit =true # 后台⾃动提交的间隔 默认5000auto .commit .interval .ms =5000# 发出请求时传递给服务器的I
D 字符串。其⽬的是通过允许将逻辑应⽤程序名称包含在服务器端请求⽇志中,从⽽能够跟踪IP /端⼝以外的请求源。默认client .id =# 服务器将返回的每个分区的最⼤数据量。记录由消费者分批提取。如果提取的第⼀个⾮空分区中的第⼀个记录批次⼤于此限制,则仍将返回批次,以确保使⽤max .partition .fetch .bytes =# The size of the TCP nd buffer (SO_SNDBUF ) to u when nding data . If the value is -1, the OS default will be ud . 默认 128 * 1024nd .buffer .bytes =# The size of the TCP receive buffer (SO_RCVBUF ) to u when reading data . If the value is -1, the OS default will be ud . 默认 64 * 1024receive .buffer .bytes =# The minimum amount of data the rver should return for a fetch request .如果没有⾜够的数据,请求将在等待请求之前等待该数据积累。默认设置为fetch .min .bytes =1# The maximum amount of data the rver should return for a fetch request 。记录由使⽤者分批获取,并且如果读取的第⼀个⾮空分区中的第⼀个记录批⼤于fetch .max .bytes = 50 * 1024 * 1024# 如果没有⾜够的数据⽴即满⾜fetch .min .bytes 给出的要求,则服务器在回答提取请求之前将阻塞的最长时间。默认 500fetch .max .wait .ms =500# 客户端对broker 发起重连的间隔 默认 50 毫秒。reconnect .backoff .ms =# 最⼤重连间隔 默认1000 。如果 每次链接都失败,客户端会指数的增加连接时间,直到达到最⼤值(reconnect .backoff .max .ms )reconnect .backoff .max .ms =# 尝试重试对给定topic 分区的失败请求之前要等待的时间。在某些故障情况下,这避免了在紧密循环中重复发送请求。默认 100retry .backoff .ms =# 当Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时(例如因为该数据已被删除),该怎么办:<ul > <li >earliest :⾃动将偏移量重置为最早
的偏移量auto .offt .ret =latest # ⾃动检查消耗的记录的CRC32。这样可以确保不会发⽣在线或磁盘损坏消息的情况。此检查会增加⼀些开销,因此在寻求极端性能的情况下可能会禁⽤此检查check .crcs =true # 计算指标样本的时间窗⼝。默认 30000metrics .sample .window .ms =# 维护以计算指标的样本数 默认 2metrics .num .samples =....省略了其他 metrics 参数# Derializer class for key or value that implements the <code >org .apache .kafka .common .rialization .Derializer </code > interface .key .derializer =value .derializer =# 该配置控制客户机等待请求响应的最长时间。如果在超时之前仍未收到响应,则客户端将在必要时重新发送请求,或者在⽤尽重试后使请求失败。该值应⼤于
request .timeout .ms =30 * 10001
2
3
4
5
6
7
ngo是什么意思8
9
10
11
gridlayout12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
spring rain实践
kafka 连接超时导致服务不能启动
错误内容如下图:
TimeoutException: Timeout expired while fetching topic metadata Kafka 解决⽅案:
超时可能是当时kafka ⽐较繁忙,处理不过来
sumer.ssion.timeout=15000 (默认 10s
sumer.heartbeatInterval=5000 (原来 3s)
参考
1. /p/c575c7aec4dd request .timeout .ms =30 * 1000# 为阻塞功能的consum
er APIs 设置⼀个统⼀的默认的超时时间,避免单个接⼝⼀个参数 默认 60 * 1000default .api .timeout .ms =60 * 1000# Clo idle connections after the number of milliconds specified by this config . 默认 9 * 60 * 1000 broker 默认超时是10分钟,这⾥设置⽐他⼩⼀点connections .max .idle .ms =9 * 60 * 1000# 配置消费者拦截器,业务处理之前可以拦截或者改变消息记录,Implementing the <code >org .apache .kafka .clients .consumer .ConsumerInterceptor interceptor .class =# poll () 函数返回的最⼤记录 默认 500max .poll .records =500# 使⽤消费组管理功能时候,poll ()的最⼤调⽤间隔。这为获取更多记录之前使⽤者可以闲置的时间量设置了⼀个上限。如果在此超时到期前未调⽤poll (),则认max .poll .interval .ms =300000# 内部topics (如offts )对消费者隐藏 默认 true exclude .internal .topics =true # 控制事物消息读取的隔离级别。read_committed : consumer .poll ()只返回已经提交的记录,消息按照offt order 返回 。 read_uncommitted :返回所有消息
isolation .level =read_uncommitted # 配置通信协议 默认 PLAINTEXT 。 PLAINTEXT (未认证 未加密)SSL (加密) SASL_PLAINTEXT (认证 未加密) SASL_SSL (认证,加密)curity .protocol =51
52
53
world cup
54
55
56
有钱能使鬼推磨英语
57
58
59
60
61
62
63
64
65
66
67
68