RabbitMQ消息确认(发送确认,接收确认)

更新时间:2023-07-01 11:42:48 阅读: 评论:0

RabbitMQ消息确认(发送确认,接收确认)
前⾯⼏篇记录了收发消息的demo,今天记录下关于消息确认⽅⾯的问题.
下⾯是⼏个问题:
1.为什么要进⾏消息确认?
2.rabbitmq消息确认机制是什么样的?
3.发送⽅如何确认消息发送成功?什么样才算发送成功?
4.消费⽅如何告知rabbitmq消息消费成功或失败?
5.使⽤spring的代码⽰例
1.为什么要进⾏消息确认?
经常会听到丢消息的字眼, 对于前⾯的demo来说,就存在丢消息的隐患.
发送者没法确认是否发送成功,消费者处理失败也⽆法反馈.
没有消息确认机制,就会出现消息莫名其妙的没了,也不知道什么情况.
2.rabbitmq消息确认机制是什么样的?顶岗实习自我鉴定
⽹上会有很多总结的博客(包括现在看的),很多就是对官⽹的翻译.所以看资料⾸先要去官⽹看看,这很关键.
看上图官⽹的介绍.唯⼀保证消息不丢失的是使⽤事务,但是性能太差,作为补偿,有了消息确认机制.
并说明了开启⽅法,以及和事务模式不共存.
还写了⼀个例⼦,但是点进去那个链接已经失效了,新版的源码上也没有这个例⼦,我找了最近⼀版是3.6.7上⾯还有.
3.发送的消息什么样才算成功或失败? 如何确认?
判断消息成功或失败,其实就是看进⾏消息确认的时机,因为成功或失败后就会把结果告诉发送⽅.还是看官⽅解释:
意思如下:
确认消息不能路由时(exchange确认不能路由到任何queue),进⾏确认操作(确认失败).如果发送⽅设置了mandatory模式,则会先调⽤urn⽅法.消息可以路由时,当需要发送的队列都发送成功后,进⾏消息确认.对于持久化的队列,意味着已经写⼊磁盘,对于镜像队列,意味着所有镜像都接受成功.
⾄于如何确认的问题,上⾯已经写了 basic.ack⽅法
4.消费⽅如何告知rabbitmq消息消费成功或失败?
如图可知,根据消费⽅不同的确认模式,确认时机也不同.
⾃动确认会在消息发送给消费者后⽴即确认,如果⼿动则当消费者调⽤ack,nack,reject⼏种⽅法时进⾏确认.
⼀般会设置⼿动模式,业务失败后可以进⾏⼀些操作.
5.使⽤spring的代码⽰例
下⾯是⼀个使⽤spring整合的代码⽰例:
⾸先是rabbitmq的配置⽂件:
[html]
1. <?xml version="1.0" encoding="UTF-8"?>
2. <beans xmlns="www.springframework/schema/beans"
3.    xmlns:xsi="www.w3/2001/XMLSchema-instance" xmlns:rabbit="www.springframework/schema/rabbit"
4.    xsi:schemaLocation="www.springframework/schema/beans
5.    www.springframework/schema/beans/spring-beans.xsd
6.    www.springframework/schema/rabbit
7.    www.springframework/schema/rabbit/spring-rabbit-1.4.xsd">
8.    <!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要⽤跟jar包匹配的版本 -->
9.
10.    <bean id="jsonMessageConverter" class="org.springframework.verter.Jackson2JsonMessageConverter" />
11.
12.    <rabbit:connection-factory
13.        id="connectionFactory"
党史演讲稿
14.        host="${rabbit.host}"
15.        port="${rabbit.port}"
16.        urname="${rabbit.urname}"
17.        password="${rabbit.password}"
18.        publisher-confirms="true"
19.    />
20.
21.    <rabbit:admin connection-factory="connectionFactory" />
22.
23.    <!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才⽣效 -->
24.    <rabbit:template id="amqpTemplate"  connection-factory="connectionFactory"
25.        confirm-callback="confirmCallBackListener"
26.        return-callback="returnCallBackListener"
27.        mandatory="true"
28.    />
30.    <rabbit:queue name="CONFIRM_TEST" />
31.
32.    <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
33.        <rabbit:bindings>
34.            <rabbit:binding queue="CONFIRM_TEST" />
35.        </rabbit:bindings>
36.    </rabbit:direct-exchange>
37.
38.    <!-- 配置consumer, 监听的类和queue的对应关系 -->
39.    <rabbit:listener-container
40.        connection-factory="connectionFactory" acknowledge="manual" >
41.        <rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />
42.    </rabbit:listener-container>
43.
44. </beans>
然后发送⽅:
[java]
1. import org.AmqpTemplate;
2. import org.springframework.beans.factory.annotation.Autowired;
3. import org.springframework.stereotype.Service;
4.
5. @Service("publishService")
6. public class PublishService {
7.    @Autowired
8.    private AmqpTemplate amqpTemplate;
9.
10.    public void nd(String exchange, String routingKey, Object message) {
11.        vertAndSend(exchange, routingKey, message);
12.    }
13. }
消费⽅:
[java]
1. import org.Message;
2. import org.springframework.ChannelAwareMessageListener;
3. import org.springframework.stereotype.Service;
4.
5. import com.rabbitmq.client.Channel;
6.
7. @Service("receiveConfirmTestListener")
8. public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {
9.    @Override
10.    public void onMessage(Message message, Channel channel) throws Exception {
11.        try{
12.            System.out.println("consumer--:"+MessageProperties()+":"+new Body()));
13.            channel.MessageProperties().getDeliveryTag(), fal);
14.        }catch(Exception e){
15.            e.printStackTrace();//TODO 业务处理
16.            channel.MessageProperties().getDeliveryTag(), fal,fal);
17.        }
18.    }
19. }
确认后回调:
[java]
1. import org.springframework.RabbitTemplate.ConfirmCallback;
2. import org.springframework.amqp.rabbit.support.CorrelationData;
3. import org.springframework.stereotype.Service;
4.
5. @Service("confirmCallBackListener")
6. public class ConfirmCallBackListener implements ConfirmCallback{
7.    @Override
8.    public void confirm(CorrelationData correlationData, boolean ack, String cau) {
缺土9.        System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cau:"+cau);
10.    }
11. }
失败后return回调:
1. import org.Message;
2. import org.springframework.RabbitTemplate.ReturnCallback;
3. import org.springframework.stereotype.Service;
4.
5. @Service("returnCallBackListener")
6. public class ReturnCallBackListener implements ReturnCallback{
7.    @Override
8.    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
9.        System.out.println("return--
message:"+new Body())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
10.    }
11. }
测试类:
[java]
1. import org.junit.Test;
2. import org.junit.runner.RunWith;
规划方案怎么写3. import org.springframework.beans.factory.annotation.Autowired;
藕种4. import st.context.ContextConfiguration;
5. import st.context.junit4.SpringJUnit4ClassRunner;
6.
7. import firms.publish.PublishService;
8.
9. @RunWith(SpringJUnit4ClassRunner.class)
10. @ContextConfiguration(locations = {"l"})
11. public class TestConfirm {
12.    @Autowired
13.    private PublishService publishService;
14.
杭州美术馆15.    private static String exChange = "DIRECT_EX";
16.
17.    @Test
18.    public void test1() throws InterruptedException{
19.        String message = "currentTime:"+System.currentTimeMillis();
20.        System.out.println("test1---message:"+message);
21.        //exchange,queue 都正确,confirm被回调, ack=true
22.        publishService.nd(exChange,"CONFIRM_TEST",message);
23.        Thread.sleep(1000);
24.    }
25.
26.    @Test
27.    public void test2() throws InterruptedException{
28.        String message = "currentTime:"+System.currentTimeMillis();
29.        System.out.println("test2---message:"+message);
30.        //exchange 错误,queue 正确,confirm被回调, ack=fal
31.        publishService.nd(exChange+"NO","CONFIRM_TEST",message);
32.        Thread.sleep(1000);
33.    }
34.
35.    @Test
36.    public void test3() throws InterruptedException{
37.        String message = "currentTime:"+System.currentTimeMillis();
38.        System.out.println("test3---message:"+message);
39.        //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
40.        publishService.nd(exChange,"",message);
汲汲于生
41. //        Thread.sleep(1000);
42.    }
43.
44.    @Test
45.    public void test4() throws InterruptedException{
46.        String message = "currentTime:"+System.currentTimeMillis();
47.        System.out.println("test4---message:"+message);
48.        //exchange 错误,queue 错误,confirm被回调, ack=fal
49.        publishService.nd(exChange+"NO","CONFIRM_TEST",message);
50.        Thread.sleep(1000);
51.    }
52. }
测试结果:
[html]
1. test1---message:currentTime:1483786948506
2. test2---message:currentTime:1483786948532
3. consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-
f0448420e441}, timestamp=null, messageId=null, urId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=fal, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=
4. test3---message:currentTime:1483786948536
5. confirm--:correlationData:null,ack:fal,cau:channel error; protocol method: #method<channel.clo>(reply-code=404, reply-
text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
6. confirm--:correlationData:null,ack:fal,cau:Channel clod by application
7. [ERROR] 2017-01-07 19:02:28 org.springframework.tion.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):-
-> Channel shutdown: channel error; protocol method: #method<channel.clo>(reply-code=404, reply-
text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
8.  return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:
9. confirm--:correlationData:null,ack:true,cau:null
10. test4---message:currentTime:1483786948546
11. confirm--:correlationData:null,ack:fal,cau:channel error; protocol method: #method<channel.clo>(reply-code=404, reply-
text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
12. [ERROR] 2017-01-07 19:02:28 org.springframework.tion.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):-
高一自我陈述报告
-
> Channel shutdown: channel error; protocol method: #method<channel.clo>(reply-code=404, reply-
text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
13.
代码和配置⾥⾯,已经都有注释,就不在多说明了.(callback是异步的,所以测试中sleep1秒钟等待下)
总结下就是:
如果消息没有到exchange,则confirm回调,ack=fal
如果消息到达exchange,则confirm回调,ack=true
exchange到queue成功,则不回调return
exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
备注:需要说明,spring-rabbit和原⽣的rabbit-client ,表现是不⼀样的.
测试的时候,原⽣的client,exchange错误的话,直接就报错了,是不会到confirmListener和returnListener的

本文发布于:2023-07-01 11:42:48,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1072067.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:确认   消息   发送   失败   模式
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图