TransactionSynchronizationManager用法和含义

更新时间:2023-05-05 18:27:02 阅读: 评论:0

TransactionSynchronizationManager⽤法和含义
当我们有业务需要在事务提交过后进⾏某⼀项或者某⼀系列的业务操作时候我们就可以使⽤TransactionSynchronizationManager
通过spring的aop机制将需要进⾏后置业务处理的操作,提交给spring的处理机制,并且切⼊到事务处理的后⾯
TransactionSynchronizationManager这个类中由⼀系列的ThreadLocal ,我们需要关注的是synchronizations,在后⾯使⽤到的TransactionSynchronizationManager.isSynchronizationActive()、
在Spring在开启数据库事务(⽆论是使⽤@Transactional注解,还是⽤xml配置)时,都会向其中写⼊⼀个实例,⽤于⾃动处理Connection的获取、提交或回滚等操作。
再看isSynchronizationActive()⽅法,判断了synchronizations中是否有数据(Set<TransactionSynchronization>⾮null即可,并不要求其中有TransactionSynchronization实例。
再看registerSynchronization()⽅法,⾸先调⽤isSynchronizationActive()做⼀个校验;然后将⼊参synchronization添加到synchronizations 中。⼊参synchronization中的⽅法不会在这⾥执⾏,⽽是要等到事务执⾏到特定阶段时才会被调⽤。
TransactionSynchronizationAdapter是⼀个适配器:它实现了TransactionSynchronization接⼝,并为每⼀个接⼝⽅法提供了⼀个空的实现。这类适配器的基本思想是:接⼝中定义了很多⽅法,然⽽业务代码往往只需要实现其中⼀⼩部分。利⽤这种“空实现”适配器,我们可以专注于业务上需要处理的回调⽅法,⽽不⽤在业务类中放⼤量⽽且重复的空⽅法。
结合TransactionSynchronizationManager和TransactionSynchronizationAdapter利⽤ThreadPoolExecutor实现⼀个事务后多线程处理功能。
package com.*.module.spring.support;
le.urrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ansaction.support.TransactionSynchronizationAdapter;
import ansaction.support.TransactionSynchronizationManager;
import java.util.ArrayList;
import java.util.List;
import urrent.*;
/**
* 事务提交异步线程
*
* @author ly
*/
public class TransactionAfterCommitExecutor extends ThreadPoolExecutor {
private static final Logger LOGGER = Logger(TransactionAfterCommitExecutor.class);
public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public TransactionAfterCommitExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
private ThreadLocal<List<Runnable>> currentRunables = new ThreadLocal<List<Runnable>>(){
@Override
protected List<Runnable> initialValue() {
return new ArrayList<>(5);
}
};
private ThreadLocal<Boolean> registed = new ThreadLocal<Boolean>(){
@Override
protected Boolean initialValue() {
return fal;
}
};
/**
* 默认策略丢弃最⽼的数据
*/
public TransactionAfterCommitExecutor() {
this(
50, 500,
500L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024),
new ThreadFactoryBuilder().tNameFormat("transaction-after-commit-call-pool-%d").build(),
new ThreadPoolExecutor.DiscardOldestPolicy());
}
@Override
public void execute(final Runnable runnable) {
//如果事务同步未启⽤则认为事务已经提交,马上进⾏异步处理
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
} el {
//同⼀个事务的合并到⼀起处理
<().add(runnable);
//如果存在事务则在事务结束后异步处理
if(!()){
registed.t(true);
}
}
}
@Override
public Future<?> submit(final Runnable runnable) {
//如果事务同步未启⽤则认为事务已经提交,马上进⾏异步处理
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
return super.submit(runnable);
} el {
final RunnableFuture<Void> ftask = newTaskFor(runnable, null);
//如果存在事务则在事务结束后异步处理
public void afterCommit() {
TransactionAfterCommitExecutor.super.submit(ftask);
}
});
return ftask;
}
}
private class AfterCommitTransactionSynchronizationAdapter extends TransactionSynchronizationAdapter{        @Override
public void afterCompletion(int status) {
final List<Runnable> txRunables = new ArrayList<>(());
if(status == STATUS_COMMITTED){
ute(new Runnable() {
@Override
public void run() {
for (Runnable runnable : txRunables) {
try {
runnable.run();
} catch (Exception e) {
<("ex:",e);
}
}
}
});
}
}
}
}

本文发布于:2023-05-05 18:27:02,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/533934.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:事务   处理   需要   提交   业务   实现
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图