GOkafkasarama生产者消费者简单实现

更新时间:2023-05-12 14:38:25 阅读: 评论:0

GOkafkasarama⽣产者消费者简单实现
前提:开启 zookeeper 、 kafka
⽣产者代码:
步骤:1. ⽣成配置⽂件(⽣产者基础配置⽂件、指定⽣产者回复消息等级 0 1 all、指定⽣产者消息发送成功或者失败后的返回通道是什么、           指定发送到哪⼀个分区(本⽂为随机分区正常有三种: 通过partiton、通过key 去 Hash出⼀个分区、轮询))
   2. 构建消息(msg := &sarama.Message{} 这⾥为指针 1.消息可更改  2. 下⾯的发送消息SendMessage() 需要指针类型的参数)
    3. 连接kafka
    4. 发送消息
package main
import (
"fmt"
"/Shopify/sarama"
"log"
)
func main()  {
// 构建⽣产者
// ⽣成⽣产者配置⽂件
config := sarama.NewConfig()
// 设置⽣产者消息回复等级 0 1 all
config.Producer.RequiredAcks = sarama.WaitForAll
// 设置⽣产者成功发送消息将在什么通道返回
config.Producer.Return.Success = true
// 设置⽣产者发送的分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 构建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "aaa"
msg.Value = sarama.StringEncoder("123")
// 连接 kafka
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Print(err)
return
}
defer producer.Clo()
// 发送消息
message, offt, err := producer.SendMessage(msg)
if err != nil {
log.Println(err)
return
}
fmt.Println(message, " ", offt)
}
消费者代码:
步骤: 1. ⽣成消费者对象连接对应的地址 config可以为nil
    2. 拿到所有对应主题下的所有分区
    3. 遍历每⼀个分区调⽤消费者对象传⼊对应的主题哪⼀个具体的分区从什么位置开始读取⽂件 Return:消息对象
    4. 通过消息对象.Message() 可以取到对应的消息
package main
import (
"fmt"
"/Shopify/sarama"
"log"
"sync"
)
// 消费者练习
func main()  {
// ⽣成消费者实例
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Print(err)
return
}
// 拿到对应主题下所有分区
partitionList, err := consumer.Partitions("aaa")
if err != nil {
log.Println(err)
return
}
var wg sync.WaitGroup
wg.Add(1)
// 遍历所有分区
for partition := range partitionList{
//消费者消费对应主题的具体分区指定主题分区 offt  return 对应分区的对象
pc, err := consumer.ConsumePartition("aaa", int32(partition), sarama.OfftNewest)
if err != nil {
log.Println(err)
return
}
// 运⾏完毕记得关闭
defer pc.AsyncClo()
// 去出对应的消息
// 通过异步拿到消息
go func(sarama.PartitionConsumer) {
defer wg.Done()
for msg := range pc.Messages(){
fmt.Printf("Partition:%d Offt:%d Key:%v Value:%v", msg.Partition, msg.Offt, msg.Key, msg.Value)  }
}(pc)
}
wg.Wait()
}

本文发布于:2023-05-12 14:38:25,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/105818.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   分区   产者   对应   消费者   主题   发送   指定
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图