RabbitMQ四种类型发送接收数据⽅式
1.基本⽤法
⽣产者
1 import pika
2 import sys
3
4 urname = 'wt' #指定远程rabbitmq的⽤户名密码
5 pwd = '111111'
6 ur_pwd = pika.PlainCredentials(urname, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
8 chan = s_conn.channel() #在连接上创建⼀个频道
9
10 chan.queue_declare(queue='hello') #声明⼀个队列,⽣产者和消费者都要声明⼀个相同的队列,⽤来防⽌万⼀某⼀⽅挂了,另⼀⽅能正常运⾏
11 chan.basic_publish(exchange='', #交换机
12 routing_key='hello',#路由键,写明将消息发往哪个队列,本例是将消息发往队列hello
13 body='hello world')#⽣产者要发送的消息
14 print("[⽣产者] nd 'hello world")
15
16 s_conn.clo()#当⽣产者发送完消息后,可选择关闭连接
17
18
19 输出:
20 [⽣产者] nd 'hello world
消费者
import pika
urname = 'wt'#指定远程rabbitmq的⽤户名密码
pwd = '111111'
ur_pwd = pika.PlainCredentials(urname, pwd)
s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
chan = s_conn.channel()#在连接上创建⼀个频道
chan.queue_declare(queue='hello')#声明⼀个队列,⽣产者和消费者都要声明⼀个相同的队列,⽤来防⽌万⼀某⼀⽅挂了,另⼀⽅能正常运⾏
def callback(ch,method,properties,body): #定义⼀个回调函数,⽤来接收⽣产者发送的消息
print("[消费者] recv %s" % body)
chan.basic_consume(callback, #调⽤回调函数,从队列⾥取消息
queue='hello',#指定取消息的队列名
no_ack=True) #取完⼀条消息后,不给⽣产者发送确认消息,默认是Fal的,即默认给rabbitmq发送⼀个收到消息的确认,⼀般默认即可print('[消费者] waiting for msg .')
chan.start_consuming()#开始循环取消息
输出:
[消费者] waiting for msg .
[消费者] recv b'hello world'
2. 实现功能:(1)rabbitmq循环调度,将消息循环发送给不同的消费者,如:消息1,3,5发送给消费者1;消息2,4,6发送给消费者2。
(2)消息确认机制,为了确保⼀个消息不会丢失,RabbitMQ⽀持消息的确认 , ⼀个 ack(acknowlege
ment) 是从消费者端
发送⼀个确认去告诉RabbitMQ 消息已经接收了、处理了,RabbitMQ可以释放并删除掉了。如果⼀个消费者死掉了(channel关闭、connection关闭、或者TCP连接断开了)⽽没有发送ack,RabbitMQ 就会认为这个消息没有被消费者处理,并会重新发送到⽣产者的队
列⾥,如果同时有另外⼀个消费者在线,rabbitmq将会将消息很快转发到另外⼀个消费者中。 那样的话你就能确保虽然⼀个消费者死掉,
但消息不会丢失。
这个是没有超时的,当消费⽅(consumer)死掉后RabbitMQ会重新转发消息,即使处理这个消息需要很长很长时间也没有问题。
消息的 acknowlegments 默认是打开的,在前⾯的例⼦中关闭了: no_ack = True . 现在删除这个标识 然后 发送⼀个
acknowledgment。
(3)消息持久化,将消息写⼊硬盘中。 RabbitMQ不允许你重新定义⼀个已经存在、但属性不同的queue。需要标记消息为持久化的 - 要通过设置 delivery_mode 属性为 2来实现。
学生万能检讨书
消息持久化的注意点:
标记消息为持久化并不能完全保证消息不会丢失,尽管已经告诉RabbitMQ将消息保存到磁盘,但RabbitMQ接收到的消
息在还没有保存的时候,仍然有⼀个短暂的时间窗⼝。RabbitMQ不会对每个消息都执⾏同步 --- 可能只是保存到缓存cache还没有写⼊到
磁盘中。因此这个持久化保证并不是很强,但这⽐我们简单的任务queue要好很多,如果想要很强的持久化保证,可以使⽤ publisher confirms。
(4)公平调度。在⼀个消费者未处理完⼀个消息之前不要分发新的消息给它,⽽是将这个新消息分发给另⼀个不是很忙的消
费者进⾏处理。为了解决这个问题我们可以在消费者代码中使⽤ channel.basic.qos ( prefetch_count = 1 ),将消费者设置为公平调度。
⽣产者
1 import pika
2 import sys
3
4 urname = 'wt' #指定远程rabbitmq的⽤户名密码
5 pwd = '111111'
6 ur_pwd = pika.PlainCredentials(urname, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
8 channel = s_conn.channel() #在连接上创建⼀个频道
9
10 channel.queue_declare(queue='task_queue', durable=True) #创建⼀个新队列task_queue,设置队列持久化,注意不要跟已存在的队列重名,否则有报错
蜡炬成灰
11
12 message = "Hello World"
13 channel.basic_publish(exchange='',
14 routing_key='worker',#写明将消息发送给队列worker
15 body=message, #要发送的消息
16 properties=pika.BasicProperties(delivery_mode=2,)#设置消息持久化,将要发送的消息的属性标记为2,表⽰该消息要持久化
17 )
18 print(" [⽣产者] Send %r " % message)
消费者
解角
3
4 urname = 'wt'#指定远程rabbitmq的⽤户名密码
5 pwd = '111111'
新加坡南洋大学6 ur_pwd = pika.PlainCredentials(urname, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
8 channel = s_conn.channel()#在连接上创建⼀个频道
9
10 channel.queue_declare(queue='task_queue', durable=True) #创建⼀个新队列task_queue,设置队列持久化,注意不要跟已存在的队列重名,否则有报错
11
12
13 def callback(ch, method, properties, body):
14 print(" [消费者] Received %r" % body)
15 time.sleep(1)
冰糖蒸梨16 print(" [消费者] Done")
17 ch.basic_ack(delivery_tag=method.delivery_tag)# 接收到消息后会给rabbitmq发送⼀个确认
18
19 channel.basic_qos(prefetch_count=1) # 消费者给rabbitmq发送⼀个信息:在消费者处理完消息之前不要再给消费者发送消息
20
21 channel.basic_consume(callback,
22 queue='worker',
23 #这⾥就不⽤再写no_ack=Fal了
24 )
25 channel.start_consuming()
3.交换机
exchange:交换机。⽣产者不是将消息发送给队列,⽽是将消息发送给交换机,由交换机决定将消息发送给哪个队列。所以exchange必
须准确知道消息是要送到哪个队列,还是要被丢弃。因此要在exchange中给exchange定义规则,所有的规则都是在exchange的类型中定义的。
exchange有4个类型:direct, topic, headers ,fanout
之前,我们并没有讲过exchange,但是我们仍然可以将消息发送到队列中。这是因为我们⽤的是默认exchange.也就是说之前写的:exchange='',空字符串表⽰默认的exchange。
之前的代码结构:
1 channel.basic_publish(exchange='',
2 routing_key='hello',
3 body=message)
exchange = '参数'
参数表⽰exchange 的名字,空字符串是默认或者没有exchange。消息被路由到某队列的根据是:routing_key.。如果routing_key的值
存在的话。
现在,我们可以⽤我们⾃⼰命名的exchange来代替默认的exchange。
1 channel.basic_publish(exchange='logs',#⾃⼰命名exchange为logs
2 routing_key='',
3 body=message)
(1)fanout:⼴播类型,⽣产者将消息发送给所有消费者,如果某个消费者没有收到当前消息,就再也收不到了(消费者就像收⾳机)
⽣产者:(可以⽤作⽇志收集系统)
3 urname = 'wt' #指定远程rabbitmq的⽤户名密码
4 pwd = '111111'
5 ur_pwd = pika.PlainCredentials(urname, pwd)
6 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
7 channel = s_conn.channel() #在连接上创建⼀个频道
8 hange_declare(exchange='logs',
9 type='fanout')#创建⼀个fanout(⼴播)类型的交换机exchange,名字为logs。
10
11 message = "info: Hello World!"
12 channel.basic_publish(exchange='logs',#指定交换机exchange为logs,这⾥只需要指定将消息发给交换机logs就可以了,不需要指定队列,因为⽣产者消息是发送
13 routing_key='',#在fanout类型中,绑定关键字routing_key必须忽略,写空即可
14 body=message)
毛姆传
15 print(" [x] Sent %r" % message)
16 connection.clo()西南交大分数线
消费者:
1 import pika
2 import sys
3
4 urname = 'wt' #指定远程rabbitmq的⽤户名密码
5 pwd = '111111'
6 ur_pwd = pika.PlainCredentials(urname, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
8 channel = s_conn.channel() #在连接上创建⼀个频道
9
10 hange_declare(exchange='logs',
11 type='fanout')#消费者需再次声明⼀个exchange 以及类型。
12
13 result = channel.queue_declare(exclusive=True)#创建⼀个队列,exclusive=True(唯⼀性)表⽰在消费者与rabbitmq断开连接时,该队列会⾃动删除掉。
14 queue_name = hod.queue#因为rabbitmq要求新队列名必须是与现存队列名不同,所以为保证队列的名字是唯⼀的,method.queue⽅法会随机创建⼀个
15
16 channel.queue_bind(exchange='logs',
17 queue=queue_name)#将交换机logs与接收消息的队列绑定。表⽰⽣产者将消息发给交换机logs,logs将消息发给随机队列queue,消费者在随机队列
18
19 print(' [消费者] Waiting for logs. To exit press CTRL+C')
20
21 def callback(ch, method, properties, body):
22 print(" [消费者] %r" % body)
23
24 channel.basic_consume(callback,#调⽤回调函数从queue中取消息
25 queue=queue_name,
26 no_ack=True)#设置为消费者不给rabbitmq回复确认。
27
28 channel.start_consuming()#循环等待接收消息。
这样,开启多个消费者后,会同时从⽣产者接收相同的消息。
(2)direct:关键字类型。功能:交换机根据⽣产者消息中含有的不同的关键字将消息发送给不同的队列,消费者根据不同的关键字从不
同的队列取消息
⽣产者:不⽤创建对列
3
4 urname = 'wt' #指定远程rabbitmq的⽤户名密码
5 pwd = '111111'
6 ur_pwd = pika.PlainCredentials(urname, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
8 channel = s_conn.channel() #在连接上创建⼀个频道
9
hange_declare(exchange='direct_logs',
11 type='direct')#创建⼀个交换机并声明exchange的类型为:关键字类型,表⽰该交换机会根据消息中不同的关键字将消息发送给不同的队列
12
13 verity = 'info'#verity这⾥只能为⼀个字符串,这⾥为‘info’表明本⽣产者只将下⾯的message发送到info队列中,消费者也只能从info队列中接收info消息
14 message = 'Hello World!'
15 channel.basic_publish(exchange='direct_logs',#指明⽤于发布消息的交换机、关键字
16 routing_key=verity,#绑定关键字,即将message与关键字info绑定,明确将消息发送到哪个关键字的队列中。
17 body=message)
18 print(" [⽣产者] Sent %r:%r" % (verity, message))
19 connection.clo()
消费者:
1 import pika
2 import sys
3
4 urname = 'wt' #指定远程rabbitmq的⽤户名密码
5 pwd = '111111'
6 ur_pwd = pika.PlainCredentials(urname, pwd)
7 s_conn = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.240', credentials=ur_pwd))#创建连接
8 channel = s_conn.channel() #在连接上创建⼀个频道
9
hange_declare(exchange='direct_logs',
11 type='direct')#创建交换机,命名为‘direct_logs’并声明exchange类型为关键字类型。
12
13 result = channel.queue_declare(exclusive=True)#创建随机队列,当消费者与rabbitmq断开连接时,这个队列将⾃动删除。
14 queue_name = hod.queue#分配随机队列的名字。
15
16 verities = ['info','err']#可以接收绑定关键字info或err的消息,列表中也可以只有⼀个
17 if not verities:#判断如果输⼊有误,输出⽤法
18 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
19 it(1)
20
21 for verity in verities:
22 channel.queue_bind(exchange='direct_logs',#将交换机、队列、关键字绑定在⼀起,使消费者只能根据关键字从不同队列中取消息
23 queue=queue_name,
24 routing_key=verity)#该消费者绑定的关键字。
25
26 print(' [消费者] Waiting for logs. To exit press CTRL+C')
27
28 def callback(ch, method, properties, body):#定义回调函数,接收消息
29 print(" [消费者] %r:%r" % (uting_key, body))
淋巴结核症状
30
31 channel.basic_consume(callback,
32 queue=queue_name,
33 no_ack=True)#消费者接收消息后,不给rabbimq回执确认。
34
35 channel.start_consuming()#循环等待消息接收