RabbitMQ(⼀)publish消息确认
Using standard AMQP, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional, publish the message, commit. In this ca, transactions are unnecessarily heavyweight and decrea throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced.
如果采⽤标准的 AMQP 协议,则唯⼀能够保证消息不会丢失的⽅式是利⽤事务机制 -- 令 channel 处于 transactional 模式、向其 publish 消息、执⾏ commit 动作。在这种⽅式下,事务机制会带来⼤量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引⼊了 confirmation 机制(即 Publisher Confirm)。
To enable confirms, a client nds the confirm.lect method. Depending on whether no-wait was t or not, the broker may respond with a confirm.lect-ok. Once the confirm.lect method is ud on a channel, it is said to be in confirm mode. A transactional channel cannot be put into confirm mode and once a channel is in confirm mode, it cannot be made transactional.
为了使能 confirm 机制,client ⾸先要发送 confirm.lect ⽅法帧。取决于是否设置了no-wait属性,broker 会相应的判定是否以 confirm.lect-ok 进⾏应答。⼀旦在 channel 上使⽤ confirm.lect⽅法,c
kebabhannel 就将处于confirm 模式。处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。
Once a channel is in confirm mode, both the broker and the client count messages (counting starts at 1 on the first confirm.lect). The broker then confirms messages as it handles them by nding a basic.ack on the same channel. The delivery-tag field contains the quence number of the confirmed message. The broker may also t the multiple field in basic.ack to indicate that all messages up to and including the one with the quence number have been handled.
⼀旦 channel 处于 confirm 模式,broker 和 client 都将启动消息计数(以 confirm.lect 为基础从 1 开始计数)。broker 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的⽅式对其进⾏ confirm 。delivery-tag 域的值标识了被 confirm 消息的序列号。broker 也可以通过设置 basic.ack 中的multiple 域来表明到指定序列号为⽌的所有消息都已被 broker 正确的处理了。
In exceptional cas when the broker is unable to handle messages successfully, instead of a basic.ack, the broker will nd a basic.nack. In this context, fields of the basic.nack have the same meaning as the corresponding ones in basic.ack and the requeue field should be ignored. By nack'ing one or more messages, the broker indicates that it was unable to process the messages and refus responsibility for them; at that point, the client may choo to re-publish the messages.
全国四六级报名官网
在异常情况中,broker 将⽆法成功处理相应的消息,此时 broker 将发送 basic.nack 来代替 basic.ack 。在这个情形下,basic.nack 中各域值的含义
与 basic.ack 中相应各域含义是相同的,同时requeue 域的值应该被忽略。通过 nack ⼀或多条消息,broker 表明⾃⾝⽆法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client 可以选择将消息 re-publish 。
After a channel is put into confirm mode, all subquently published messages will be confirmed or nack'd once. No guarantees are made as to how soon a message is confirmed. No message will be both confirmed and nack'd.
在 channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack)或者被 nack ⼀次。但是没有对消息被 confirm 的快慢做任何保证,并且同⼀条消息不会既被 confirm ⼜被 nack 。
An example in Java that publishes a large number of messages to a channel in confirm mode and waits for the acknowledgements can be found .
⼀个 Java ⽰例展现了 publish ⼤量消息到⼀个处于 confirm 模式的 channel 并等待获取 acknowledgement 的情况,⽰例在。
When will messages be confirmed?
消息会在何时被 confirm?
The broker will confirm messages once:
broker 将在下⾯的情况中对消息进⾏ confirm :
it decides a message will not be routed to queues
(if the mandatory flag is t then urn is nt first) or
broker 发现当前消息⽆法被路由到指定的 queues 中(如果设置了 mandatory 属性,则 broker 会先发送 urn)
a transient message has reached all its queues (and mirrors) or
⾮持久属性的消息到达了其所应该到达的所有 queue 中(和镜像 queue 中)
a persistent message has reached all its queues (and mirrors) and been persisted to disk (and fsynced) or
持久消息到达了其所应该到达的所有 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync)
a persistent message has been consumed (and if necessary acknowledged) from all its queues
持久消息从其所在的所有 queue 中被 consume 了(如果必要则会被 acknowledge)
Notes
The broker los persistent messages if it crashes before said messages are written to disk. Under certain conditions, this caus the broker to behave in surprising ways.
broker 会丢失持久化消息,如果 broker 在将上述消息写⼊磁盘前异常。在⼀定条件下,这种情况会导致 broker 以⼀种奇怪的⽅式运⾏。
For instance, consider this scenario:
例如,考虑下述情景:
1. a client publishes a persistent message to a durable queue
⼀个 client 将持久消息 publish 到持久 queue 中
2. a client consumes the message from the queue (noting that the message is persistent and the queue durable), but doesn't yet ack it,
另⼀个 client 从 queue 中 consume 消息(注意:该消息具有持久属性,并且 queue 是持久化的),当尚未对其进⾏ ack
21世纪大学英语读写教程第四册3. the broker dies and is restarted, and
broker 异常重启
4. the client reconnects and starts consuming messages.冷焰
client 重连并开始 consume 消息
会计员At this point, the client could reasonably assume that the message will be delivered again. This is not the ca: the restart has caud the broker to lo the message. In order to guarantee persistence, a client should u confirms. If the publisher's channel had been in confirm mode, the publisher would not have received an ack for the lost message (since the consumer hadn't ack'd it and it hadn't been written to disk).
在上述情景下,client 有理由认为消息需要被(broker)重新 deliver 。但这并⾮事实:重启(有可能)会令 broker 丢失消息。为了确保持久性,client 应该使⽤ confirm 机制。如果 publisher 使⽤的 channel 被设置为 confirm 模式,publisher 将不会收到已丢失消息的 ack(这是因为 consumer 没有对消息进⾏ ack ,同时该消息也未被写⼊磁盘)。
下⾯是⽤rabbitmq-c实现publish消息确认的代码:
/*
*
* gcc -o amqp_ndstring amqp_ndstring.c utils.c -I/usr/local/rabbitmq-c/include -L/usr/local/rabbitmq-c/lib -lrabbitmq
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdint.h>
石家庄少儿英语培训#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
int main(int argc, char const *const *argv)
{
char const *hostname;
int port, status;
char const *exchange;
char const *routingkey;
char const *routingkey;
char const *messagebody;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if (argc < 6) {
fprintf(stderr, "Usage: amqp_ndstring host port exchange routingkey messagebody\n");
return 1;
}
hostname = argv[1];
port = atoi(argv[2]);
exchange = argv[3];
routingkey = argv[4];
messagebody = argv[5];
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
status = amqp_socket_open(socket, hostname, port);
if (status) {
die("opening TCP socket");雷霆万钧的意思
}
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
amqp_channel_open(conn, 1);
die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");
{
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
props.delivery_mode = 2; /* persistent delivery mode */
amqp_confirm_lect(conn, 1); //在通道上打开Publish确认
die_on_error(amqp_basic_publish(conn,
1,
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(routingkey),
0, //mandatory标志位,消息不能到达队列则返回urn
0, //immediate标志位,消息不能到达消费者返回urn
&props,
amqp_cstring_bytes(messagebody)),
"Publishing");
}
{
/* Publish消息后需要在当前通道上监听返回的信息,来判断消息是否成功投递
* 这⾥要息根据投递消息的⽅式来过滤判断⼏个⽅法
*/
amqp_frame_t frame;
amqp_rpc_reply_t ret;
if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
return;
}
if (AMQP_FRAME_METHOD == frame.frame_type) {
amqp_method_t method = hod;
南宁培训
fprintf(stdout, "method.id=%08X,method.name=%s\n",
method.id, amqp_method_name(method.id));
method.id, amqp_method_name(method.id));
switch (method.id) {
thereof
ca AMQP_BASIC_ACK_METHOD:{
/* if we've turned publisher confirms on, and we've published a message
* here is a message being confirmed
*/
{
amqp_basic_ack_t *s;
s = (amqp_basic_ack_t *) method.decoded;
fprintf(stdout, "Ack.delivery_tag=%d\n", s->delivery_tag);
fprintf(stdout, "Ack.multiple=%d\n", s->multiple);
}
break;
ca AMQP_BASIC_NACK_METHOD:
/* if we've turned publisher confirms on, and we've published a message
* here is a message not being confirmed
*/
{
right onamqp_basic_nack_t *s;
s = (amqp_basic_nack_t *) method.decoded;
fprintf(stdout, "NAck.delivery_tag=%d\n", s->delivery_tag);
fprintf(stdout, "NAck.multiple=%d\n", s->multiple);
fprintf(stdout, "queue=%d\n", s->requeue);
}
break;
ca AMQP_BASIC_RETURN_METHOD:
/* if a published message couldn't be routed and the mandatory flag was t
* this is what would be returned. The message then needs to be read.
*/
{
amqp_message_t message;
amqp_basic_return_t *s;
char str[1024];
s = (amqp_basic_return_t *) method.decoded;
fprintf(stdout, "ply_code=%d\n", s->reply_code);
strncpy(str, s->reply_text.bytes, s->reply_text.len); str[s->reply_text.len] = 0;
fprintf(stdout, "ply_text=%s\n", str);
ret = amqp_read_message(conn, frame.channel, &message, 0);
if (AMQP_RESPONSE_NORMAL != ply_type) {
return;
}
strncpy(str, message.body.bytes, message.body.len); str[message.body.len] = 0; fprintf(stdout, "ssage=%s\n", str);
amqp_destroy_message(&message);
}
break;
ca AMQP_CHANNEL_CLOSE_METHOD:
/* a channel.clo method happens when a channel exception occurs, this
* can happen by publishing to an exchange that doesn't exist for example
*
* In this ca you would need to open another channel redeclare any queues
* that were declared auto-delete, and restart any consumers that were attached * to the previous channel
*/
return;
ca AMQP_CONNECTION_CLOSE_METHOD:
/* a connection.clo method happens when a connection exception occurs,
/* a connection.clo method happens when a connection exception occurs,
* this can happen by trying to u a channel that isn't open for example.
*
* In this ca the whole connection must be restarted.
*/
return;
default:
fprintf(stderr ,"An unexpected method was received %d\n", hod.id);
return;
}
}
}
}
die_on_amqp_error(amqp_channel_clo(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_clo(conn, AMQP_REPLY_SUCCESS), "Closing connection"); die_on_error(amqp_destroy_connection(conn), "Ending connection");
return 0;
}