RabbitMQ指南之三:发布订阅模式(PublishSubscribe)
在上⼀章中,我们创建了⼀个⼯作队列,⼯作队列模式的设想是每⼀条消息只会被转发给⼀个消费者。本章将会讲解完全不⼀样的场景:我们会把⼀个消息转发给多个消费者,这种模式称之为发布-订阅模式。
为了阐述这个模式,我们将会搭建⼀个简单的⽇志系统,它包含两种程序:⼀种发送⽇志消息,另⼀种接收并打印⽇志消息。在这个⽇志系统⾥,每⼀个运⾏的消费者都可以获取到消息,在这种情况下,我们可以实现这种需求:⼀个消费者接收消息并写⼊磁盘,另⼀个消费者接收消息并打印在电脑屏幕上。简单来说,⽣产者发布的消息将会以⼴播的形式转发到所有的消费者。
1、交换器(Exchange)
在前两章节我们,我们往队列中发布消息或获取消息,然⽽,前⾯的讲解其实并不完整,接下来,是时候介绍完整的RabbitMq消息模型了。
回忆⼀下我们前两章指南中包含的内容:
⼀个⽣产者⽤以发送消息;
⼀个队列缓存消息;
⼀个消费者⽤以消费队列中的消息。
RabbitMq消息模式的核⼼思想是:⼀个⽣产者并不会直接往⼀个队列中发送消息,事实上,⽣产者根本不知道它发送的消息将被转发到哪些队列。
实际上,⽣产者只能把消息发送给⼀个exchange,exchange只做⼀件简单的事情:⼀⽅⾯它们接收从⽣产者发送过来的消息,另⼀⽅⾯,它们把接收到的消息推送给队列。⼀个exchage必须清楚地知道如何处理⼀条消息。
propo是什么意思 有四种类型的交换器,分别是:direct、topic、headers、fanout。本章主要讲解最后⼀种:fanous(⼴播模式)。下⾯创建⼀个fanout 类型的交换器,我们称之为:logs:
hangeDeclare("logs", "fanout");
⼴播模式交换器很简单,从字⾯意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。在我们的⽇志系统中正好需要这种模式。
如果想查看当前系统中有多少个exchange,可以使⽤以下命令:
sudo rabbitmqctl list_exchanges
或者通过控制台查看:
可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。
在前⾯两章的指南中,我们并不知道交换器的存在,但是依然可以将消息发送到队列中,那其实并不是因为我们可以不使⽤交换器,实际上是我们使⽤了默认的交换器(我们通过指定交换器为字字符串:""),回顾⼀下我们之前是如何发送消息的:
1 channel.basicPublish("", "hello", null, Bytes());
第⼀个参数是交换器的名字,空字符串表⽰它是⼀个默认或⽆命名的交换器,消息将会由指定的路由键(第⼆个参数,routingKey,后⾯会讲)转发到队列。
你可能会有疑问:既然exchange可以指定为空字符串(""),那么可否指定为null?
答案是:不能!
通过跟踪发布消息的代码,在AMQImpl类中的Publish()⽅⾯中,可以看到,不光是exchange不能为null,同时routingKey路由键也不能为null,否则会抛出异常:
接着上⾯的讲解,我们创建⼀个命名的交换器:
version什么意思1 channel.basicPublish( "logs", "", null, Bytes());
2、临时队列
在前两章的例⼦中,我们使⽤的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同⼀名字的队列。因此,要想在⽣产者和消费者中间共享队列就必须要使⽤命名队列。
但是,本章讲解的⽇志系统也可以使⽤⾮命名队列(可以不⼿动命名),我们希望收到所有⽇志消息,⽽不是部分。并且我们希望总是接收到新的⽇志消息⽽不是旧的⽇志消息。为了解决这个问题,需要分两步⾛。
⾸先,⽆论何时我们的消费者连接到RabbitMq,我们都需要⼀个新的、空的队列来接收⽇志消息,因此,消费者在连接上RabbitMq之后需要创建⼀个任意名字的队列,或者让RabbitMq⽣成任意的队列名字。
其次,⼀旦该消费者断开了与RabbitMq的连接,队列也被⾃动删除。
通过JAVA客户端的⽆参⽅法:queueDeclare()来创建⼀个⾮持久化、专有的、⾃动删除的、名字随机⽣成的队列。
1 String queueName = channel.queueDeclare().getQueue();
3、绑定(Binding)
前⾯⼴播模式的交换器和队列已经创建好了,接下来就是要告诉交换器向队列⾥发送消息。交换器与队列之间的关系称之为绑定关系。
1 channel.queueBind(queueName, "logs", "");
locked out of heaven ⾄此,交换器已经可以往队列中发送消息了。
可以通过下列命令来查看队列的绑定关系:
4、完整的代码
EmitLog.java
1import com.rabbitmq.client.BuiltinExchangeType;
2import com.rabbitmq.client.Channel;
3import com.rabbitmq.client.Connection;
4import com.rabbitmq.client.ConnectionFactory;
5
6public class EmitLog {
7
8private static final String EXCHANGE_NAME = "logs";
9
10public static void main(String[] args) throws Exception {
11
12 ConnectionFactory factory = new ConnectionFactory();
13 factory.tHost("192.168.92.130");
14
15try (Connection connection = wConnection();
16 Channel channel = ateChannel();) {
17
18 hangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
19
20 String message = "RabbitMq fanout。。。。。。";
21 channel.basicPublish(EXCHANGE_NAME,"",Bytes("utf-8"));
22
23 System.out.println(" [x] Sent '" + message + "'");
24 }
25 }
26 }
正好你所看到的,Connection创建完成之后,定义了exchange,这⼀步是必要的,因为如果没有交换器将⽆法发送消息。
性别 英文
如此没有队列绑定到该交换器上,那么,交换器收到的消息将会丢失,但是对我们本章的⽇志系统来说没问题的,当没有消费者时,我们可以安全地放弃掉数据,我们只接收最新的⽇志消息。
ReceiveLogs.java
1public class ReceiveLogs {
2
3private static final String EXCHANGE_NAME = "logs";
4
5public static void main(String[] args) throws Exception {
6
7 ConnectionFactory factory = new ConnectionFactory();
8 factory.tHost("192.168.92.130");
9
10 Connection connection = wConnection();
11 Channel channel = ateChannel();
12
13 hangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
14
15final String queue = channel.queueDeclare().getQueue();
16 channel.queueBind(queue,EXCHANGE_NAME,"");
17
18 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
19不给力英文
20 DeliverCallback deliverCallback = (consumerTa,delivery) -> {
21
22 String message = new Body(), "UTF-8");
23 System.out.println(" [x] Received '" + message + "'");
24
25 };
26
27 channel.basicConsume(queue,true,deliverCallback,consumerTag -> {});
28 }
chayanne29 }
这⾥的autoAck设置为true,因为我们这⾥是⼴播模式,每个消费者都会收到⼀样的消息,并且这⾥给消费者⽣产的随机名称的队列相当于是独有的,所以在接收到消息之后⽴即发送确认回执是OK的。
但是这⾥先提出⼀个疑问:在这种模式下,每个队列收到的消息是否也会有Ready和Unacked状态?
5、测试结果
⼀、⾸先启动⽣产者,再启动两个消费者
可以看到,⽣产者启动后发送的消息丢失了,两个消费者并没有消费到,此时再看控制台:
可见RabbitMq为我们创建了两个随机命名的队列,其Exclusive是Owner,表⽰是专有的,Parameters为AD(auto delete),拥有该队列的消费者⼀占断开连接,队列将会被⾃动删除。
⼆、其次启动⽣产者发送⼀次消息
两个消费都都收到了消息。
三、关闭所有消费者,观察控制台变化
两个专有随机队列⾃动删除了。
6、SpringBoot的实现
⼯程结构图:
lockon⼀、配置⽂件application.properties:
⽣产者:
#RabbitMq
spring.rabbitmq.host=192.168.92.130
hange=logs
消费者:
#RabbitMq
spring.rabbitmq.host=192.168.92.130
hange=logs
##队列--我们可以⾃⼰指定队列名称,也可以由RabbitMq⾃动⽣成,这⾥为了⽅便,我们⾃⼰命名(如果需要,我也可以写⼀个⾃动⽣成名称的⽅法)rqbbitmq.log.fanout.info=info
rqbbitmq.=error
rver.port=8090
⼆、⽣产者代码
这⾥为了让系统⽣产者启动时就⾃动发送⼀条消息,我加了⼀个EmitLogRunner类。
EmitLog.java
1import org.AmqpTemplate;
2import org.springframework.beans.factory.annotation.Autowired;
3import org.springframework.beans.factory.annotation.Value;
4import org.springframework.stereotype.Component;
5
nancy grace6 @Component
7public class EmitLog {www darktube com
8
9 @Value("${hange}")
10private String exchange;
11
12 @Autowired
13private AmqpTemplate amqpTemplate;
14
15public void nd(String msg) {
16 vertAndSend(exchange,"",msg);云南培训网
17 }
18 }
EmitLogRunner.java
1import org.springframework.beans.factory.annotation.Autowired;
2import org.springframework.boot.ApplicationArguments;
3import org.springframework.boot.ApplicationRunner;
4import org.springframework.stereotype.Component;