rocketmq技术内幕pdf_基于RocketMQ分布式事务-完整⽰例
前⾔
之前我们说到,分布式事务是⼀个复杂的技术问题。没有通⽤的解决⽅案,也缺乏简单⾼效的⼿段。
不过,如果我们的系统不追求强⼀致性,那么最常⽤的还是最终⼀致性⽅案。今天,我们就基于 RocketMQ来实现消息最终⼀致性⽅案的分布式事务。
本⽂代码不只是简单的demo,考虑到⼀些异常情况、幂等性消费和死信队列等情况,尽量向可靠业务场景靠拢。
另外,在最后还有《RocketMQ技术内幕》⼀书中,关于分布式事务⽰例代码的错误流程分析,所以篇幅较长,希望⼤家耐⼼观看。
分布式技术专题:Nginx+ZooKeeper+ActiveMQ+Kafka+RabbitMQ+memcached+MongoDB+Redis等s himo.im
⼀、事务消息
在这⾥,笔者不想使⽤⼤量的⽂字赘述 RocketMQ事务消息的原理,我们只需要搞明⽩两个概念。
Half Message,半消息
暂时不能被 Consumer消费的消息。Producer已经把消息发送到 Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在⼀个叫做 RMQ_SYS_TRANS_HALF_TOPIC的主题下。
曰仁义
当 Producer端对它⼆次确认后,也就是 Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。
事务状态回查
我们想,可能会因为⽹络原因、应⽤问题等,导致Producer端⼀直没有对这个半消息进⾏确认,那么这时候 Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态。
当然,什么时候去扫描,包含扫描⼏次,我们都可以配置,在后⽂我们再细说。
简⽽⾔之,RocketMQ事务消息的实现原理就是基于两阶段提交和事务状态回查,来决定消息最终是提交还是回滚的。
在本⽂,我们的代码就以 订单服务、积分服务 为例。结合上⽂来看,整体流程如下:
⼆、订单服务
在订单服务中,我们接收前端的请求创建订单,保存相关数据到本地数据库。
1、事务⽇志表
在订单服务中,除了有⼀张订单表之外,还需要⼀个事务⽇志表。 它的定义如下:
CREATE TABLE `transaction_log` (
`id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '事务ID',
`business` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '业务标识',
`foreign_key` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '对应业务表中的主键',
小企鹅简笔画PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
这张表专门作⽤于事务状态回查。当提交业务数据时,此表也插⼊⼀条数据,它们共处⼀个本地事务中。通过事务ID查询该表,如果返回记录,则证明本地事务已提交;如果未返回记录,则本地事务可能是未知状态或者是回滚状态。
2、TransactionMQProducer
我们知道,通过 RocketMQ发送消息,需先创建⼀个消息发送者。值得注意的是,如果发送事务消息,在这⾥我们的创建的实例必须是TransactionMQProducer。
public class TransactionProducer {
private String producerGroup = "order_trans_group";
private TransactionMQProducer producer;
//⽤于执⾏本地事务和事务状态回查的监听器
@Autowired
OrderTransactionListener orderTransactionListener;
//执⾏任务的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
@PostConstruct
public void init(){
producer = new TransactionMQProducer(producerGroup);
producer.tNamesrvAddr("127.0.0.1:9876");
producer.tSendMsgTimeout(Integer.MAX_VALUE);
producer.tExecutorService(executor);
producer.tTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
//事务消息发送
public TransactionSendResult nd(String data, String topic) throws MQClientException {
Message message = new Message(Bytes());
return this.producer.ndMessageInTransaction(message, null);
}
}
上⾯的代码中,主要就是创建事务消息的发送者。在这⾥,我们重点关注 OrderTransactionListener,它负责执⾏本地事务和事务状态回查。
3、OrderTransactionListener
public class OrderTransactionListener implements TransactionListener {
@Autowired
OrderService orderService;
@Autowired
TransactionLogService transactionLogService;
Logger logger = Class());
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("开始执⾏本地事务....");
LocalTransactionState state;
try{
String body = new Body());
OrderDTO order = JSONObject.parObject(body, OrderDTO.class);
state = LocalTransactionStateMIT_MESSAGE;
饮料大全logger.info("本地事务已提交。{}",TransactionId());
}catch (Exception e){
logger.info("执⾏本地事务失败。{}",e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}书包网txt下载
return state;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
logger.info("开始回查本地事务状态。{}",TransactionId());
LocalTransactionState state;
String transactionId = TransactionId();
if ((transactionId)>0){
state = LocalTransactionStateMIT_MESSAGE;
贾卡
}el {
state = LocalTransactionState.UNKNOW;
}
logger.info("结束本地事务状态查询:{}",state);
return state;
}
}
在通过 producer.ndMessageInTransaction发送事务消息后,如果消息发送成功,就会调⽤到这⾥的executeLocalTransaction⽅法,来执⾏本地事务。在这⾥,它会完成订单数据和事务⽇志的插⼊。
该⽅法返回值 LocalTransactionState 代表本地事务状态,它是⼀个枚举类。
public enum LocalTransactionState {
//提交事务消息,消费者可以看到此消息
COMMIT_MESSAGE,
//回滚事务消息,消费者不会看到此消息
社保断交一个月有什么影响ROLLBACK_MESSAGE,
//事务未知状态,需要调⽤事务状态回查,确定此消息是提交还是回滚
UNKNOW;
}
那么, checkLocalTransaction ⽅法就是⽤于事务状态查询。在这⾥,我们通过事务ID查询transaction_log这张表,如果可以查询到结果,就提交事务消息;如果没有查询到,就返回未知状态。
注意,这⾥还涉及到另外⼀个问题。如果是返回未知状态,RocketMQ Broker服务器会以1分钟的间隔时间不断回查,直⾄达到事务回查最⼤检测数,如果超过这个数字还未查询到事务状态,则回滚此消息。
当然,事务回查的频率和最⼤次数,我们都可以配置。在 Broker 端,可以通过这样来配置它:
brokerConfig.tTransactionCheckInterval(10000); //回查频率10秒⼀次
brokerConfig.tTransactionCheckMax(3); //最⼤检测次数为3
4、业务实现类
初中数学辅导@Service
public class OrderServiceImpl implements OrderService {
@Autowired
OrderMapper orderMapper;
@Autowired
TransactionLogMapper transactionLogMapper;
@Autowired
TransactionProducer producer;
Snowflake snowflake = new Snowflake(1,1);
Logger logger = Class());
//执⾏本地事务时调⽤,将订单数据和事务⽇志写⼊本地数据库
@Transactional
@Override
public void createOrder(OrderDTO orderDTO,String transactionId){
//1.创建订单
Order order = new Order();
//2.写⼊事务⽇志
TransactionLog log = new TransactionLog();
log.tId(transactionId);
log.tBusiness("order");
log.tForeignKey(String.Id()));
transactionLogMapper.inrt(log);
logger.info("订单创建完成。{}",orderDTO);
}
//前端调⽤,只⽤于向RocketMQ发送事务消息
@Override
public void createOrder(OrderDTO order) throws MQClientException {
order.Id());
order.IdStr());
producer.JSONString(order),"order");
}
}
在订单业务服务类中,我们有两个⽅法。⼀个⽤于向RocketMQ发送事务消息,⼀个⽤于真正的业务数据落库。
⾄于为什么这样做,其实有⼀些原因的,我们后⾯再说。
5、调⽤考驾照色盲测试图