python脚本读写kafka数据(转载)
1、⽣产者:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_rvers=['172.21.10.136:9092'])#此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
晕车后怎么办msg ="msg%d"% i
producer.nd('test', msg)
producer.clo()
情感广告
2、消费者(简单demo):
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
bootstrap_rvers=['172.21.10.136:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s"%(pic, message.partition,
message.offt, message.key,
message.value))
启动后⽣产者、消费者可以正常消费。
3、消费者(消费群组)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
group_id='my-group',
bootstrap_rvers=['172.21.10.136:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s"%(pic, message.partition,
message.offt, message.key,
message.value))
启动多个消费者,只有其中可以可以消费到,满⾜要求,消费组可以横向扩展提⾼处理能⼒
4、消费者(读取⽬前最早可读的消息)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',
auto_offt_ret='earliest',
bootstrap_rvers=['172.21.10.136:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s"%(pic, message.partition,
message.offt, message.key,
message.value))
auto_offt_ret:重置偏移量,earliest移到最早的可⽤消息,latest最新的消息,默认为latest
源码定义:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}
5、消费者(⼿动设置偏移量)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',
bootstrap_rvers=['172.21.10.136:9092'])
print consumer.partitions_for_topic("test")#获取test主题的分区信息
pics()#获取主题列表
print consumer.subscription()#获取当前消费者订阅的主题
print consumer.assignment()#获取当前消费者topic、分区信息
print consumer.beginning_offts(consumer.assignment())#获取当前消费者可消费的偏移量consumer.ek(TopicPartition(topic=u'test',partition=0),5)#重置偏移量,从第5个偏移量消费for message in consumer:
print("%s:%d:%d: key=%s value=%s"%(pic, message.partition,
message.offt, message.key,
message.value))
乐天都有哪些品牌
6、消费者(订阅多个主题)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
哪天是鬼节
consumer = KafkaConsumer(bootstrap_rvers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))#订阅要消费的主题
简单黑板报pics()
print consumer.position(TopicPartition(topic=u'test',partition=0))#获取当前主题的最新偏移量for message in consumer:
print("%s:%d:%d: key=%s value=%s"%(pic, message.partition,
message.offt, message.key,
message.value))
7、消费者(⼿动拉取消息)谢尔曼
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_rvers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
男宝胶囊while True:
msg = consumer.poll(timeout_ms=5)#从kafka获取消息
print msg
time.sleep(1)
8、消费者(消息挂起与恢复)
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_rvers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.pau(TopicPartition(topic=u'test',partition=0))
num =0
while True:
print num
print consumer.paud()#获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num +1
if num ==10:
保密协议模板
"
"
pau执⾏后,consumer不能读取,直到调⽤resume后恢复。