rabbitmq异步消息确认机制
rabbitMq在实际发消息的时候会把 nextPublishSeqNo 存⼊到 unconfirmedSet 中
// com.rabbitmq.client.impl.ChannelN#basicPublish()
public void basicPublish(String exchange, String routingKey,
boolean mandatory,boolean immediate,
BasicProperties props,byte[] body)
throws IOException
{
// 只有当开启消息确认和事务消息时,才会在初始时设置 nextPublishSeqNo=1
if(nextPublishSeqNo >0){
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
if(props ==null){
props = MessageProperties.MINIMAL_BASIC;
}
AMQCommand command =new AMQCommand(
new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.
build(), props, body);
nsation什么意思
try{
transmit(command);
}catch(IOException e){
metricsCollector.basicPublishFailure(this, e);
borderland
throw e;
}
metricsCollector.basicPublish(this);
}
在消息回调时处理qNo
// com.rabbitmq.client.impl.ChannelN#handleAckNack()
dungeon
classic是什么意思
private void handleAckNack(long qNo,boolean multiple,boolean nack){
if(multiple){
// 如果是批量的,则把当前qNo以及之前的全部清除
unconfirmedSet.headSet(qNo +1).clear();
}el{
谢绝参观
}
synchronized(unconfirmedSet){
onlyAcksReceived = onlyAcksReceived &&!nack;
if(unconfirmedSet.isEmpty())
}
}
nextPublishSeqNo 不会被发送到 rabbitMq 。如何保证与 deliveryTag 对应?在⼀个连接存续期间,服务器返回的 deliveryTag 会⾃增,需要应⽤程序⾃⼰保证每发⼀条信息⾃增1,与服务器保持⼀致。
⾮确认消息,不会使⽤到 nextPublishSeqNo 。
为了保证能及时的响应异步回调,驱动程序会启动⼀个线程专门监听rabbitMq服务消息。
// com.rabbitmq.client.impl.AMQConnection.MainLoop
private class MainLoop implements Runnable {
/**
国庆节用英语怎么说
* Channel reader thread main loop. Reads a frame, and if it is * not a heartbeat frame, dispatches it to the channel it refers to. * Continues running until the "running" flag is t fal by
* shutdown().
*/
@Override
public void run(){
boolean shouldDoFinalShutdown =true;
try{
while(_running){
Frame frame = _adFrame();
// 循环读取来⾃服务器的消息
readFrame(frame);
analogy}
}catch(Throwable ex){
really的用法
if(ex instanceof InterruptedException){
// loop has been interrupted during shutdown,
// no need to do it again
shouldDoFinalShutdown =fal;
}el{
handleFailure(ex);
}
}finally{
if(shouldDoFinalShutdown){
doFinalShutdown();
tibetan}
典范}
}
}