pythonThreadPoolExecutor多线程踩坑之旅
责备的英文
⼀、背景
最近做了套接⼝测试系统,后台定时任务会批量去跑测试⽤例,⽤的是多线程,以前多线程模块⽤的是,但因为⼀些原因近期替换成了python3标准库中的ThreadPoolExecutor,于是问题开始了……
项⽬⽤的是:Flask + Flask-SQLAlchemy + Flask-APScheduler + ThreadPoolExecutor
替换原因,⼀些未知原因,任务运⾏完成后,个别线程⽆法正常退出,导致线程泄漏
⼆、问题描述
定时任务运⾏时⼤约有20%的概率会出现问题⼀与问题⼆
问题⼀
⼤致意思是当前数据库连接发了⼀个序号的45的包,但收到的包序号不是45,错乱了。⽹上查找了各种资料,⼤致都说是多线程引起,即多个线程共⽤了同⼀个数据库连接解决⽅案有如下:
原因:
使⽤了多线程,多线程共享了同⼀个数据库连接,但每个execute前没有加上互斥锁
⽅法:
⽅法⼀:每个execute前加上互斥锁
lock.acquire()
⽅法⼆:
请假模版 每个线程拥有⾃⼰的数据库连接,即在线程调⽤函数中加上数据库连接代码
⽅法三:
所有线程共⽤⼀个连接池,需要考虑线程总数和连接池连接数上限的问题
尝试了各种⽅法,还把每个线程db.ssion中数据库连接对象的内存地址打印出来了,确定每个线程都是唯⼀的,没有共⽤。
db.ssion是个全局对象,所有线程引⽤的都是同⼀个对象,不是每个线程⼀个。但是数据库连接只是db.ssion对象中的⼀个属性,这个属性存储在istry中,registry是werkzeug.local.Local()类型的对象为key-value存储结构,key为当前线程ID,value为每个线程独⽴的值。每个线程⼀个连接,互不影响互不⼲扰。可以通过如下语句获取当前线程的数据库连接对象:(_ident()))
问题⼆
(InvalidRequestError) Can’t reconnect until invalid transaction is rolled back
⼤致的意思是在同⼀个数据库连接中,在发起新的数据库请求前,必须先关闭掉之前报错的事务,不要认为⾃⼰没有开启事务,采⽤Flask-SQLAlchemy时,即使是最简单的lect语句,也会⾃动开启事务,查询完成时会⾃动回滚。如下⽇志:
2019-08-10 12:42:18,312 ine.ba.Engine BEGIN (implicit)
2019-08-10 12:42:18,312 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- BEGIN (im
plicit)
2019-08-10 12:42:18,319 ine.ba.Engine SELECT ur.status AS ur_status, ur.id AS ur_id, ur.urname AS ur_urname, ur.name AS ur_name, ur.phone AS ur_phone, ur.email AS ur_email, ur.address AS ur_address, ur.remark AS ur_remark, ur.create_time AS ur_create_time
FROM ur
WHERE ur.status = %(status_1)s
2019-08-10 12:42:18,319 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- SELECT ur.status AS ur_status, ur.id AS ur_id, ur.urname AS ur_urname, ur.name AS ur_name, ur.phone AS
ur_phone, ur.email AS ur_email, ur.address AS ur_address, ur.remark AS ur_remark, ur.create_time AS ur_create_time
FROM ur
WHERE ur.status = %(status_1)s
2019-08-10 12:42:18,319 ine.ba.Engine {‘status_1’: 1}
2019-08-10 12:42:18,319 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- {‘status_1’: 1}
2019-08-10 12:42:18,329 ine.ba.Engine ROLLBACK
2019-08-10 12:42:18,329 - Thread-1 - 123145310482432 - log.py - info - 110 -【INFO】- ROLLBACK
问题三
默认情况下mysql服务器端最⼤允许154个连接,此错误即是客户端发起的连接数太多,已经超过了数据库服务端的最⼤限制,从⽽导致的报错。但是ThreadPoolExecutor只开了20个线程,加上个别的访问请求,肯定不多超过154的限制,但定时任务运⾏时查看Mysql的连接(show processlist)时却⼀直在涨,直到超过限制报错。
三、问题分析
问题⼀分析
前⾯的描述中已经排除了多线程间互相的⼲扰,再对⽐下以前⾃⼰封装的多线程包,发现⾃⼰封装的多线程包,在定时任务每次运⾏时都会重新起20个线程,但ThreadPoolExecutor总计起20个线程,后续每次调度时任务都是在同⼀批线程中运⾏,那存在的可能就是在同⼀个线程内,上⼀次运⾏时的状态影响了本次,从⽽导致底层的pymysql在操作数据库时收发序号错乱。只要在任务开始时,确保数据库连接是初始的,不存在遗留事务即可解决此问题。
变相是什么意思问题⼆分析
在⼀些可能出错的地⽅,代码都已经加上了异常处理,但总会有漏⽹之鱼,整个⽤例运⾏期间,因各种原因偶尔会有错误出现,再加上此问题与问题⼀互相⼲扰纠缠在⼀起,每次出现也都是同时出现。
问题三分析
运⾏定时任务时,随着运⾏⽤例数的增加,数据库连接也⼀直在增加,由此判断必定是连接没有复⽤,每次都新建了连接导致的,虽然代码中确实是采⽤了连接池,但感觉没起作⽤。使⽤以下语句,打印出线程中的连接池信息,发现每运⾏⼀个任务,都新建了⼀个连接池,池中只有⼀个连接,确实是没有达到复⽤的⽬的,要解决的是运⾏不同⽤例时不要创建新的池,只要共⽤原来的池即可。
logging.info(f"连接池内存地址:{ine.pool)}")
logging.info(f"当前ssion地址:{id(db.ssion)}")
logging.info(f"当前数据库连接内存地址:{id((_ident()))}")
四、问题解决
1.问题⼀和问题⼆
问题⼀与问题⼆纠缠在⼀起,⼤致确定属同⼀问题,即线程⾥有未处理异常,导致后续再次运⾏时出错。
原来⽤⾃已封装的多线程包时,因为每次都是起⼀批新线程,即使出现未处理异常也只会影响当前线程的当前⽤例。
1.1 ⾃⼰封装的多线程包
每次起20个线程,当出现未处理异常时,会导致此线程down掉,剩余19线程继续⼲活,⼲完后这次线程全部正常退出,下次时再次新起20个线程,错误不会传递不会累加
1.2 ThreadPoolExecutor
在应⽤启动时初始化⼀次,创建20个线程,运⾏⽤例碰到未处理异常时,线程不会down掉,会继续⼲活,那些打开的未关闭事务会影响后续⽤例运⾏,错误会传递会累加,只要能消除错误传递即可。
executor = ThreadPoolExecutor(max_workers=20)
1.3 解决⽅案
【运⾏逻辑段】中的代码可能会报异常,导致db.ssion.clo()语句没有运⾏,当前线程占⽤的连接没有还给连接池,再运⾏下⼀条⽤例时,会继续使⽤原来的连接,因⼀些遗留事务或遗留数据,导致报错。
def run_test_ca(ca_id):
# 运⾏逻辑
xxxxxxxxxxxxxxxx
# 运⾏逻辑
db.ssion.clo()团员个人年度总结
想要确保获取到的连接是“⼲净”的,只要在【运⾏逻辑段】前⾯再加⼀条语句即可,如下代码。如果线程中已经存在连接时则会先关闭,后续要使⽤时会⾃动申请新连接;当线程中不存在连接时会申请连接并马上关闭,后续要使⽤时⼜会⾃动申请新连接,这样虽然会做⼀些⽆⽤功,但能确保连接“⼲净”。
def run_test_ca(ca_id):
db.ssion.clo()
# 运⾏逻辑
xxxxxxxxxxxxxxxx
# 运⾏逻辑
db.ssion.clo()
1. db.ssion.clo()语句运⾏后,线程会将连接还给连接池,连接池不会断开与数据库的物理连接,只会做⼀些ROLLBACK操作,将⼀些未提交
的数据或未提交的事务回滚,确保下次分配出去时是⼲净的。
2. 代码中使⽤db.ssion.query()或Ur.query()等查询语句或别的操作语句时,线程会⾃动从连接池中申请到新的连接。历史人物简介
问题三
定时任务中要⽤到应⽤模型,必须在启动时推送应⽤上下⽂,否则会报错找不到应⽤的错误,如下代码。把推送上下⽂的语句放在了
run_test_ca函数中,但这样会带来⼀个恶果,即每条⽤例运⾏时都会创建⼀个app对象,每个app对象都有⾃⼰的连接池,导致连接池达不到复⽤的⽬的,越占越多,最终超出数据库连接从⽽报错。
def run_test_ca(ca_id):
努力的事例from .. import create_app
app = create_app()
app.app_context().push()
# 运⾏逻辑
xxxxxxxxxxxxxxxx
# 运⾏逻辑
db.ssion.clo()
原来采⽤⾃⼰封装的多线程包时,因为每次都是新建⼀批线程,等所有⽤例运⾏完成后这些线程会退出,连接⼜会释放,连接只在当次内累加最终没有超出数据库上限,所以没有此错误。
采⽤标准库中的ThreadPoolExecutor后,因为线程在整个应⽤的⽣命周期中会⼀直存在,导致创建的连接都没有释放,最终总会超出数据库限制。解决此问题,只要不在每次运⾏⽤例时都重复推送上下⽂不创建新应⽤与新连接池即可。如下代码,把推送应⽤上下⽂的语句独⽴成⼀个函数,传给initializer参数,则只会在线程创建时初始化⼀次。
# 线程初始化函数
def init_context():
from .. import app
app.app_context().push()
logging.info("推送应⽤上下⽂完成")
# 创建定时任务线程池
executor = ThreadPoolExecutor(
max_workers=20, initializer=init_context
)
# 测试⽤例运⾏函数
def run_test_ca(ca_id):
# 运⾏逻辑
xxxxxxxxxxxxxxxx
# 运⾏逻辑
db.ssion.clo()
# 线程初始化函数2
def init_context2():
from .. import create_app()
app = create_app()
app.app_context().push()
logging.info("推送应⽤上下⽂完成")
注意init_context与init_context2函数的区别,init_context2会创建⼀个新的连接池,不会与应⽤共⽤⼀个。推荐⽤init_context,这样整个应⽤进程中只存在⼀个连接池。
五、总结
碰到问题⼀时先⼊为主了,⼀直在多线程⽅⾯排查,导致浪费了不少时间;
替换多线程实现时考虑不周,简单的替换完成后⼤多情况下也正常,忽略了实现⽅式的差异。在动⼿替换前应该把代码通读⼀遍,先记录哪些可能需要修改的点,不然⼀旦替换成功后,思维容易定势;
六、疑问
按照以上解释虽然确实解决了我遇到的问题,但还有⼀些存在疑问的地⽅,⽐如问题⼀与问题⼆,按照上⾯的解释,问题应该是在所有有数据库操作的地⽅随机出现,但实际上问题总是在查询某⼀张表时出现,且查询这张表没有任何特别之处。这个按照上⾯的描述是解释不通的,可能还有某些没搞明⽩的地⽅,等⼀段时间来再回头看看是否能明悟。
七、回顾
7.1 为什么“问题⼀”与“问题⼆”总是在同⼀个地⽅出现
接⼝系统是在线的Web服务,框架⽤的是Flask,通过接⼝调⽤触发测试⽤例运⾏。
7.1.1 运⾏单个⽤例
调⽤参数:GET:/run_ca?ca_id=1
登记表自我鉴定
此时是通过ca_id获取⽤例对象,然后将⽤例对象传递给测试引擎,完成后将引擎返回结果返回给前端。此时没有⽤到ThreadPoolExecutor对象,整个过程都是在flask的线程中完成的,flask框架会⾃动管理数据库连接,故没有碰到问题。
7.1.2 运⾏多个⽤例
调⽤参数:GET:/run_ca?module_id=1
此时会先查询出模块下所有的测试⽤例对象,然后将测试⽤例集传递给测试引擎,测试引擎为了加速执⾏,会启⽤ThreadPoolExecutor线程,ThreadPoolExecutor线程始终存在,且⾥⾯的数据库连接已经脱离了flask的管理范涛,不会⾃动管理需要⼿⼯管理,⼀旦出现未处理异常,后续再使⽤此线程时则会碰到问题⼀与问题⼆。因为执⾏⽤例的步骤始终⼀致,都是先查询某张表,故错误始终出在同⼀处。——2021.07.23
如何消灭老鼠