Integrating RocketMQ with Spring Boot: Sending and Receiving Messages

Updated: 2023-04-04 08:27:32

Create a Spring Boot project

Add the rocketmq-spring-boot-starter dependency to pom.xml.

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

YAML configuration

application.yml

rocketmq:
  name-server: 192.168.64.141:9876

application-demo1.yml

The demo1 profile sets the producer group name.

rocketmq:
  producer:
    group: producer-demo1

application-demo2.yml

The demo2 profile sets the producer group name.

rocketmq:
  producer:
    group: producer-demo2

Testing

demo 1

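This demo covers four cases: sending a plain message, sending Spring's generic Message object, sending an asynchronous message, and sending ordered messages.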

Producer

package cn.tedu.demo2.m1;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private RocketMQTemplate t;

    public void send() {
        // Send a synchronous message; the destination has the form "topic:tag"
        t.convertAndSend("topic1:taga", "hello world! ");

        // Send a Spring Message object
        Message<String> message = MessageBuilder.withPayload("hello spring message! ").build();
        t.send("topic1:taga", message);

        // Send an asynchronous message; the callback reports the outcome
        t.asyncSend("topic1:taga", "hello world asyn", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send succeeded");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Send failed");
            }
        });

        // Send ordered messages; the third argument is the hash key (here, the order ID)
        t.syncSendOrderly("topic1", "98456237,created", "98456237");
        t.syncSendOrderly("topic1", "98456237,paid", "98456237");
        t.syncSendOrderly("topic1", "98456237,completed", "98456237");
    }
}
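The ordered-message calls above pass the order ID 98456237 as the hash key, so all three steps of that order are routed to the same queue and keep their relative order. The following is a minimal, library-free sketch of that idea. The class name OrderlyQueueSketch, the method selectQueue, and the queue count of 4 are hypothetical and chosen for illustration; the actual selection logic lives in the RocketMQ client and may differ in detail.

```java
import java.util.List;

// Simplified model of hash-key based queue selection; not the RocketMQ implementation.
class OrderlyQueueSketch {
    // Map a hash key to a queue index in [0, queueCount).
    // Equal keys always produce the same index.
    static int selectQueue(String hashKey, int queueCount) {
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // assumed number of queues in the topic
        for (String step : List.of("created", "paid", "completed")) {
            // Every step of order 98456237 lands on the same queue index.
            System.out.println(step + " -> queue " + selectQueue("98456237", queueCount));
        }
    }
}
```

Messages with different hash keys may land on different queues, so ordering is only preserved among messages that share a key.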

Consumer

package cn.tedu.demo2.m1;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "topic1", consumerGroup = "consumer-demo1")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("Received " + s);
    }
}
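The producer addresses messages as "topic:tag", while this consumer subscribes to the topic only. The listener annotation's selectorExpression defaults to "*", which accepts every tag, so the consumer receives the tagged and the untagged messages alike. The sketch below models that relationship without any RocketMQ dependency. The class TagFilterSketch and its methods are hypothetical names for illustration, and the matching is a simplification of tag expressions such as "taga || tagb".

```java
import java.util.Arrays;

// Simplified, library-free model of destinations and tag filtering; not the RocketMQ implementation.
class TagFilterSketch {
    // Split "topic:tag" into {topic, tag}; the tag is empty when no colon is present.
    static String[] parseDestination(String destination) {
        String[] parts = destination.split(":", 2);
        return new String[] { parts[0], parts.length > 1 ? parts[1] : "" };
    }

    // Return true if the tag is accepted by an expression such as "*" or "taga || tagb".
    static boolean matches(String selectorExpression, String tag) {
        if (selectorExpression == null || selectorExpression.trim().equals("*")) {
            return true;
        }
        return Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag::equals);
    }

    public static void main(String[] args) {
        String[] d = parseDestination("topic1:taga");
        System.out.println("topic=" + d[0] + ", tag=" + d[1]);
        System.out.println("accepted by '*': " + matches("*", d[1]));
        System.out.println("accepted by 'tagb || tagc': " + matches("tagb || tagc", d[1]));
    }
}
```

To receive only some tags, the listener annotation could set selectorExpression to an expression like the second one in the sketch.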

Main class

package cn.tedu.demo2.m1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

Test class

The test class must be placed under the test folder.

Activate the demo1 profile with @ActiveProfiles("demo1").

package cn.tedu.demo2.m1;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("demo1")
public class Test1 {
    @Autowired
    private Producer producer;

    @Test
    public void test1() {
        producer.send();
        // Keep the test alive so the async callback and the consumer have time to run
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
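The fixed five-second sleep in the test keeps the JVM alive until the callbacks have had a chance to fire. One possible alternative, not used in the original article, is to wait on a CountDownLatch that the callbacks count down, with a timeout as a safety net. The sketch below uses only the JDK; the class LatchWaitSketch and the background thread standing in for a listener callback are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Library-free sketch: wait for callbacks with a latch instead of a fixed sleep.
class LatchWaitSketch {
    // Wait until `expected` callbacks have fired or the timeout elapses.
    // `fired` is how many callbacks the stand-in worker will actually trigger.
    static boolean awaitCallbacks(int expected, int fired, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        // Stand-in for listener callbacks: a background thread counts the latch down.
        Thread worker = new Thread(() -> {
            for (int i = 0; i < fired; i++) {
                latch.countDown();
            }
        });
        worker.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all callbacks arrived: " + awaitCallbacks(3, 3, 2000));
        System.out.println("one callback missing: " + awaitCallbacks(3, 2, 100));
    }
}
```

With this approach the test finishes as soon as the expected callbacks arrive and only waits for the full timeout when something is missing.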

demo 2

Send a transactional message

Producer

package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private RocketMQTemplate t;

    public void send() {
        Message<String> message = MessageBuilder.withPayload("hello world").build();
        // Sending the message triggers the transaction listener below
        t.sendMessageInTransaction("topic2", message, null);
    }

    @RocketMQTransactionListener
    class Lis implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            System.out.println("Executing local transaction");
            // UNKNOWN leaves the outcome undecided, so the broker checks back later
            return RocketMQLocalTransactionState.UNKNOWN;
        }

        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            System.out.println("Checking local transaction state");
            return RocketMQLocalTransactionState.COMMIT;
        }
    }
}
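In the producer above, the local transaction returns UNKNOWN and the check-back returns COMMIT, so the message only becomes visible to the consumer after the check-back has run. The sketch below is a simplified, library-free model of that decision. The class TransactionFlowSketch, its State enum, and the single check-back are assumptions for illustration; a real broker may check back more than once and applies its own timing.

```java
// Simplified model of how a transactional message's outcome is resolved; not the RocketMQ implementation.
class TransactionFlowSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // If the local transaction reports UNKNOWN, the result of the check-back decides;
    // otherwise the local transaction's own result stands.
    static State resolve(State executeResult, State checkResult) {
        return executeResult == State.UNKNOWN ? checkResult : executeResult;
    }

    // Only a committed message is delivered to consumers.
    static boolean delivered(State finalState) {
        return finalState == State.COMMIT;
    }

    public static void main(String[] args) {
        // Mirrors the demo: UNKNOWN from the local transaction, COMMIT from the check-back.
        State outcome = resolve(State.UNKNOWN, State.COMMIT);
        System.out.println("final state: " + outcome + ", delivered: " + delivered(outcome));
    }
}
```

Returning COMMIT or ROLLBACK directly from the local transaction would settle the outcome immediately, without waiting for a check-back.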

Consumer

package cn.tedu.demo2.m2;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "topic2", consumerGroup = "consumer-demo2")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("Received " + s);
    }
}

Main class

package cn.tedu.demo2.m2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

Test class

package cn.tedu.demo2.m2;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

@SpringBootTest
@ActiveProfiles("demo2")
public class Test2 {
    @Autowired
    private Producer producer;

    @Test
    public void test1() {
        producer.send();
        // Sleep so the test stays alive long enough for the consumer to receive the message
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Published: 2023-04-04 08:27:31
