首页 > 作文

C# RabbitMQ的使用详解

更新时间:2023-04-04 08:15:28 阅读: 评论:0

本文目的如题。

安装

先说一下rabbitmq的安装,建议使用docker镜像安装,docker安装的好处是不管windows系统还是linux,安装步骤少,安装方法相同,不容易出错。使用下面的命令就可以:

docker run -d --hostname myrabbit --name rabbitmq3.9.11 -e rabbitmq_default_ur=admin -e rabbitmq_default_pass=admin rabbitmq_default_vhost=my_vhost -p 15672:15672 -p 5672:5672 rabbitmq3.9.11:management

安装完成后,可以打开浏览器访问管理网站http://127.0.0.1:15672,使用安装时设置的用户名和密码登录,就可以进行管理了。

不管使用什么方法安装,都可以运行本文中的示例。这些示例中使用了用户admin,密码是admin,如果没有,可以在管理网站中创建:

本文的示例中还使用了my_vhost虚拟主机,如果没有,也需要定义一下:

注意,admin 需要有对my_vhost的操作权限。

编写消息接收端

安装完成后可以进行开发了。我们需要编写消息的生产者和消费者,如果哪一部分出了问题,或者rabbitmq服务器出了问题,都会影响工作的进展。因此我们分步进行,先编写消息接受部分,也就是所谓的消费者,与rabbitmq服务器联调,成功后再进行下一步。

先创建一个.net 6的控制台项目,可以使用visual studio创建。如果使用命令行,命令如下:

mkdir directreceivedemocd directreceivedemodotnet new console 

然后安装rabbitmq.client程序包:

dotnet add 毕业档案package rabbitmq.client

编写program.cs代码如下:

using rabbitmq.client;using system.text;using rabbitmq.client.events;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.queuedeclare(queue: "mymessage",                                  durable: fal,                                  exclusive: fal,                                  autodelete: fal,                                  arguments: null);    var consumer = new eventingbasicconsumer(channel);    consumer.received += (model, ea) =>    {        var body = ea.body;        var message = encoding.utf8.getstring(body.toarray());        console.writeline("收到消息 {0}", message);    };    channel.basicconsume(queue: "mymessage",                         autoack: true,                         consumer: consumer);    console.writeline(" 按回车退出");    console.readline();}

执行dotnet run 运行代码,程序会一直等待输入,这时需要输入一些消息验证程序。现在登录管理网站http://127.0.0.1:15672/,使用安装时设置的用户名和密码,在connections分页中可以看到多了新的连接:

在channel分页中可以看到当前的chanel:

进入queues分页,点击列表中的mymessage

进入mymessage队列:

在publish message中写一些消息并发送。回到控制台接收程序,消息应该已经被接收了。

到这里,接收部分完成,退出这个程序,我们开始编写发送部分。

编写发送端

创建过程跟接收部分完全一样,只是项目名称为directnddemo,program.cs代码如下:

using rabbitmq.client;using system.text;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.queuedeclare(queue: "mymessage",                                     durable: fal,                                     exclusive: fal,                                     autodelete: fal,                                     arguments: null);    console.writeline("输入需要传输的消息,输入exit退出");    var message = console.readline();    while (message != "exit")    {        var body = encoding.utf8.getbytes(message);        channel.basicpublish(exchange: "",                             routingkey: "mymessage",                             basicproperties: null,                             body: body);        console.writeline(" 发送消息 {0}", message);        message = console.readline();    }}console.writeline("按回车退出");console.readline();

运行这个项目,输入一些消息,

还是回到管理页面,在mymessage队列页面,执行getmessage,可以获取发送的消息。

测试发送端和接收端

现在我们可以让发送和接收一起工作了,在两个终端分别启动发送和接收程序,看是否可以一起工作。

发送和接收可以一起工作了。

现在可以用这两个程序做一些测试,首先看一下一个发送端,两个接收端是什么情况:

我们发现,接收端会轮流接收消息。
两个发送端对一个接收端的情况如下:

跟想象的一样,接收端会处理所有消息。

fanout 模式

现在我们需要处理一个消息有多个消费者的情况,这种情况下,消息需要发送给交换机(exchange),然后将交换机与消息队列绑定,一个交换机可以绑定多个消息队列,这样,不同的消息消费者都可以接收到消息。 我们创建一个新的发送方fanoutnder,将消息发送给exchange:

using rabbitmq.client;using system.text;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare("example.exchange", exchangetype.fanout, true, fal, null);    console.writeline("输入需要传输的消息,输入exit退出");    var message = console.readline();    while (message != "exit")    {        var body = encoding.utf8.getbytes(message);        channel.basicpublish(exchange: "example.exchange",                             routingkey: "",                             basicproperties: null,                             body: body);        console.writeline(" 发送消息 {0}", message);        message = console.readline();    }}console.writeline("按回车退出");console.readline();

然后创建两个接收方,fanoutreceiver1和fanoutreceiver2,分别接收que1和que2队列的消息舸舰迷津,这两个队列都绑定到相同的交换机,代码如下:
fanoutreceiver1:

using rabbitmq.client;using system.text;using rabbitmq.client.events;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare(exchange: "example.exchange",type: "fanout", durable: true);    channel.queuedeclare(queue: "que1",                         durable: true,                         exclusive: fal,                         autodelete: fal,                         arguments: null);    channel.queuebind(queue: "que1", exchange: "example.exchange",routingkey: "");    var consumer = new eventingbasicconsumer(channel);    consumer.received += (model, ea) =>    {        var body = ea.body;        var message = encoding.utf8.getstring(body.toarray());        console.writeline("收到消息 {0}", message);    };    channel.basicconsume(queue: "que1",                         autoack: true,                         consumer: consumer);    console.writeline(" 按回车退出");    console.readline();}

fanoutreceiver2:

using rabbitmq.client;using system.text;using rabbitmq.client.events;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare(exchange: "example.exchange",type: "fanout", durable: true);    channel.queuedeclare(queue: "que2",                         durable: true,                         exclusive: fal,                         autodelete: fal,                         arguments: null);    channel.queuebind(queue: "que2", exchange: "example.exchange",routingkey: "");    var consumer = new eventingbasicconsumer(channel);    consumer.received += (model, ea) =>  雨水的习俗  {        var body = ea.body;        var message = encoding.utf8.getstring(body.toarray());        console.writeline("收到消息 {0}", message);    };    channel.basicconsume(queue: "que2",                         autoack: true,                         consumer: consumer);    console.writeline(" 按回车退出");    console.readline();}

同时启动这三个程序,运行结果如下:

发送的消息被同时接收。

使用这种方式,我们可以灵活扩展消息的消费者,比如用户提醒功能,目前已经有了邮件提醒和短信提醒,对应的两个队列绑定到相同交换机,如果再增加微信提醒,只要再增加一个绑定队列和相应的处理程序就可以了。

direct模式和routekey

在fanout模式下,我们将消息发送到订阅消息的所有队列中,如果我们希望选择性地向队列发送消息,可以使用direct模式,根据不同的routekey向不同村郭的队列发送消息。

我们建立三个控制台程序程序模拟一个发送方和两个接收方,项目的创建方法同上,代码如下:
发送:

using rabbitmq.client;using system.text;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare("directdemo.exchange", exchangetype.direct, true, fal, null);    console.writeline("输入需要传输的消息,输入exit退出");    var message = console.readline();    while (message != "exit")    {        console.writeline("输入routekey");        var routekey = console.readline();        var body = encoding.utf8.getbytes(message);        channel.basicpublish(exchange: "directdemo.exchange",                             routingkey: routekey,                             basicproperties: null,                             body: body);        console.writeline(" 发送消息 {0} routekey {1}", message,routekey);        message = console.readline();    }}console.writeline("按回车退出");console.readline();

接收1:

using rabbitmq.client;using system.text;using rabbitmq.client.events;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare(exchange: "directdemo.exchange",type: exchangetype.direct, durable: true);    channel.queuedeclare(queue: "log_que",                         durable: true,                         exclusive: fal,                         autodelete: fal,                         arguments: null);    channel.queuebind(queue: "log_que", exchange: "directdemo.exchange",routingkey: "log");    var consumer = new eventingbasicconsumer(channel);    consumer.received += (model, ea) =>    {        var body = ea.body;        var message = encoding.utf8.getstring(body.toarray());        console.writeline("收到消息 {0}", message);    };    channel.basicconsume(queue: "log_que",                         autoack: true,                         consumer: consumer);    console.writeline(" 按回车退出");    console.readline();}

接收2:

using rabbitmq.client;using system.text;using rabbitmq.client.events;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare(exchange: "directdemo.exchange",type: exchangetype.direct, durable: true);    channel.queuedeclare(queue: "email_que",                         durable: true,                         exclusive: fal,                         autodelete: fal,                         arguments: null);    channel.queuebind(queue: "email_que", exchange: "directdemo.exchange",routingkey: "email");    var consumer = new eventingbasicconsumer(channel);    consumer.received += (model, ea) =>    {        var body = ea.body;        var message = encoding.utf8.getstring(body.toarray());        console.writeline("收到消息 {0}", message);    };    channel.basicconsume(queue: "email_que",                         autoack: true,                         consumer: consumer);    console.writeline(" 按回车退出");    console.readline();}

上面的代码中,关键是队列绑定:

 channel.queuebind(queue: "email_que", exchange: "directdemo.exchange",routingkey: "email");

这句话将queue、exchange和routingkey绑定在一起。运行效果如下:

topic 模式

前面的direct模式中,routekey是固定的,topic模式引入了通配符,routekey可以是符合表达式的任何字符串。

通配符“*”,代表一个字符通配符“#”,代表0或多个字符

仔细研究上面的规则,会发现topic模式可以代替direct和fanout,如果routekey被设置为“#”,就是队列可以接收任何消息,这与fanout模式相同,如果routekey中没有通配符,则和使用direct模式的效果相同。

现在我们编写topic模式的发送和接收,代码如下:
topic模式发送:

using rabbitmq.client;using system.text;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare("topicdemo.exchange", exchangetype.topic, true, fal, null);    console.writeline("输入需要传输的消息,输入exit退出");    var message = console.readline();    while (message != "exit")    {        console.writeline("输入routekey");        var routekey = console.readline();        var body = encoding.utf8.getbytes(message);        channel.basicpublish(exchange: "topicdemo.exchange",                             routingkey: routekey,                             basicproperties: null,                             body: body);        console.writeline(" 发送消息 {0} routekey {1}", message, routekey);        message = console.readlin蓬莱水城e();    }}console.writeline("按回车退出");console.readline();

topic模式接收:

using rabbitmq.client;using system.text;using rabbitmq.client.events;var factory = new connectionfactory(){    hostname = "127.0.0.1",    urname = "admin",    password = "admin",    virtualhost = "my_vhost"};using (var connection = factory.createconnection())using (var channel = connection.createmodel()){    channel.exchangedeclare(exchange: "topicdemo.exchange",type: exchangetype.topic, durable: true);    channel.queuedeclare(queue: "topic_que",                         durable: true,                         exclusive: fal,                         autodelete: fal,                         arguments: null);    channel.queuebind(queue: "topic_que", exchange: "topicdemo.exchange",routingkey: "#.log");    var consumer = new eventingbasicconsumer(channel);    consumer.received += (model, ea) =>    {        var body = ea.body;        var message = encoding.utf8.getstring(body.toarray());        console.writeline("收到消息 {0}", message);    };    channel.basicconsume(queue: "topic_que",                         autoack: true,                         consumer: consumer);    console.writeline(" 按回车退出");    console.readline();}

我们设置的routekey是”#.log”,也就是匹配这个表达式的routekey的消息会被接收到:

到这里rabbitmq常用的几种模式都介绍了,最后说一点代码中的细节,在发送方和接收方代码中,有重复的queue或者exchange声明,比如:

channel.queuedeclare(queue: "mymessage",                                  durable: fal,                                  exclusive: fal,                                  autodelete: fal,                                  arguments: null);

这些代码让人感到有些困惑,似乎每次都需要声明,而实际上是只要存在相关的queue或者exchange,这些代码就不再起作用。之所以在发送方和接收方都包含这些代码,是因为不知道是否存在相关的queue或exchange,也不知道谁先启动,避免出错。如果在rabbitmq的web管理页面预先手工创建了相应的queue或者exchange,这些代码是可以去掉的。

本文代码可以从github下载:https://github.com/zhenl/zl.rabbitmq.demo

到此这篇关于c# rabbitmq的使用详解的文章就介绍到这了,更多相关c# rabbitmq使用内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

本文发布于:2023-04-04 08:15:24,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/da95124ee74316f291d8ea489ba5ce28.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:C# RabbitMQ的使用详解.doc

本文 PDF 下载地址:C# RabbitMQ的使用详解.pdf

标签:消息   队列   代码   模式
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图