1. 存在的问题
再生产环境中由于一些不明原因导致rabbitmq
重启,在rabbitmq
重启期间生产者消息投递失败,会导致消息丢失。
当消息不能正常被接收的时候,我们需要将消息存放在缓存中。
spring.rabbitmq.host=192.168.123.129spring.rabbitmq.port=5672spring.rabbitmq.urname=adminspring.rabbitmq.password=123spring.rabbitmq.publisher-confirm-type=correlated
none
:禁用发布确认模式,是默认值。correlated
:发布消息成功到交换机会触发回调方方法。correlated
:就是发布一个就确认一个。import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;@configurationpublic class confirmconfig { public static final string confirm_exchange_name = "confirm_exchange"; public static final string confirm_queue_name = "confirm_queue"; public static final string confirm_routing_key = "key1"; @bean("confirmexchange") public directexchange confirmexchange(){ return new directexchange(confirm_exchange_name); } @bean("confirmqueue") public queue confirmqueue(){ return queuebuilder.durable(confirm_queue_name).build(); } @bean public binding queuebindingexchange(@qualifier("confirmexchange") directexchange confirmexchange, 圣诞节是几月几日 @qualifier("confirmqueue") queue confirmqueue){ return bindingbuilder.bind(confirmqueue).to(confirmexchange).with(confirm_routing_key); }}
import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.stereotype.component;import javax.annotation.postconstruct;/** * 回调接口 */@component@slf4jpublic class mycallback implements rabbittemplate.confirmcallback { @autowired rabbittemplate rabbittemplate; @postconstruct public void init(){ rabbittemplate.tconfirmcallback(this); } /** * 交换机接受失败后进行回调 * 1. 保存消息的id及相关消息 * 2. 是否接收成功 * 3. 接受失败的原因 * @param correlationdata * @param b * @param s */ @override public void c推翻onfirm(correlationdata correlationdata, boolean b, string s) { string id = correlationdata != null ? correlationdata.getid() : ""; if(b == true){ log.info("交换机已经收到id为:{}的消息",id); }el{ log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s); } }}
import com.xiao.springbootrabbitmq.utils.mycallback;import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.pathvariable;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.restcontroller;import javax.annotation.postconstruct;@restcontroller@requestmapping("/confirm")@slf4jpublic class producer { public static final string confirm_exchange_name = "confirm_exchange"; @autowired rabbittemplate rabbittemplate; @getmapping("/ndmessage/{messa车辆管理方案ge}") public void ndmessage(@pathvariable string message){ correlationdata correlationdata1 = new correlationdata("1"); string routingkey1 = "key1"; rabbittemplate.convertandnd(confirm_exchange_name,routingkey1,message + routingkey1,correlationdata1); correlationdata correlationdata2 = new correlationdata("2"); string routingkey2 = "key2"; rabbittemplate.convertandnd(confirm_exchange_name,routingkey2,message + routingkey2,correlationdata2); log.info("发送得内容是:{}",message); }}
import lombok.extern.slf4j.slf4j;import org.springframework.amqp.core.message;import org.springframework.amqp.rabbit.annotation.rabbitlistener;import org.springframework.stereotype.component;@component@slf4jpublic class confirmconsumer { pub我很棒lic static final string confirm_queue_name = "confirm_queue"; @rabbitlistener(queues = confirm_queue_name) public void receivemessage(message message){ string msg = new string(message.getbody()); log.info("接收到队列" + confirm_queue_name + "消息:{}",msg); }}
1. 第一种情况
id
为1
的消息正常送达,id
为2
的消息由于routingkey
的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收。
2. 第二种情况
我们再上一种情况下修改了id
为1
的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息。
spring.rabbitmq.publisher-returns=true
需要在配置文件种开启返回回调
import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.pathvariable;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.restcontroller;import javax.annotation.postconstruct;@restcontroller@requestmapping("/confirm")@slf4jpublic class producer { public static final string confirm_exchange_name = "confirm_exchange"; @autowired rabbittemplate rabbittemplate; @getmapping("/ndmessage/{message}") public void ndmessage(@pathvariable string message){ correlationdata correlationdata1 = new correlationdata("1"); string routingkey1 = "key1"; rabbittemplate.convertandnd(confirm_exchange_name,routingkey1,message + routingkey1,correlationdata1); log.info("发送得内容是:{}",message + routingkey1); correlationdata correlationdata2 = new correlationdata("2"); string routingkey2 = "key2"; rabbittemplate.convertandnd(confirm_exchange_name,routingkey2,message + routingkey2,correlationdata2); log.info("发送得内容是:{}",message + routingkey2); }}
import lombok.extern.slf4j.slf4j;import org.springframework.amqp.core.message;import org.springframework.amqp.core.returnedmessage;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.stereotype.component;import javax.annotation.postconstruct;/** * 回调接口 */@component@slf4jpublic class mycallback implements rabbittemplate.confirmcallback,rabbittemplate.returnscallback { @autowired rabbittemplate rabbittemplate; @postconstruct public void init(){ rabbittemplate.tconfirmcallback(this); rabbittemplate.treturnscallback(this); } /** * 交换机接受失败后进行回调 * 1. 保存消息的id及相关消息 * 2. 是否接收成功 * 3. 接受失败的原因 * @param correlationdata * @param b * @param s */ @override public void confirm(correlationdata correlationdata, boolean b, string s) { string id = correlationdata != null ? correlationdata.getid() : ""; if(b == true){ log.info("交换机已经收到id为:{}的消息",id); }el{ log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s); } } @override public void returnedmessage(returnedmessage returnedmessage) { message message = returnedmessage.getmessage(); string exchange = returnedmessage.getexchange(); string routingkey = returnedmessage.getroutingkey(); string replytext = returnedmessage.getreplytext(); log.error("消息{},被交换机{}退回,回退原因:{},路由key:{}",new string(message.getbody()),exchange,replytext,routingkey); }}
其他类的代码与上一小节案例相同
id
为2
的消息由于routingkey
不可路由,但是还是被回调函数处理了。
这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机。
import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;@configurationpublic class confirmconfig { public static final string confirm_exchange_name = "confirm_exchange"; public static final string confirm_queue_name = "confirm_queue"; public static final string backup_exchange_name = "backup_exchange"; public static final string backup_queue_name = "backup_queue"; public static final string warning_queue_name = "warning_queue"; public static final string confirm_routing_key = "key1"; @bean("confirmexchange") public directexchange confirmexchange(){ return exchangebuilder.directexchange(confirm_exchange_name).durable(true) .withargument("alternate-exchange",backup_exchange_name).build(); } @bean("confirmqueue") public queue confirmqueue(){ return queuebuilder.durable(confirm_queue_name).build(); } @bean("backupexchange") public fanoutexchange backupexchange(){ return new fanoutexchange(backup_exchange_name); } @bean("backupqueue") public queue backupqueue(){ return queuebuilder.durable(backup_queue_name).build(); } @bean("warningqueue") public queue warningqueue(){ return queuebuilder.durable(warning_queue_name).build(); } @bean public binding queuebindingexchange(@qualifier("confirmexchange") directexchange confirmexchange, @qualifier("confirmqueue") queue confirmqueue){ return bindingbuilder.bind(confirmqueue).to(confirmexchange).with(confirm_routing_key); } @bean public binding queuebindingexchange1(@qualifier("backupexchange") fanoutexchange backupexchange, @qualifier("backupqueue") queue backupqueue){ return bindingbuilder.bind(backupqueue).to(backupexchange); } @bean public binding queuebindingexchange2(@qualifier("backupexchange") fanoutexchange backupexchange, @qualifier("warningqueue") queue warningqueue){ return bindingbuilder.bind(warningqueue).to(backupexchange); }}
import lombok.extern.slf4j.slf4j;import org.springframework.amqp.core.message;import org.springframework.amqp.rabbit.annotation.rabbitlistener;import org.springframework.stereotype.component;@component@slf4jpublic class warningconsumer { public static final string warning_queue_name = "warning_queue"; @rabbitlistener(queues = warning_queue_name) public void receivemessage(message message){ string msg = new string(message.getbody()); log.info("报警发现不可路由的消息内容为:{}",msg); }}
mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高。
到此这篇关于rabbitmq发布确认高级的文章就介绍到这了,更多相关rabbitmq发布确认高级内容请搜索www.887551.com论英雄以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!
本文发布于:2023-04-04 09:50:39,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/d98dc418dc9f0c45e3396070525ffc46.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:聊聊RabbitMQ发布确认高级问题.doc
本文 PDF 下载地址:聊聊RabbitMQ发布确认高级问题.pdf
留言与评论(共有 0 条评论) |