首页 > 作文

聊聊RabbitMQ发布确认高级问题

更新时间:2023-04-04 09:50:41 阅读: 评论:0

1、发布确认高级

1. 存在的问题

再生产环境中由于一些不明原因导致rabbitmq重启,在rabbitmq重启期间生产者消息投递失败,会导致消息丢失。

1.1、发布确认springboot版本

1.1.1、确认机制方案

当消息不能正常被接收的时候,我们需要将消息存放在缓存中。

1.1.2、代码架构图

1.1.3、配置文件

spring.rabbitmq.host=192.168.123.129spring.rabbitmq.port=5672spring.rabbitmq.urname=adminspring.rabbitmq.password=123spring.rabbitmq.publisher-confirm-type=correlated
none:禁用发布确认模式,是默认值。correlated:发布消息成功到交换机会触发回调方方法。correlated:就是发布一个就确认一个。

1.1.4、配置类

import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;@configurationpublic class confirmconfig {    public static final string confirm_exchange_name = "confirm_exchange";    public static final string confirm_queue_name = "confirm_queue";    public static final string confirm_routing_key = "key1";    @bean("confirmexchange")    public directexchange confirmexchange(){        return new directexchange(confirm_exchange_name);    }    @bean("confirmqueue")    public queue confirmqueue(){        return queuebuilder.durable(confirm_queue_name).build();    }    @bean    public binding queuebindingexchange(@qualifier("confirmexchange") directexchange confirmexchange,                         圣诞节是几月几日               @qualifier("confirmqueue") queue confirmqueue){        return bindingbuilder.bind(confirmqueue).to(confirmexchange).with(confirm_routing_key);    }}

1.1.5、回调接口

import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.stereotype.component;import javax.annotation.postconstruct;/** * 回调接口 */@component@slf4jpublic class mycallback implements rabbittemplate.confirmcallback {    @autowired    rabbittemplate rabbittemplate;    @postconstruct    public void init(){        rabbittemplate.tconfirmcallback(this);    }    /**     * 交换机接受失败后进行回调     * 1. 保存消息的id及相关消息     * 2. 是否接收成功     * 3. 接受失败的原因     * @param correlationdata     * @param b     * @param s     */    @override    public void c推翻onfirm(correlationdata correlationdata, boolean b, string s) {        string id = correlationdata != null ? correlationdata.getid() : "";        if(b == true){            log.info("交换机已经收到id为:{}的消息",id);        }el{            log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);        }    }}

1.1.6、生产者

import com.xiao.springbootrabbitmq.utils.mycallback;import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.pathvariable;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.restcontroller;import javax.annotation.postconstruct;@restcontroller@requestmapping("/confirm")@slf4jpublic class producer {    public static final string confirm_exchange_name = "confirm_exchange";    @autowired    rabbittemplate rabbittemplate;    @getmapping("/ndmessage/{messa车辆管理方案ge}")    public void ndmessage(@pathvariable string message){        correlationdata correlationdata1 = new correlationdata("1");        string routingkey1 = "key1";        rabbittemplate.convertandnd(confirm_exchange_name,routingkey1,message + routingkey1,correlationdata1);        correlationdata correlationdata2 = new correlationdata("2");        string routingkey2 = "key2";        rabbittemplate.convertandnd(confirm_exchange_name,routingkey2,message + routingkey2,correlationdata2);        log.info("发送得内容是:{}",message);    }}

1.1.7、消费者

import lombok.extern.slf4j.slf4j;import org.springframework.amqp.core.message;import org.springframework.amqp.rabbit.annotation.rabbitlistener;import org.springframework.stereotype.component;@component@slf4jpublic class confirmconsumer {    pub我很棒lic static final string confirm_queue_name = "confirm_queue";    @rabbitlistener(queues = confirm_queue_name)    public void receivemessage(message message){        string msg = new string(message.getbody());        log.info("接收到队列" + confirm_queue_name + "消息:{}",msg);    }}

1.1.8、测试结果

1. 第一种情况

id1的消息正常送达,id2的消息由于routingkey的错误,导致不能正常被消费,但是交换机还是正常收到了消息,所以此时由于交换机正常接收之后的原因丢失的消息不能正常被接收

2. 第二种情况

我们再上一种情况下修改了id1的消息的交换机的名称,所以此时回调函数会进行回答由于什么原因导致交换机无法接收成功消息

1.2、回退消息

1.2.1、mandatory参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由(就是消息被交换机成功接收后,无法到达队列),那么消息会直接被丢弃,此时生产者是不知道消息被丢弃这个事件的。通过设置该参数可以在消息传递过程中不可达目的地时将消息返回给生产者。

1.2.2、配置文件

spring.rabbitmq.publisher-returns=true

需要在配置文件种开启返回回调

1.2.3、生产者代码

import lombok.extern.slf4j.slf4j;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.web.bind.annotation.getmapping;import org.springframework.web.bind.annotation.pathvariable;import org.springframework.web.bind.annotation.requestmapping;import org.springframework.web.bind.annotation.restcontroller;import javax.annotation.postconstruct;@restcontroller@requestmapping("/confirm")@slf4jpublic class producer {    public static final string confirm_exchange_name = "confirm_exchange";    @autowired    rabbittemplate rabbittemplate;    @getmapping("/ndmessage/{message}")    public void ndmessage(@pathvariable string message){        correlationdata correlationdata1 = new correlationdata("1");        string routingkey1 = "key1";        rabbittemplate.convertandnd(confirm_exchange_name,routingkey1,message + routingkey1,correlationdata1);        log.info("发送得内容是:{}",message + routingkey1);        correlationdata correlationdata2 = new correlationdata("2");        string routingkey2 = "key2";        rabbittemplate.convertandnd(confirm_exchange_name,routingkey2,message + routingkey2,correlationdata2);        log.info("发送得内容是:{}",message + routingkey2);    }}

1.2.4、回调接口代码

import lombok.extern.slf4j.slf4j;import org.springframework.amqp.core.message;import org.springframework.amqp.core.returnedmessage;import org.springframework.amqp.rabbit.connection.correlationdata;import org.springframework.amqp.rabbit.core.rabbittemplate;import org.springframework.beans.factory.annotation.autowired;import org.springframework.stereotype.component;import javax.annotation.postconstruct;/** * 回调接口 */@component@slf4jpublic class mycallback implements rabbittemplate.confirmcallback,rabbittemplate.returnscallback {    @autowired    rabbittemplate rabbittemplate;    @postconstruct    public void init(){        rabbittemplate.tconfirmcallback(this);        rabbittemplate.treturnscallback(this);    }    /**     * 交换机接受失败后进行回调     * 1. 保存消息的id及相关消息     * 2. 是否接收成功     * 3. 接受失败的原因     * @param correlationdata     * @param b     * @param s     */    @override    public void confirm(correlationdata correlationdata, boolean b, string s) {        string id = correlationdata != null ? correlationdata.getid() : "";        if(b == true){            log.info("交换机已经收到id为:{}的消息",id);        }el{            log.info("交换机还未收到id为:{}消息,由于原因:{}",id,s);        }    }    @override    public void returnedmessage(returnedmessage returnedmessage) {        message message = returnedmessage.getmessage();        string exchange = returnedmessage.getexchange();        string routingkey = returnedmessage.getroutingkey();        string replytext = returnedmessage.getreplytext();        log.error("消息{},被交换机{}退回,回退原因:{},路由key:{}",new string(message.getbody()),exchange,replytext,routingkey);    }}

1.2.5、测试结果

其他类的代码与上一小节案例相同

id2的消息由于routingkey不可路由,但是还是被回调函数处理了。

1.3、备份交换机

1.3.1、代码架构图

这里我们新增了备份交换机、备份队列、报警队列。它们绑定关系如图所示。如果确认交换机成功接收的消息无法路由到相应的队列,就会被确认交换机发送给备份交换机

1.3.2、配置类代码

import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.qualifier;import org.springframework.context.annotation.bean;import org.springframework.context.annotation.configuration;@configurationpublic class confirmconfig {    public static final string confirm_exchange_name = "confirm_exchange";    public static final string confirm_queue_name = "confirm_queue";    public static final string backup_exchange_name = "backup_exchange";    public static final string backup_queue_name = "backup_queue";    public static final string warning_queue_name = "warning_queue";    public static final string confirm_routing_key = "key1";    @bean("confirmexchange")    public directexchange confirmexchange(){        return exchangebuilder.directexchange(confirm_exchange_name).durable(true)                .withargument("alternate-exchange",backup_exchange_name).build();    }    @bean("confirmqueue")    public queue confirmqueue(){        return queuebuilder.durable(confirm_queue_name).build();    }    @bean("backupexchange")    public fanoutexchange backupexchange(){        return new fanoutexchange(backup_exchange_name);    }    @bean("backupqueue")    public queue backupqueue(){        return queuebuilder.durable(backup_queue_name).build();    }    @bean("warningqueue")    public queue warningqueue(){        return queuebuilder.durable(warning_queue_name).build();    }    @bean    public binding queuebindingexchange(@qualifier("confirmexchange") directexchange confirmexchange,                                        @qualifier("confirmqueue") queue confirmqueue){        return bindingbuilder.bind(confirmqueue).to(confirmexchange).with(confirm_routing_key);    }    @bean    public binding queuebindingexchange1(@qualifier("backupexchange") fanoutexchange backupexchange,                                        @qualifier("backupqueue") queue backupqueue){        return bindingbuilder.bind(backupqueue).to(backupexchange);    }    @bean    public binding queuebindingexchange2(@qualifier("backupexchange") fanoutexchange backupexchange,                                         @qualifier("warningqueue") queue warningqueue){        return bindingbuilder.bind(warningqueue).to(backupexchange);    }}

1.3.3、消费者代码

import lombok.extern.slf4j.slf4j;import org.springframework.amqp.core.message;import org.springframework.amqp.rabbit.annotation.rabbitlistener;import org.springframework.stereotype.component;@component@slf4jpublic class warningconsumer {    public static final string warning_queue_name = "warning_queue";    @rabbitlistener(queues = warning_queue_name)    public void receivemessage(message message){        string msg = new string(message.getbody());        log.info("报警发现不可路由的消息内容为:{}",msg);    }}

1.3.4、测试结果

mandatory参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高

到此这篇关于rabbitmq发布确认高级的文章就介绍到这了,更多相关rabbitmq发布确认高级内容请搜索www.887551.com论英雄以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

本文发布于:2023-04-04 09:50:39,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/d98dc418dc9f0c45e3396070525ffc46.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:聊聊RabbitMQ发布确认高级问题.doc

本文 PDF 下载地址:聊聊RabbitMQ发布确认高级问题.pdf

标签:消息   交换机   回调   生产者
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图