python使⽤dbutils的PooledDB连接池,操作数据库
1、使⽤dbutils的PooledDB连接池,操作数据库。
这样就不需要每次执⾏sql后都关闭数据库连接,频繁的创建连接,消耗时间
2、如果是使⽤⼀个连接⼀直不关闭,多线程下,插⼊超长字符串到数据库,运⾏⼀段时间后很容易出现OperationalError: (2006, ‘MySQL rver has gone away’)这个错误。
使⽤PooledDB解决。
# coding=utf-8
"""
使⽤DBUtils数据库连接池中的连接,操作数据库
OperationalError: (2006, ‘MySQL rver has gone away’)
"""
import json
import pymysql
import datetime
from DBUtils.PooledDB import PooledDB
import pymysql
class MysqlClient(object):
__pool = None;
def__init__(lf, mincached=10, maxcached=20, maxshared=10, maxconnections=200, blocking=True,
maxusage=100, tssion=None, ret=True,
host='127.0.0.1', port=3306, db='test',
ur='root', passwd='123456', chart='utf8mb4'):
"""
:param mincached:连接池中空闲连接的初始数量
:param maxcached:连接池中空闲连接的最⼤数量
:param maxshared:共享连接的最⼤数量
:param maxconnections:创建连接池的最⼤数量
:param blocking:超过最⼤连接数量时候的表现,为True等待连接数量下降,为fal直接报错处理
:param maxusage:单个连接的最⼤重复使⽤次数
:param tssion:optional list of SQL commands that may rve to prepare
the ssion, e.g. ["t datestyle to ...", "t time zone ..."]
:param ret:how connections should be ret when returned to the pool
(Fal or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:
param host:数据库ip地址
:param port:数据库端⼝
:param db:库名
:param ur:⽤户名
:param passwd:密码
:param chart:字符编码
"""
if not lf.__pool:
lf.__class__.__pool = PooledDB(pymysql,
mincached, maxcached,
maxshared, maxconnections, blocking,
maxusage, tssion, ret,
host=host, port=port, db=db,
ur=ur, passwd=passwd,
chart=chart,
cursorclass=pymysql.cursors.DictCursor
)
lf._conn = None
lf._cursor = None
lf.__get_conn()
def__get_conn(lf):
lf._conn = lf.__tion();
lf._cursor = lf._conn.cursor();
def clo(lf):
try:
lf._cursor.clo()
lf._conn.clo()
except Exception as e:
print e
def__execute(lf, sql, param=()):
count = lf._ute(sql, param)
print count
return count
@staticmethod
def__dict_datetime_obj_to_str(result_dict):
"""把字典⾥⾯的datatime对象转成字符串,使json转换不出错"""
if result_dict:
result_replace = {k: v.__str__() for k, v in result_dict.items() if isinstance(v, datetime.datetime)}
result_dict.update(result_replace)
return result_dict
def lect_one(lf, sql, param=()):
"""查询单个结果"""
count = lf.__execute(sql, param)
result = lf._cursor.fetchone()
""":type result:dict"""
result = lf.__dict_datetime_obj_to_str(result)
return count, result
def lect_many(lf, sql, param=()):
"""
查询多个结果
:param sql: qsl语句
:param param: sql参数
:return: 结果数量和查询结果集
"""
count = lf.__execute(sql, param)
result = lf._cursor.fetchall()
""":type result:list"""
[lf.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
return count, result
def execute(lf, sql, param=()):
count = lf.__execute(sql, param)
return count
def begin(lf):
"""开启事务"""
lf._conn.autocommit(0)
def end(lf, option='commit'):
"""结束事务"""
if option == 'commit':
lf._conn.autocommit()
el:
lf._llback()
if__name__ == "__main__":
mc = MysqlClient()
sql1 = 'SELECT * FROM shiji WHERE id = 1'
result1 = mc.lect_one(sql1)
print json.dumps(result1[1], ensure_ascii=Fal)
sql2 = 'SELECT * FROM shiji WHERE id IN (%s,%s,%s)'
param = (2, 3, 4)
print json.dumps(mc.lect_many(sql2, param)[1], ensure_ascii=Fal)
如果独⽴使⽤pymysql数据库,最好是配合DButils库。
上⾯的⽤法中是⼀直使⽤了mc这个对象,真实意图不是这样的,你可以随意实例化MysqlClient(),⽤不同的MysqlClient实例操纵数据库。