消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。
目前在生产环境中使用较多的消息队列有 activemq、rabbitmq、zeromq、kafka、metamq、rocketmq 等。
特性
异步性:将耗时的同步操作通过以发送消息的方式进行了异步化处理,减少了同步等待的时间。松耦合:消息队列减少了服务之间的耦合性,不同的服务可以通过消息队列进行通信,而不用关心彼此的实现细节,只要定义好消息的格式就行。分布式:通过对家风建设心得体会消费者的横向扩展,降低了消息队列阻塞的风险,以及单个消费者产生单点故障的可能性(当然消息队列本身也可以做成分布式集群)。可靠性:消息队列一般会把接收到的消息存储到本地硬盘上(当消息被处理完之后,存储信息根据不同的消息队列实现,有可能将其删除),这样即使应用挂掉或者消息队列本身挂掉,消息也能够重新加载。jms 规范
jms 即 java 消息服务(java message rvice)应用程序接口,是一个 java 平台中关于面向消息中间件(mom)的 api,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。java 消息服务是一个与具体平台无关的 api,绝大多数 mom 提供商都对 jms 提供支持。
jms 的消息机制有 2 种模型,一种是 point to point,表现为队列的形式,发送的消息,只能被一个接收者取走;另一种是 topic,可以被多个订阅者订阅,类似于群发。
activemq 是 jms 的一个实现。
activemq 是 apache 软件基金下的一个开源软件,它遵循 jms1.1 规范(java message rvice),是消息驱动中间件软件(mom)。它为企业消息传递提供高可用、出色性能、可扩展、稳定和安全保障。activemq 使用 apache 许可协议,因此,任何人都可以使用和修改它而不必反馈任何改变。
activemq 的目标是在尽可能多的平台和语言上提供一个标准的,消息驱动的应用集成。activemq 实现 jms 规范并在此之上提供大量额外的特性。activemq 支持队列和订阅两种模式的消息发送。
spring boot 提供了 activemq 组件 spring-boot-starter-activemq,用来支持 activemq 在 spring boot 体系内使用,下面我们来详细了解如何使用。
添加依赖
主要添加组件:spring-boot-starter-activemq。
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-activemq</artifactid></dependency>
配置文件
在 application.properties 中添加配置。
# 基于内存的 activemqspring.activemq.in-memory=true# 不适应连接池spring.activemq.pool.enabled=fal # 独立安装的 activemq#spring.activemq.broker-url=tcp://192.168.0.1:61616#spring.activemq.ur=admin#spring.activemq.password=admin
在使用 activemq 时有两种使用方式,一种是使用独立安装的 activemq,在生产环境推荐使用这种;另一种是使用基于内存 activemq ,在调试阶段建议使用这种方式。
队列发送的消息,只能被一个消费者接收。
创建队列
@configurationpublic class mqconfig { @bean public queue queue() { return new activemqqueue("neo.queue"); }}
使用 @configuration 注解在项目启动时,定义了一个队列 queue 命名为:neo.queue。
消息生产者
创建一个消息的生产者:
@componentpublic class producer{ @autowired private jmsmessagingtemplate jmsmessagingtemplate; @autowired private queue queue; public void ndqueue(string msg) { system.out.println("nd queue msg :"+msg); this.jmsmessagingtemplate.convertandnd(this.queue, msg); }}
jmsmessagingtemplate 是 spring 提供发送消息的工具类,使用 jmsmessagingtemplate 和创建好的 queue 对消息进行发送。
消息消费者
@componentpublic class consumer { @jmslistener(destination = "neo.queue") public void receivequeue(string text) { system.out.println("consumer queue msg : "+text); }}
使用注解 @jmslistener(destination = “neo.queue”),表示此方法监控西塘古镇了名为 neo.queue 的队列。当队列 neo.queue 中有消息发送时会触发此方法的执行,text 为消息内容。
测试
创建 sampleactivemqtests 测试类,注入创建好的消息生产者。
@runwith(springrunner.class)@springboottestpublic class sample中国物理研究院activemqtests { @autowired private producer producer; @rule public outputcapture outputcapture = new outputcapture();}
outputcapture 是 spring boot 提供的一个测试类,它能捕获 system.out 和 system.err 的输出,我们可以利用这个特性来判断程序中的输出是否执行。
@testpublic void ndsimplequeuemessage() throws interruptedexception { this.producer.ndqueue("test queue message"); thread.sleep(1000l); asrtthat(this.outputcapture.tostring().contains("test queue")).istrue();}
创建测试方式,使用 producer 发送消息,为了保证容器可以接收到消息,让测试方法等待 1 秒,最后使用 outputcapture 判断是否执行成功。
测试多消费者
上面的案例只是一个生产者一个消费者,我们在模拟一个生产者和多个消费者队列的执行情况。我们复制上面的消费者 consumer 重新命名为 consumer2,并且将输出内容加上 2 的关键字,如下:
@componentpublic class consumer2 { @jmslistener(destination = "neo.queue") public void receivequeue(string text) { system.out.printl努力加油n("consumer2 queue msg : "+text); }}
在刚才的测试类中添加一个 nd100queuemessage() 方法,模式发送 100 条消息时,两个消费者是如何单位承诺书消费消息的。
@testpublic void nd100queuemessage() throws interruptedexception { for (int i=0;i<100;i++){ this.producer.ndqueue("test queue message"+i); } thread.sleep(1000l);}
控制台输出结果:
consumer queue msg : test queue message0
consumer2 queue msg : test queue message1
consumer queue msg : test queue message2
consumer2 queue msg : test queue message3
…
根据控制台输出的消息可以看出,当有多个消费者监听一个队列时,消费者会自动均衡负载的接收消息,并且每个消息只能有一个消费者所接收。
注意:控制台输出 javax.jms.jmxception: peer (vm://localhost#1) stopped. 报错信息可以忽略,这是 info 级别的错误,是 activemq 的一个 bug。
广播发送的消息,可以被多个消费者接收。
创建 topic
@configurationpublic class mqconfig { @bean public topic topic() { return new activemqtopic("neo.topic"); }}
使用 @configuration 注解在项目启动时,定义了一个广播 topic 命名为:neo.topic。
消息生产者
创建一个消息的生产者:
@componentpublic class producer{ @autowired private jmsmessagingtemplate jmsmessagingtemplate; @autowired private topic topic; public void ndtopic(string msg) { system.out.println("nd topic msg :"+msg); this.jmsmessagingtemplate.convertandnd(this.topic, msg); }}
和上面的生产者对比只是 convertandnd() 方法传入的第一个参数变成了 topic。
消息消费者
@componentpublic class consumer { @jmslistener(destination = "neo.topic") public void receivetopic(string text) { system.out.println("consumer topic msg : "+text); }}
消费者也没有变化,只是监听的名改为上面的 neo.topic,因为模拟多个消费者,复制一份 consumer 命名为 consumer2,代码相同在输出中标明来自 consumer2。
测试
创建 sampleactivemqtests 测试类,注入创建好的消息生产者。
@testpublic void ndsimpletopicmessage() throws interruptedexception { this.producer.ndtopic("test topic message"); thread.sleep(1000l);}
测试方法执行成功后,会看到控制台输出信息,如下:
nd topic msg :test topic message
consumer topic msg : test topic message
consumer2 topic msg : test topic message
可以看出两个消费者都收到了发送的消息,从而验证广播(topic)是一个发送者多个消费者的模式。
spring boot 集成 activemq 的项目默认只支持队列或者广播中的一种,通过配置项 spring.jms.pub-sub-domain 的值来控制,true 为广播模式,fal 为队列模式,默认情况下支持队列模式。
如果需要在同一项目中既支持队列模式也支持广播模式,可以通过 defaultjmslistenercontainerfactory 创建自定义的 jmslistenercontainerfactory 实例,之后在 @jmslistener 注解中通过 containerfactory 属性引用它。
分别创建两个自定义的 jmslistenercontainerfactory 实例,通过 pubsubdomain 来控制是支持队列模式还是广播模式。
@configuration@enablejmspublic class activemqconfig { @bean("queuelistenerfactory") public jmslistenercontainerfactory<?> queuelistenerfactory(connectionfactory connectionfactory) { defaultjmslistenercontainerfactory factory = new defaultjmslistenercontainerfactory(); factory.tconnectionfactory(connectionfactory); factory.tpubsubdomain(fal); return factory; } @bean("topiclistenerfactory") public jmslistenercontainerfactory<?> topiclistenerfactory(connectionfactory connectionfactory) { defaultjmslistenercontainerfactory factory = new defaultjmslistenercontainerfactory(); factory.tconnectionfactory(connectionfactory); factory.tpubsubdomain(true); return factory; }}
然后在消费者接收的方法中,指明使用 containerfactory 接收消息。
@componentpublic class consumer { @jmslistener(destination = "neo.queue", containerfactory = "queuelistenerfactory") public void receivequeue(string text) { system.out.println("consumer queue msg : "+text); } @jmslistener(destination = "neo.topic", containerfactory = "topiclistenerfactory") public void receivetopic(string text) { system.out.println("consumer topic msg : "+text); }}
改造完成之后,再次执行队列和广播的测试方法,就会发现项目同时支持了两种类型的消息收发。
消息中间件广泛应用在大型互联网架构中,利用消息中间件队列和广播各自的特性可以支持很多业务,比如群发发送短信、给单个用户发送邮件等。activemq 是一款非常流行的消息中间件,它的特点是部署简单、使用方便,比较适合中小型团队。spring boot 提供了集成 activemq 对应的组件,在 spring boot 中使用 activemq 只需要添加相关注解即可。
到此这篇关于springboot学习笔记之操作activemq指南的文章就介绍到这了,更多相关springboot操作activemq指南内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!
本文发布于:2023-04-03 22:52:19,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/d05a6ef352ac2c36ddabb4965b24421c.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:spring boot学习笔记之操作ActiveMQ指南.doc
本文 PDF 下载地址:spring boot学习笔记之操作ActiveMQ指南.pdf
留言与评论(共有 0 条评论) |