kafka运维命令大全

更新时间:2023-05-12 14:17:17 阅读: 评论:0

kafka运维命令⼤全
转⾃:
⽤于查阅,侵权请联系删除
1.TopicCommand
1.1.Topic创建
bin/kafka-topics.sh --create --bootstrap-rver localhost:9092 --replication-factor 3 --partitions 3 --topic test
相关可选参数
参数描述例⼦
--bootstrap-rver指
定kafka服务
指定连接到的kafka服务; 如果有这个参数,则--zookeeper可以不需要--bootstrap-rver localhost:9092
--zookeeper弃⽤, 通过zk的连接⽅式连接到kafka集群;--zookeeper localhost:2181 或者localhost:2181/kafka
--replication-factor 副本数量,注意不能⼤于broker数量;如果不提供,则会⽤集群中默认
配置
--replication-factor 3
--partitions 分区数量,当创建或者修改topic的时候,⽤这个来指定分区数;如果创
建的时候没有提供参数,则⽤集群中默认值; 注意如果是修改的时候,
分区⽐之前⼩会有问题
--partitions 3
--replica-assignment副本分区分配⽅式;创建topic的时候可以⾃⼰指定副本分配情况;--replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表⽰副本
--config<String: name=value>⽤来设置topic级别的配置以覆盖默认配置;只在--create 和--
bootstrap-rver 同时使⽤时候⽣效; 可以配置的参数列表请看⽂末
附件
例如覆盖两个配置--config retention.bytes=123455 --config retention.ms=600001
--command-
config <String: command ⽂件路径>⽤来配置客户端Admin Client启动配置,只在--bootstrap-rver 同
时使⽤时候⽣效;
例如:设置请求的超时时间--command-config config/producer.proterties; 然后在⽂件中配置
request.timeout.ms=300000
1.2.删除Topic
bin/kafka-topics.sh --bootstrap-rver localhost:9092 --delete --topic test
⽀持正则表达式匹配Topic来进⾏删除,只需要将topic ⽤双引号包裹起来例如: 删除以create_topic_byhand_zk为开头的topic;
bin/kafka-topics.sh --bootstrap-rver localhost:9092 --delete --topic "create_topic_byhand_zk.*" .表⽰任意匹配除换⾏符 \n 之外的任何单字符。要匹配 .
,请使⽤ . 。·*·:匹配前⾯的⼦表达式零次或多次。要匹配 * 字符,请使⽤ *。.* : 任意字符
删除任意Topic (慎⽤)
bin/kafka-topics.sh --bootstrap-rver localhost:9092 --delete --topic ".*?"
更多的⽤法请参考正则表达式
1.3.Topic分区扩容
zk⽅式(不推荐)
>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2
kafka版本 >= 2.2 ⽀持下⾯⽅式(推荐)
单个Topic扩容
bin/kafka-topics.sh --bootstrap-rver broker_host:port --alter --topic test_create_topic1 --partitions 4
批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-rver 172.23.248.85:9092 --alter --partitions 4
".*?"正则表达式的意思是匹配所有; 您可按需匹配
PS: 当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进⾏;
相关可选参数 | 参数 |描述 |例⼦| |--|--|--| |--replica-assignment|副本分区分配⽅式;创建topic的时候可以⾃⼰指定副本分配情况; |--replica-assignment BrokerId-0:BrokerId-1:BrokerId-2,BrokerId-1:BrokerId-2:BrokerId-0,BrokerId-2:BrokerId-1:BrokerId-0 ; 这个意思是有三个分区和三个副本,对应分配的Broker; 逗号隔开标识分区;冒号隔开表⽰副本|
PS: 虽然这⾥配置的是全部的分区副本分配配置,但是正在⽣效的是新增的分区; ⽐如: 以前3分区1副本是这样的
Broker-1Broker-2Broker-3Broker-4
012
现在新增⼀个分区,--replica-assignment 2,1,3,4 ; 看这个意思好像是把0,1号分区互相换个Broker
Broker-1Broker-2Broker-3Broker-4
1023
但是实际上不会这样做,Controller在处理的时候会把前⾯3个截掉; 只取新增的分区分配⽅式,原来的还是不会变
Broker-1Broker-2Broker-3Broker-4
0123
1.4.查询Topic描述
1.查询单个Topic
sh bin/kafka-topics.sh --topic test --bootstrap-rver xxxx:9092 --describe --exclude-internal
2.批量查询Topic(正则表达式匹配,下⾯是查询所有Topic)
sh bin/kafka-topics.sh --topic ".*?" --bootstrap-rver xxxx:9092 --describe --exclude-internal
⽀持正则表达式匹配Topic,只需要将topic ⽤双引号包裹起来
相关可选参数
参数描述例⼦
--bootstrap-rver指定kafka服务指定连接到的kafka服务; 如果有这个参数,则--zookeeper可以不需要--bootstrap-rver localhost:9092
--at-min-isr-partitions查询的时候省略⼀些计数和配置信息--at-min-isr-partitions
--exclude-internal排除kafka内部topic,⽐如__consumer_offts-*--exclude-internal
--topics-with-overrides仅显⽰已覆盖配置的主题,也就是单独针对Topic设置的配置覆盖默认配置;不展⽰分区信息--topics-with-overrides
5.查询Topic列表
1.查询所有Topic列表
sh bin/kafka-topics.sh --bootstrap-rver xxxxxx:9092 --list --exclude-internal
2.查询匹配Topic列表(正则表达式)
查询test_create_开头的所有Topic列表sh bin/kafka-topics.sh --bootstrap-rver xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"
相关可选参数
参数描述例⼦
--exclude-internal排除kafka内部topic,⽐如__consumer_offts-*--exclude-internal
--topic可以正则表达式进⾏匹配,展⽰topic名称--topic
2.ConfigCommand
Config相关操作; 动态配置可以覆盖默认的静态配置;
2.1 查询配置
Topic配置查询
展⽰关于Topic的动静态配置
1.查询单个Topic配置(只列举动态配置)
sh bin/kafka-configs.sh --describe --bootstrap-rver xxxxx:9092 --topic test_create_topic或者sh bin/kafka-configs.sh --describe --bootstrap-rver 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic
2.查询所有Topic配置(包括内部Topic)(只列举动态配置)
sh bin/kafka-configs.sh --describe --bootstrap-rver 172.23.248.85:9092 --entity-type topics
3.查询Topic的详细配置(动态+静态)
只需要加上⼀个参数--all
其他配置/clients/urs/brokers/broker-loggers 的查询
同理;只需要将--entity-type改成对应的类型就⾏了 (topics/clients/urs/brokers/broker-loggers)
查询kafka版本信息
sh bin/kafka-configs.sh --describe --bootstrap-rver xxxx:9092 --version
所有可配置的动态配置请看最后⾯的 *附件* 部分
2.2 增删改配置 --alter
--alter
删除配置: --delete-config k1=v1,k2=v2 添加/修改配置: --add-config k1,k2 选择类型: --entity-type (topics/clients/urs/brokers/broker- loggers) 类型名称: --entity-name
Topic添加/修改动态配置
--add-config
sh bin/kafka-configs.sh --bootstrap-rver xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999
Topic删除动态配置
--delete-config
sh bin/kafka-configs.sh --bootstrap-rver xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms
其他配置同理,只需要类型改下--entity-type
类型有: (topics/clients/urs/brokers/broker- loggers)
哪些配置可以修改请看最后⾯的附件:ConfigCommand 的⼀些可选配置
3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions
请戳【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移 (如果点不出来,表⽰⽂章暂未发表,请耐⼼等待)
(3.1)脚本的使⽤介绍
关键参数--generate
1=》构造⽂件
  cd /data/kafka
  vim move.json
{
  "topics": [
  {"topic": "test"}
],
  "version": 1
}
运⾏ generate 参数⽣成当前副本的配置介绍 json,以及建议修改的 json
kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file ./move.json --broker-list "0,1,2"  --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[2,1,0],"log_dirs":["any Propod partition reassignment configuration
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"test","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"test","partition":2,"replicas":[0,2,1],"log_dirs":["any 我找json 在线格式化看看
对⽐⼀下发现,partition 1 和 2 的replicas 不⼀样了;
4.Topic的发送kafka-console-producer.sh
4.1 ⽣产⽆key消息
## ⽣产者
bin/kafka-console-producer.sh --bootstrap-rver localhost:9092 --topic test --fig config/producer.properties
4.2 ⽣产有key消息加上属性--property par.key=true
## ⽣产者
bin/kafka-console-producer.sh --bootstrap-rver localhost:9092 --topic test --fig config/producer.properties  --property par.key=true
默认消息key与消息value间使⽤“Tab键”进⾏分隔,所以消息key以及value中切勿使⽤转义字符(\t)
可选参数
参数值类型说明有效值
--bootstrap-rver String要连接的服务器必需(除⾮指定--broker-list)如:host1:prot1,host2:prot2
--topic String(必需)接收消息的主题名称
--batch-size Integer单个批处理中发送的消息数200(默认值)
--compression-codec String压缩编解码器none、gzip(默认值)snappy、lz4、zstd
--max-block-ms Long在发送请求期间,⽣产者将阻⽌的最长时间60000(默认值)
--max-memory-bytes Long⽣产者⽤来缓冲等待发送到服务器的总内存33554432(默认值)
--max-partition-memory-
bytes
Long为分区分配的缓冲区⼤⼩16384
--message-nd-max-
retries
Integer最⼤的重试发送次数3
--metadata-expiry-ms Long强制更新元数据的时间阈值(ms)300000
--producer-property String将⾃定义属性传递给⽣成器的机制如:key=value
--fig String⽣产者配置属性⽂件[--producer-property]优先于此配置配置⽂件
完整路径
--property String⾃定义消息读取器par.key=true/fal key.parator= <key.parator&=true/fal
--request-required-acks String⽣产者请求的确认⽅式0、1(默认值)、all
--request-timeout-ms Integer⽣产者请求的确认超时时间1500(默认值)
--retry-backoff-ms Integer⽣产者重试前,刷新元数据的等待时间阈值100(默认值)
--socket-buffer-size Integer TCP接收缓冲⼤⼩102400(默认值)
--timeout Integer消息排队异步等待处理的时间阈值1000(默认值)
--sync同步发送消息
--version显⽰ Kafka 版
不配合其他参数时,显⽰为本地Kafka版本
--help打印帮助信息
5. Topic的消费kafka-console-consumer.sh
1. 新客户端从头消费--from-beginning (注意这⾥是新客户端,如果之前已经消费过了是不会从头消费的) 下⾯没有指定客户端名称,所以每次执⾏都是新客户端都会从头消费
sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --topic test --from-beginning
2. 正则表达式匹配topic进⾏消费--whitelist消费所有的topic
sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --whitelist '.*'
消费所有的topic,并且还从头消费
sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --whitelist '.*' --from-beginning
3.显⽰key进⾏消费--property print.key=true
sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --topic test --property print.key=true
4. 指定分区消费--partition指定起始偏移量消费--offt
sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --topic test --partition 0 --offt
100
5. 给客户端命名--group
注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了
sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --topic test --group test-group
6. 添加客户端属性--consumer-property
这个参数也可以给客户端添加属性,但是注意不能多个地⽅配置同⼀个属性,他们是互斥的;⽐如在下⾯的基础上还加上属性--group test-group那肯定不⾏sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --topic test --consumer-property group.id=test-consumer-group
7. 添加客户端属性--fig
跟--consumer-property⼀样的性质,都是添加客户端的属性,不过这⾥是指定⼀个⽂件,把属性写在⽂件⾥⾯, --consumer-property的优先级⼤于--fig sh bin/kafka-console-consumer.sh --bootstrap-rver localhost:9092 --topic test --fig config/consumer.properties
参数描述例⼦
--group指定消费者所属组的ID
--topic被消费的topic
--partition指定分区;除⾮指定–offt,否则从分区结束(latest)开始消费--partition 0
--offt执⾏消费的起始offt位置 ;默认值: latest; /latest /earliest /偏移量--offt 10
--whitelist 正则表达式匹配topic;--topic就不⽤指定了; 匹配到的所有topic都会消费; 当然⽤了这个参数,--partition--offt等就不能使⽤了
--consumer-
property
将⽤户定义的属性以key=value的形式传递给使⽤者--consumer-property group.id=test-consumer-group --
消费者配置属性⽂件请注意,[consumer-property]优先于此配置--fig config/consumer.p
roperties
--property初始化消息格式化程序的属性print.timestamp=true,fal 、print.key=true,fal 、print.value=true,fal 、key.parator= <key.parator> 、line.parator=<line.parator>、key.derializer=<key.derializer>、value.derializer=<value.derializer>
--from-beginning 从存在的最早消息开始,⽽不是从最新消息开始,注意如果配置了客户端名称并且之前消费过,那就不会从头消费了
--max-
messages
消费的最⼤数据量,若不指定,则持续消费下去--max-messages 100 --skip-message-
on-error
如果处理消息时出错,请跳过它⽽不是暂停
--isolation-level 设置为read_committed以过滤掉未提交的事务性消息,设置为read_uncommitted以读取所有消息,默认值:read_uncommitted
--ls.DefaultMessageFormatter、ls.LoggingMessageFormatter、ls.NoOpMessageFormatter、ls.ChecksumMessageFormatter
6.kafka-leader-election Leader重新选举
6.1 指定Topic指定分区⽤重新PREFERRED:优先副本策略进⾏Leader重选举
> sh bin/kafka-leader-election.sh --bootstrap-rver xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0
6.2 所有Topic所有分区⽤重新PREFERRED:优先副本策略进⾏Leader重选举
sh bin/kafka-leader-election.sh --bootstrap-rver xxxx:9090 --election-type preferred  --all-topic-partitions
6.3 设置配置⽂件批量指定topic和分区进⾏Leader重选举
先配置leader-election.json⽂件
{
"partitions": [
{
"topic": "test_create_topic4",
"partition": 1
},
{
"topic": "test_create_topic4",
"partition": 2
}
]
}
sh bin/kafka-leader-election.sh --bootstrap-rver xxx:9090 --election-type preferred  --path-to-json-file config/leader-election.json
相关可选参数
参数描述例⼦
--bootstrap-rver指定kafka服务指定连接到的kafka服务--bootstrap-rver localhost:9092 --topic指定Topic,此参数跟--all-topic-partitions和path-to-json-file三者互斥
--partition指定分区,跟--topic搭配使⽤
--election-type两个选举策略(PREFERRED:优先副本选举,如果第⼀个副本不在线的话会失败;UNCLEAN: 策略)
--all-topic-partitions所有topic所有分区执⾏Leader重选举; 此参数跟--topic和path-to-json-file三者互斥
--path-to-json-file配置⽂件批量选举,此参数跟--topic和all-topic-partitions三者互斥
7. 持续批量推送消息kafka-verifiable-producer.sh
单次发送100条消息--max-messages 100
⼀共要推送多少条,默认为-1,-1表⽰⼀直推送到进程关闭位置
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-rver localhost:9092 --max-messages 100
每秒发送最⼤吞吐量不超过消息--throughput 100
推送消息时的吞吐量,单位messages/c。默认为-1,表⽰没有限制
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-rver localhost:9092 --throughput 100
发送的消息体带前缀--value-prefix
sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-rver localhost:9092 --value-prefix 666
注意--value-prefix 666必须是整数,发送的消息体的格式是加上⼀个点号.例如:666.
其他参数:--fig CONFIG_FILE指定producer的配置⽂件--acks ACKS每次推送消息的ack值,默认是-1
8. 持续批量拉取消息kafka-verifiable-consumer
持续消费
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-rver localhost:9092 --topic test_create_topic4
单次最⼤消费10条消息--max-messages 10
sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-rver localhost:9092 --topic test_create_topic4 --max-messages 10
相关可选参数
参数描述例⼦
--bootstrap-rver指定kafka服务指定连接到的kafka服务;--bootstrap-rver localhost:9092
-
-topic指定消费的topic
--group-id消费者id;不指定的话每次都是新的组id
group-instance-id消费组实例ID,唯⼀值
--max-messages单次最⼤消费的消息数量
--enable-autocommit是否开启offt⾃动提交;默认为fal
--ret-policy当以前没有消费记录时,选择要拉取offt的策略,可以是earliest, latest,none。默认是earliest
--assignment-strategy consumer分配分区策略,默认是org.apache.sumer.RangeAssignor
--fig指定consumer的配置⽂件
9.⽣产者压⼒测试kafka-producer-perf-test.sh
1. 发送1024条消息--num-records 100并且每条消息⼤⼩为1KB--record-size 1024最⼤吞吐量每秒100
00条--throughput 100
sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.rvers=localhost:9092 --record-size 1024
你可以通过LogIKM查看分区是否增加了对应的数据⼤⼩从LogIKM 可以看到发送了1024条消息; 并且总数据量=1M; 1024条*1024byte = 1M;
2. ⽤指定消息⽂件--payload-file发送100条消息最⼤吞吐量每秒100条--throughput 100
1. 先配置好消息⽂件
2. 然后执⾏命令发送的消息会从⾥⾯随机选择; 注意这⾥我们没有⽤参数--payload-delimeter指定分隔符,默认分隔符是\n换⾏;
bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.rvers=localhost:9090 --payload-file
3. 验证消息,可以通过 LogIKM 查看发送的消息
相关可选参数
参数描述例⼦
--topic指定消费的topic
--num-records发送多少条消息
--throughput每秒消息最⼤吞吐量
--producer-props ⽣产者配置, k1=v1,k2=v2
--producer-props bootstrap.rvers=
localhost:9092,client.id=test_client
--fig ⽣产者配置⽂件
--
-
-print-metrics在test结束的时候打印监控信息,默认fal--print-metrics true
--transactional-id 指定事务 ID,测试并发事务的性能时需要,只有在 --transaction-duration-ms > 0 时⽣效,默认值为 performance-producer-default-transactional-id
--transaction-duration-ms 指定事务持续的最长时间,超过这段时间后就会调⽤ commitTransaction 来提交事务,只有指定了 > 0 的值才会开启事务,默认值为 0
--record-size⼀条消息的⼤⼩byte; 和 --payload-file 两个中必须指定⼀个,但不能同时指定
--payload-file 指定消息的来源⽂件,只⽀持 UTF-8 编码的⽂本⽂件,⽂件的消息分隔符通过--payload-delimeter指定,默认是⽤换⾏\nl来分割的,和 --record-size 两个中必须指定⼀个,但不能同时指定 ; 如果提供的消息
--payload-delimeter 如果通过--payload-file指定了从⽂件中获取消息内容,那么这个参数的意义是指定⽂件的消息分隔符,默认值为 \n,即⽂件的每⼀⾏视为⼀条消息;如果未指定--payload-file则此参数不⽣效;发送消息的时候是随机送⽂件⾥⾯选择消息发送的;
10.消费者压⼒测试kafka-consumer-perf-test.sh
消费100条消息--messages 100
sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-rver localhost:9090 --messages 100相关可选参数
参数描述例⼦
--bootstrap-rver
--fig消费者配置⽂件
--date-format结果打印出来的时间格式化默认:yyyy-MM-dd HH:mm:ss:SSS
--fetch-size单次请求获取数据的⼤⼩默认1048576
--topic指定消费的topic
--from-latest
--group消费组ID
--hide-header如果设置了,则不打印header信息
--messages需要消费的数量
--num-fetch-threads feth 数据的线程数默认:1
--print-metrics结束的时候打印监控数据
--show-detailed-stats
--threads消费线程数;默认 10
11.删除指定分区的消息kafka-delete-records.sh
删除指定topic的某个分区的消息删除⾄offt为1024
先配置json⽂件offt-json-file.json
{"partitions":
[{"topic": "test1", "partition": 0,
"offt": 1024}],
"version":1
}
在执⾏命令
sh bin/kafka-delete-records.sh --bootstrap-rver 172.23.250.249:9090 --offt-json-file config/offt-json-file.json 验证通过 LogIKM 查看发送的消息

本文发布于:2023-05-12 14:17:17,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/105796.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   配置   指定   分区   消费   副本
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图