pom.xml添加rocketmq-spring-boot-starter
依赖。
<dependency> <groupid>org.apache.rocketmq</groupid> <artifactid>rocketmq-spring-boot-starter</artifactid> <version>2.1.0</version></dependency>
application.yml
rocketmq: name-rver: 192.168.64.141:9876
application-demo1.yml
使用 demo1 profile 指定生产者组组名
rocketmq: producer: group: producer-demo1
application-demo2.yml
使用 demo2 profile 指定生产者组组名
rocketmq: produce托福口语复议r: group: producer-demo2
生产者
package cn.tedu.demo2.m1;import org.apache.rocketmq.client.producer.ndcallback;import org.apache.rocketmq.client.producer.ndresult;import org.apache.rocketmq.spring.core.rocketmqtemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.messaging.message;import org.springframework.messaging.support.messagebuilder;import org.springframework.stereotype.component;@componentpublic class producer { @autowired private rocketmqtemplate t ; public void nd(){ //发送同步消息 t.convertandnd("topic1:taga", "hello world! "); //发送spring的message message<string> message = messagebuilder.withpayload("hello spring message! ").build(); t.nd("topic1:taga",message); //发送异步消息 t.asyncnd("topic1:taga", "hello world asyn", new ndcallback() { @override public void onsuccess(ndresult ndresult) { system.out.println("发送成功"); } @override public void onexception(throwable throwable) { system.out.println("发送失败"); } }); //发送顺序消息 t.syncndorderly("topic1", "98456237,创建", "98456237"); t.syncndorderly("topic1", "98456237,支付", "98456237"); t.syncndorderly("topic1", "98456237,完成", "98456237"); }}
消费者
package cn.tedu.demo2.m1;import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;import org.apache.rocketmq.spring.core.rocketmqlistener;import org.springframework.stereotype.component;@component@rocketmqmessagelistener(topic = "topic1",consumergroup = "consumer-demo1")public class consumer implements rocketmqlistener<string> { @override public void onmessage(string s) { system中学语文论文.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m1;import org.springframework.boot.springapplication;import org.springframework.boot.autoconfigure.springbootapplication;@springbootapplicationpublic class main { public static void main(string[] args) { springapplication.run(main.class, args); }}
测试类
需要放在 test 文件夹
激活 demo1 profile @activeprofiles("demo1")
package cn.tedu.demo2.m1;import org.junit.jupiter.api.test;import org.springframework.beans.factory.annotatio中国气候类型分布图n.autowired;import org.springframework.boot.test.context.springboottest;import org.springframework.test.context.activeprofiles;@springboottest@activeprofiles("demo1")public class test1 { @autowired private producer producer; @test public void test1(){ producer.nd(); try { thread.sleep(5000); } catch (interruptedexception e) { e.printstacktrace(); } }}
发送事务消息
生产者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.rocketmqtransactionlistener;import org.apache.rocketmq.spring.core.rocketmqlocaltransactionlistener;import org.apache.rocketmq.spring.core.rocketmqlocaltransactionstate;import org.apache.rocketmq.spring.core.rocketmqtemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.messaging.message;import org.springframework.messaging.support.messagebuilder;import org.springframework.stereotype.component;@componentpublic class producer { @autowired private rocketmqtemplate t; public void nd(){ message<string> message = messagebuilder.withpayload("hello world").build(); //一旦发送消息,则执行监听器 t.ndmessageintransaction("topic2",message,null); } 歌唱祖国的歌曲 @rocketmqtransactionlistener class lis implements rocketmqlocaltransactionlistener { @override public rocketmqlocaltransactionstate executelocaltransaction(message message, object o) { system.out.println("执行本地事务"); return rocketmqlocaltransactionstate.unknown; } @override public rocketmqlocaltransactionstate checklocaltransaction(message message) { system.out.println("执行事务回查"); return rocketmqlocaltransactionstate.commit; } }}
消费者
package cn.tedu.demo2.m2;import org.apache.rocketmq.spring.annotation.rocketmqmessagelistener;import org.apache.rocketmq.spring.core.rocketmqlistener;import org.springframework.stereotype.component;@component@rocketmqmessagelistener(topic = "topic2",consumergroup = "consumer-demo2")public class consumer implements rocketmqlistener<string> { @override public void onmessage(string s) { system.out.println("收到"+s); }}
主类
package cn.tedu.demo2.m2;import org.springframework.boot.springapplication;import org.springframework.boot.autoconfigure.springbootapplication;@springbootapplicationpublic class main { public static void main(string[] args) { springapplication.run(main.class, args); }}
测试类
package cn.tedu.demo2.m2;import org.junit.jupiter.api.test;import org.springframework.beans.factory.annotation.autowired;import org.springframework.boot.test.context.springboottest;import org.springframework.test.context.activeprofiles;@springboottest@activeprofiles("demo2")public class test2 { @autowired private producer producer; @test public void test1(){ producer.nd(); //为了能够收到消费者消费的数据,这里通过休眠模拟等待时间 try { thread.sleep(30000); } catch (interruptedexception e) { e.printstacktrace(); } }}
到此这篇关于springboot 整合 rocketmq 收发消息的文章就介绍到这了,更多相关springboot 整合 rocketmq 收发消息内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!
本文发布于:2023-04-04 08:27:31,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/547dbc978895dbf86076f12976cadf27.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:Springboot 整合 RocketMQ 收发消息.doc
本文 PDF 下载地址:Springboot 整合 RocketMQ 收发消息.pdf
留言与评论(共有 0 条评论) |