Python常用数据库封装方法
Python连接db——连接池封装方法
mysql,mongo,redis 连接池方法
import pymysql
import redis
from aliyun.log import LogClient
from dbutils.persistent_db import PersistentDB
from dbutils.pooled_db import PooledDB
from pymongo import MongoClient
def connect_mongo_by_uri(uri):
    """Create a MongoDB client from a connection URI.

    :param uri: MongoDB connection string, for example
        ``"mongodb://[username[:password]@]host:port"``.
    :return: the ``MongoClient`` constructed from ``uri``.
    """
    return MongoClient(uri)
def lect_collection(client: MongoClient, dateba_name: str, collection_name: str):
    """Return a collection handle looked up on a MongoDB client.

    NOTE(review): the function name and ``dateba_name`` look like truncated
    forms of ``select_collection`` / ``database_name``; they are kept as-is
    so existing callers keep working -- confirm before renaming.

    :param client: an already constructed ``MongoClient``.
    :param dateba_name: name of the database to index on the client.
    :param collection_name: name of the collection inside that database.
    :return: ``client[dateba_name][collection_name]``.
    :raises TypeError: if ``client`` is not a ``MongoClient`` instance.
    :raises Exception: if either name is empty or otherwise falsy.
    """
    if not isinstance(client, MongoClient):
        raise TypeError("client is not a MongoClient instance")
    if not (dateba_name and collection_name):
        raise Exception("dateba_name or collection_name is empty")
    return client[dateba_name][collection_name]
def get_log_client(endpoint, access_id, access_key):
    """Create an Aliyun log service client.

    The three arguments are forwarded positionally to ``LogClient``.

    :param endpoint: first positional argument passed to ``LogClient``.
    :param access_id: second positional argument passed to ``LogClient``.
    :param access_key: third positional argument passed to ``LogClient``.
    :return: the constructed ``LogClient``.
    """
    return LogClient(endpoint, access_id, access_key)
def connect_mysql_by_param(host: str = 'localhost', port: int = 3306,
                           ur: str = 'root', passwd: str = 'root',
                           db: str = '', chart: str = "utf8"):
    """Open a single MySQL connection with pymysql.

    NOTE(review): ``ur`` and ``chart`` look like truncated forms of
    ``user`` / ``charset``; the parameter names are kept so existing
    keyword callers keep working -- confirm before renaming.

    :param host: server host name or address.
    :param port: server TCP port.
    :param ur: user name to log in as.
    :param passwd: password for ``ur``.
    :param db: database to select after connecting.
    :param chart: connection character set.
    :return: the connection object returned by ``pymysql.connect``.
    """
    # pymysql.connect expects ``user`` / ``charset``; map this function's
    # parameter names onto the driver's real keyword names.
    return pymysql.connect(
        host=host,
        port=port,
        user=ur,
        password=passwd,
        database=db,
        charset=chart,
    )
def connect_redis_by_uri(host: str = 'localhost', port: int = 6379,
                         passwd: str = 'root', max_connections: int = 1024,
                         db: int = 0, **kwargs):
    """Create a Redis client backed by its own connection pool.

    :param host: Redis server host.
    :param port: Redis server port.
    :param passwd: password, passed to the pool as ``password``.
    :param max_connections: upper bound on connections held by the pool.
    :param db: index of the Redis database to use.
    :param kwargs: any additional keyword arguments, forwarded unchanged
        to ``redis.ConnectionPool``.
    :return: a ``redis.Redis`` client bound to the new pool.
    """
    pool = redis.ConnectionPool(
        host=host,
        port=port,
        password=passwd,
        max_connections=max_connections,
        db=db,
        **kwargs,
    )
    conn = redis.Redis(connection_pool=pool)
    return conn
def connect_mysql_pool_by_param(configs):
    """Create a DBUtils ``PooledDB`` connection pool for MySQL.

    :param configs: mapping of extra keyword arguments for ``PooledDB``;
        typically the pymysql connection parameters (host, db, ...).
        A key that matches one of the pool defaults below overrides that
        default instead of raising a duplicate-keyword ``TypeError``.
    :return: the constructed ``PooledDB`` instance.
    """
    pool_options = {
        # DB-API module used to create the connections.
        "creator": pymysql,
        # Maximum number of connections allowed; 0 or None means unlimited.
        "maxconnections": 5,
        # Idle connections opened when the pool is created; 0 means none.
        "mincached": 2,
        # Maximum number of idle connections kept; 0 or None means unlimited.
        "maxcached": 2,
        # Maximum number of shared connections; 0 or None means all are
        # dedicated. NOTE(review): the original author remarked that this
        # setting has little practical effect -- presumably because the
        # driver's thread-safety level does not allow sharing; confirm.
        "maxshared": 3,
        # When no connection is available: True waits, False raises.
        "blocking": True,
        # SQL commands executed at the start of every session.
        "setsession": [],
        # When to ping the server to check the connection; 0 = never.
        "ping": 0,
    }
    # Caller-supplied settings win over the defaults above.
    pool_options.update(configs)
    return PooledDB(**pool_options)
字符串相似度匹配
import difflib
def string_similar(s1, s2):
    """Return a quick similarity score between two strings.

    Uses ``difflib.SequenceMatcher.quick_ratio``, which is an upper bound
    on ``ratio()``: it only compares how many characters the two inputs
    have in common and ignores their order, so e.g. ``"abc"`` and
    ``"cba"`` score 1.0.

    :param s1: first string.
    :param s2: second string.
    :return: float in ``[0.0, 1.0]``; 1.0 when both inputs are empty.
    """
    matcher = difflib.SequenceMatcher(None, s1, s2)
    return matcher.quick_ratio()
日志打印
#!/usr/bin/python
# encoding=utf-8
import os
import logging
import platform
import time
from multiprocessing import current_process
from logging.handlers import RotatingFileHandler
# Absolute path of the parent of the directory that contains this file;
# get_info_logger() places its log files under "<PROJECT_PATH>/logs".
PROJECT_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
def get_info_logger(log_name):
    """Build an INFO-level logger that writes to a rotating log file.

    The file is ``<PROJECT_PATH>/logs/<log_name>_<unix-seconds>.log``; it
    rotates at 20 MiB and keeps 5 backups. When ``judge_system()`` returns
    a falsy value the records are additionally sent to a default
    ``logging.StreamHandler``.

    :param log_name: prefix of the log file name; also used as the name
        of the returned logger.
    :return: the configured ``logging.Logger``.
    """
    log_dir = os.path.join(PROJECT_PATH, "logs")
    # RotatingFileHandler opens the file right away and fails when the
    # directory does not exist yet.
    os.makedirs(log_dir, exist_ok=True)

    # A standalone Logger instance (rather than logging.getLogger) so that
    # repeated calls never stack handlers on one shared logger.
    logger = logging.Logger(log_name)

    handler = RotatingFileHandler(
        os.path.join(log_dir, f"{log_name}_{int(time.time())}.log"),
        maxBytes=20 * 1024 * 1024,
        backupCount=5,
        encoding="utf-8",
    )
    handler.setFormatter(logging.Formatter(
        fmt="[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    ))
    handler.setLevel(logging.INFO)
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

    # Linux is treated as the default deployment target and gets no
    # terminal output; every other platform also logs to the stream.
    if not judge_system():
        logger.addHandler(logging.StreamHandler())
    return logger
def judge_system():
    """Report whether the current platform is Linux.

    :return: ``True`` when ``platform.system()`` is ``"Linux"``; ``False``
        for ``"Windows"`` and for every other value.
    """
    # Both the Windows branch and the fallback returned False, so a single
    # comparison covers all three cases (and avoids a local named ``sys``).
    return platform.system() == "Linux"