rabbitmq~消息失败后重试达到TTL放到死信队列(事务型消息补偿机制)
这是⼀个基于消息的分布式事务的⼀部分,主要通过消息来实现,⽣产者把消息发到队列后,由消费⽅去执⾏剩下的逻辑,⽽当消费⽅处理失败后,我们需要进⾏重试,即为了最现数据的最终⼀致性,在rabbitmq⾥,它有消息重试和重试次数的配置,但当你配置之后,你的TTL达到后,消息不能⾃动放⼊死信队列,所以这块需要⼿⼯处理⼀下.
rabbitmq关于消息重试的配置
rabbitmq:
host: xxx
port: xxx
urname: xxx
password: xxx
virtual-host: xxx
###开启消息确认机制 confirms
publisher-confirms: true
publisher-returns: true
listener:
simple:
聘用证明模板acknowledge-mode: manual #设置确认⽅式
prefetch: 1 #每次处理1条消息
retry.max-attempts: 3 # 最⼤重试次数
retry.initial-interval: 2000 #重试间隔时间(单位毫秒)
default-requeue-rejected: true #该配置项是决定由于监听器抛出异常⽽拒绝的消息是否被重新放回队列。默认值为true,需要⼿动basicNack时这些参数谅失效了⼿⼯实现消息重试并放⼊死信的⽅式
定义队列的相关配置
/**
* 创建普通交换机.
*/
@Bean
public TopicExchange lindExchange() {
一大二小打一字//消息持久化
return (TopicExchange) picExchange(EXCHANGE).durable(true).build();
}
@Bean
public TopicExchange deadExchange() {
资助育人
return (TopicExchange) picExchange(LIND_DL_EXCHANGE).durable(true).build();
}
/**
* 基于消息事务的处理⽅式,当消费失败进⾏重试,有时间间隔,当达到超时时间,就发到死信队列,等待⼈⼯处理.
* @return
*/
@Bean
public Queue testQueue() {社会实践证明模板
//设置死信交换机
return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)
//毫秒
.withArgument("x-message-ttl", CONSUMER_EXPIRE)
//设置死信routingKey
.withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build();
}
@Bean
public Queue deadQueue() {
return new Queue(LIND_DEAD_QUEUE);
}
@Bean
public Binding bindBuildersRouteKey() {
return BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER);
皇帝不急太监急}
@Bean
public Binding bindDeadBuildersRouteKey() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE);
}
消费者实现的代码
/**
* 延时队列:不应该有RabbitListener订阅者,应该让它⾃⼰达到超时时间后⾃动转到死信⾥去消费
* 消息异常处理:消费出现异常后,延时⼏秒,然后从新⼊队列消费,直到达到TTL超时时间,再转到死信,证明这个信息有问题需要⼈⼯⼲预
*
* @param message
*/
@RabbitListener(queues = MqConfig.QUEUE)
public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
try {
System.out.w() + ":Subscriber:" + new Body(), "UTF-8"));
//当程序处理出现问题时,消息使⽤basicReject上报
int a = 0;
int b = 1 / a;
channel.MessageProperties().getDeliveryTag(), true);
} catch (Exception ex) {
//出现异常⼿动放回队列
Thread.sleep(2000);
channel.MessageProperties().getDeliveryTag(), fal, true);琥珀形成
}
}
/**
* 死信队列.
*
* @param message
*/
@RabbitListener(queues = MqConfig.LIND_DEAD_QUEUE)
public void dealSubscribe(Message message, Channel channel) throws IOException {用字母表示数教学反思
System.out.println("Dead Subscriber:" + new Body(), "UTF-8"));
channel.MessageProperties().getDeliveryTag(), true);
}
消费者这块,也可以直接声明队列和绑定交换机,直接在注解上添加 QueueBinding即可.
@RabbitListener(bindings = {@QueueBinding(value = @Queue(
name = MqConfig.QUEUE,
durable = "true",arguments = {@Argument(name = "x-dead-letter-exchange", value = MqConfig.LIND_DL_EXCHANGE),
@Argument(name = "x-message-ttl", value = MqConfig.CONSUMER_EXPIRE,type="java.lang.Long"),
@Argument(name = "x-dead-letter-routing-key", value = MqConfig.LIND_DEAD_QUEUE)}),
exchange = @Exchange(value = MqConfig.EXCHANGE, durable = "true",type="topic")
)})
public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
}
属龙和属蛇
这边尝试让消费者执⾏出错,然后⾛到catch⾥使⽤basicNack⽅法把消息从新放⾥队列⾥,并让线程让休息2秒,以避免频繁操作,之后就是我们希望看到的代码2019-12-20T17:21:31.190:Subscriber:nd a message to mq
2019-12-20T17:21:33.200:Subscriber:nd a message to mq
2019-12-20T17:21:35.206:Subscriber:nd a message to mq
2019-12-20T17:21:37.213:Subscriber:nd a message to mq
2019-12-20T17:21:39.221:Subscriber:nd a message to mq
Dead Subscriber:nd a message to mq
这就是⼀个消息队列的补偿机制,使⽤死信队列也可以实现延时消息的机制,有时间再给⼤家分享!