java实现按照时间梯度异步通知第三⽅接⼝
业务背景
在对外提供的API 接⼝中,处理完⾃⾝的业务逻辑后需要调⽤第三⽅系统接⼝,将相应的处理结果通知给对⽅,就像微信、⽀付宝⽀付后 异步通知⽀付结果⼀样,按照1s,2s,5s,这种⾃定义的的时间梯度来通知第三⽅接⼝。
实现思路
在业务完成后把要推送的消息存⼊数据库,并且发送⾄mq的延时消息队列,在mq 消费时判断本次推送等级并且计算下⼀等级推送时间,如果本次回调第三⽅未得到正确响应则继续发送下⼀等级的mq 延时队列。
使⽤技术
springboot 2.x + rocketmq-spring-boot-starter-2.0.4 + mysql
数据库表设计
回调通知数据库表设计:
CREATE TABLE `tb_callback_notify` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`ur_no` varchar(50) DEFAULT NULL COMMENT '⽤户编号',
`order_no` varchar(50) DEFAULT NULL COMMENT '订单号',
`notify_url` varchar(255) DEFAULT NULL COMMENT '通知url',
`notify_data` text COMMENT '通知内容',
`notify_times` int(11) DEFAULT '0' COMMENT '通知次数(等级)',
`last_notify_time` datetime DEFAULT NULL COMMENT '最后⼀次通知时间',
`next_notify_time` datetime DEFAULT NULL COMMENT '下次通知时间',
`status` tinyint(1) DEFAULT '1' COMMENT '状态 1 未完成 2已完成',
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
⾃定义时间梯度:
在rocketmq-spring-boot-starter中,实现延时队列有固定的18个等级,每个等级对应的延时时长分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,在⾃⼰的业务中 我们可以选取部分等级来作为回调通知的时间梯度,我这⾥选取了1s 5s 10s 30s 1m 5m 10m 30m 1h 2h 作为我⽅通知的时间梯度策略
代码实现
public class Constant {
/**
* 最⼤回调通知次数
*/4个全面
public static final Integer MAX_NOTIFY_TIMES = 10;
/**
* rocekt mq 延时等级对应的秒数
孤独的网名*/
public static final Integer[] ROCKET_MQ_DELAY_LEVEL_SECOND = {1,5,10,30,60,120,180,240,300,360,420,480,540,600,1200,1800,3600,7200};
/**
* 回调通知topic
*/
public static final String QUEUE_CALLBACK_TOPIC="queue_callback";
/**
* 回调通知group
*/
public static final String QUEUE_GROUP_CALLBACK_TOPIC="queue_group_callback";
/**
* 系统通知为最⼤10次,每次通知对应mq 等级<br/>
* 回调通知频率,对应rocketmq 延时等级策略<br/>
* rocketmq 延时等级: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>
* 取mq的 1s 5s 10s 30s 1m 5m 10m 30m 1h 2h 作为我⽅通知策略
*/
public static final Integer[] CALLBACK_PUSH_FREQUENCY_TO_MQ_LEVEL = {1,2,3,4,5,9,14,16,17,18};
}
业务处理完后异步调⽤此rvice的callback⽅法进⾏通知
@Service
public class CallbackNotifyServiceImpl extends ServiceImpl<CallbackNotifyMapper, CallbackNotify> implements ICallbackNotifyService {
@Autowired
private CallbackNotifyMapper callbackNotifyMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void callback(String urNo, String orderNo, String notifyUrl, String notifyData) {
// 校验是否正在进⾏回调
CallbackNotify notify = callbackNotifyMapper.lectOne(new QueryWrapper<CallbackNotify>().eq("order_no", orderNo));
if(Asrt.isNullOrEmpty(notify)) {
Date now = new Date();
notify = new CallbackNotify();
notify.tOrderNo(orderNo);
notify.tUrNo(urNo);
notify.tStatus(1);
notify.tCreateTime(now);
notify.tNotifyUrl(notifyUrl);
notify.tNotifyData(notifyData);
notify.tNotifyTimes(0);
callbackNotifyMapper.inrt(notify);
}
rocketMQTemplate.syncSend(Constant.QUEUE_CALLBACK_TOPIC, MessageBuilder.JSONString(notify)).build());
}
}
rocketMQ监听器初学广场舞入门
/**
* 回调消息推送监听<br/>
* 如果此次推送失败或者收到的响应是fal的时候重新计算下次推送时间,并且再次发送下⼀级延时等级的队列
* @author francis
*
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = Constant.QUEUE_CALLBACK_TOPIC, consumerGroup = Constant.QUEUE_GROUP_CALLBACK_TOPIC)
public class CallbackNotifyListener implements RocketMQListener<MessageExt> {
@Autowired
private ICallbackNotifyService callbackNotifyService;
@Value("${rocketmq.producer.nd-message-timeout}")
private Integer messageTimeOut;
德国政党@Autowired
private RocketMQTemplate rocketMQTemplate;
private final RestTemplate restTemplate = new RestTemplateBuilder().tConnectTimeout(Duration.ofMillis(3000)).tReadTimeout(Duration.ofMillis(4000)).build
@Override
public void onMessage(MessageExt message) {
byte[] body = Body();
String msg = new String(body);
log.info("接收到消息:{}", msg);
Date now = new Date();
CallbackNotify callbackNotify = JSONObject.parObject(msg, CallbackNotify.class);
火灾事故报告callbackNotify.tLastNotifyTime(now);
Integer times = NotifyTimes();
times++;
// ⼤于最⼤通知次数直接结束
if(times > Constant.MAX_NOTIFY_TIMES) {
return;
}
// 下⼀次推送延时等级
Integer nextLevel = Constant.CALLBACK_PUSH_FREQUENCY_TO_MQ_LEVEL[times];
// 计算下⼀次推送的秒
Integer nextNotifySecond = Constant.ROCKET_MQ_DELAY_LEVEL_SECOND[nextLevel];
callbackNotify.tNextNotifyTime(DateUtil.offtSecond(now, nextNotifySecond));
callbackNotify.tNotifyTimes(times);
JSONObject notifyData = JSONObject.NotifyData());
HttpHeaders headers = new HttpHeaders();
headers.tContentType(MediaType.APPLICATION_JSON);
HttpEntity<JSONObject> request = new HttpEntity<JSONObject>(notifyData, headers);
try {
ResponEntity<String> respon = restTemplate.NotifyUrl(), request, String.class);
// 推送成功
if(String().Body())) {
callbackNotify.tStatus(2);
}el {
巧克力爱情
rocketMQTemplate.syncSend(Constant.QUEUE_CALLBACK_TOPIC, MessageBuilder.JSONString(callbackNotify)).build(), messag }
} catch (Exception e) {
黄鳝怎么烧好吃
<("推送回调消息异常", e);
北京最低社保
rocketMQTemplate.syncSend(Constant.QUEUE_CALLBACK_TOPIC, MessageBuilder.JSONString(callbackNotify)).build(), message }
callbackNotify.tLastNotifyTime(now);
callbackNotifyService.updateById(callbackNotify);
}
}