RocketMQ⾃定义Binding
概述
在实际⽣产中,我们需要发布和订阅的消息可能不⽌⼀种Topic,故此时就需要使⽤⾃定义Binding来帮我们实现多Topic的发布和订阅功能
⽣产者
⾃定义Output接⼝,代码如下:
publicinterfaceMySource{
@Output("output1")
MessageChanneloutput1();
@Output("output2")
MessageChanneloutput2();
}
发布消息的案例代码如下:
@Autowired
privateMySourcesource;
publicvoidnd(Stringmsg)throwsException{
1().nd(yload(msg).build());
}
消费者
⾃定义Input接⼝,代码如下:
publicinterfaceMySink{
@Input("input1")
SubscribableChannelinput1();
@Input("input2")
SubscribableChannelinput2();
@Input("input3")
SubscribableChannelinput3();
@Input("input4")
SubscribableChannelinput4();
}
接收消息的案例代码如下:
@StreamListener("input1")
publicvoidreceiveInput1(StringreceiveMsg){
n("input1receive:"+receiveMsg);
}
Application
配置Input和Output的Binding信息并配合@EnableBinding注解使其⽣效,代码如下:
@SpringBootApplication
@EnableBinding({,})
publicclassRocketMQApplication{
publicstaticvoidmain(String[]args){
(,args);
}
}
⽣产者
spring:
application:
name:rocketmq-provider
cloud:
stream:
rocketmq:
binder:
namesrv-addr:192.168.10.149:9876
bindings:
output1:{destination:test-topic1,content-type:application/json}
output2:{destination:test-topic2,content-type:application/json}
消费者
spring:
application:
name:rocketmq-consumer
cloud:
stream:
rocketmq:
binder:
namesrv-addr:192.168.10.149:9876
bindings:
input:{y:true}
bindings:
input1:{destination:test-topic1,content-type:text/plain,group:test-group,empts:1}
input2:{destination:test-topic1,content-type:text/plain,group:test-group,empts:1}
input3:{destination:test-topic2,content-type:text/plain,group:test-group,empts:1}
input4:{destination:test-topic2,content-type:text/plain,group:test-group,empts:1
本文发布于:2022-12-31 22:00:55,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/90/68087.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |