使⽤ActiveMQ实现阶梯式消息通知
开始先分享⼀下借鉴的帖⼦有:
1.
2.
华为手机锁屏时间怎么设置
推荐先把上⾯两个⽂章看完,会对后⾯的阅读有帮助。
消息队列
消息队列有很多种:ActiveMQ,RabbitMQ,ZeroMQ,Kafka,RocketMQ等。本⽂不再细述AMQ是什么、如何搭建使⽤,初学者可先学习AMQ原理和⽤法。
翻了很多帖⼦很少有⽤AMQ实现梯度通知的,所以特此写⼀个。
应⽤场景
⼀个的交易系统,当⽤户消费成功后,需要发送通知,告知这笔订单的商户,该订单已被⽀付了。这种业务场景与微信和⽀付宝的⽀付成功回调通知雷同。
代码实现
业务代码:
代码⽤了lombok,所以对 @Data 这个注解陌⽣的同学先导⼊相应的jar包才能使⽤。以下是⼏个简单的POJO类。
/**
* @Author: Gavin
* @DateTime: 2018/8/14
* @Des: 消费的请求类,⼀些敏感字段我就不展⽰了
*/
@Data
public class SaleReq extends BaReq {
//商户号
private String merchId;
//⾦额
private String totallAmt;
//卡号
private String ecardId;
//订单号
private String orderId;
//回调通知地址
private String notifyUrl;
private String appId;
//订单的附属数据,可以理解为备注remark
private String attach;
//⽀付通知发送次数
private int notifyTime;
}
@Data
public class Notify4ConsumReq implements Serializable {
private String appId;
private String timestamp;
private String orderId;
private String amount;
private String ecardId;
private String storeId;
private String attach;
private String resultCode;
private String errCode;
private String errorMsg;
private String sign;
}
/**
* @Author: Gavin双眼迷离
* @DateTime: 2018/8/14
* @Des:
*/
@Data
public class Notify4ConsumRsp implements Serializable {
//响应码
private String retCode;
//响应信息
private String retMsg;
}
消费业务执⾏完后判断消费是否成功,成功并且请求带有⽀付成功回调通知url和appid则调⽤回调通知⽅法,产⽣⼀个通知消息。PS:appid是⽤于报⽂验签,防⽌第三⽅商户收到假的⽀付成功回调通知。
@Controller
public class SaleController{
@Autowired
ISaleService mSaleService;
@Autowired
ISaleAsynService saleAsynService;
@RequestMapping(value ="/Sale", method = RequestMethod.POST, produces ={"application/json;chart=UTF-8"})
@ResponBody
public String Sale(@RequestBody CommunicationReq param){
try{
//消费业务,具体代码忽略
博士帽子图片rsp = mSaleService.doSale(procid, req, rsp);
}catch(Exception e){
rsp.tRetCode(Constant.FAIL);
rsp.tRetMsg("系统错误");
return rsp;
}
//⽀付回调,当消费成功 && 有回调地址 && 请求带有appid 则调⽤⽀付回调⽅法(消息队列的⽣产者)
if(Constant.SUCCESS.RetCode())&& StringUtils.NotifyUrl())&& StringUtils.AppId())){ try{
}catch(Exception e){
}
}
return rsp;
}
}
/**
* @Author: Gavin
* @DateTime: 2018/8/14
* @Des: 消费的异步处理业务
*/
@Service("saleAsynService")
public class SaleAsynServiceImpl implements ISaleAsynService {
@Autowired
ProducerService producerService;
@Autowired
@Qualifier("adapterQueue")
Destination adapterQueue;
@Autowired
IAppInfoDao appInfoDao;
@Override
public void notify4Produce(String procid, SaleReq saleReq){
//产⽣消息
log.info("发送⽀付通知内容:"+ JsonUtil.Object2Json(saleReq));
/
/初始发送次数为第⼀次这是⼀个关键点
saleReq.tNotifyTime(1);
producerService.ndMessage(adapterQueue, saleReq,0);
log.info("发送⽀付通知成功");
}
房屋转租协议@Transactional(readOnly =true)
拜老爷@Override
剪纸娃娃
public Notify4ConsumRsp notify4Consum(String procid, SaleReq saleReq){
Notify4ConsumRsp rsp =new Notify4ConsumRsp();
rsp.tRetCode("success");
//查找appid
/
/找不到appid,⽆法做签名,所以直接不发通知。
//找不到appid,⽆法做签名,所以直接不发通知。
if(StringUtils.AppId())){
log.info("⽆appid1,不发送通知");
return rsp;
王昭君出塞}
AppInfo appInfo = (AppId());
if(appInfo==null){
log.info("⽆appid2,不发送通知");
return rsp;
}
/
/拼装通知请求对象
Notify4ConsumReq req =new Notify4ConsumReq();
req.AppId());
req.TotallAmt());
req.Attach());
req.EcardId());
req.OrderId());
req.tResultCode("success");
req.MerchId());
//时间戳是为了签名的不确定性,此字段可以换成⼀个随机字符串
req.tTimestamp(DateUtil.formatDateToString(new Date(),"yyyyMMddHHmmsss"));
/
/⽣成签名
String sign="";
try{
sign = ateSignature(SignUtil.beanToMap(req),AppSecret(),SignType.SignType()));
}catch(Exception e){
log.info("签名失败="+Trace(e));
}
req.tSign(sign);
//向第三⽅⽀付系统发送校验请求
String str=null;
NotifyUrl().contains("http")){
str = HttpClientUtil.NotifyUrl(), JsonUtil.Object2Json(req),10*1000);
}el{
str = HttpClientUtil.NotifyUrl(), JsonUtil.Object2Json(req), Constant.TIME_OUT);
}肝大
if(str == null){
log.info("请求⽀付通知系统失败,数据返回为空");
rsp.tRetCode("fail");
return rsp;
}
log.info("⽀付通知系统返回结果:"+ str);
rsp =(Notify4ConsumRsp) JsonUtil.json2Object(str,Notify4ConsumRsp.class);
return rsp;
}
}
上⾯的代码都是和业务有关,以下是AMQ的代码实现了。
/**
* @Author: Gavin
* @DateTime: 2018/8/14
* @Des: 消息⽣产者
*/
@Component
public class ProducerServiceImpl implements ProducerService {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
@Qualifier("responQueue")
private Destination responDestination;
@Override
public void ndMessage(String procid,Destination destination,final String message,final int delayTime){ log.info("---------------⽣产者开始发送消息-----------------");
log.info("消息内容:"+ message);
jmsTemplate.nd(destination,new MessageCreator(){
@Override
public Message createMessage(Session ssion)throws JMSException {
TextMessage textMessage = ateTextMessage(message);
textMessage.tLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delayTime);
return textMessage;
}
});
log.info("---------------⽣产者发送消息完毕-----------------");
}
@Override
public void ndMessage(final Destination destination,final Serializable obj,final long delayTime){
@Override
public Message postProcessMessage(Message message)throws JMSException {
message.tLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayTime);
return message;
}
});
}
}
/**
* @Author: Gavin
* @DateTime: 2018/8/14
* @Des: 消息消费者,监听接收消息
*/
public class ConsumerMessageListener implements SessionAwareMessageListener {
@Autowired
@Qualifier("myMessageConverter")
private MessageConverter messageConverter;
@Autowired
@Qualifier("adapterQueue")
private Destination adapterQueue;
@Autowired
ISaleAsynService saleAsynService;
@Autowired
ProducerService producerService;
private static final Map<Integer, Long> myMap;
//初始话⼀个map,这个map的key代表通知重发次数,value是通知间隔时间
//⽐如,第⼀次重发间隔5秒,第⼆次重发间隔10秒以此类推
//通知频率为5/10/30/180/1800,单位:秒
static{
myMap =new HashMap<Integer, Long>();
myMap.put(1,5*1000L);
myMap.put(2,10*1000L);