前言
最近在使用flask搭建一个网站,其中有用到mysql数据库的地方。
本来想着使用学过的单例模式,创建一个全局的数据库连接,这样每个需要用到数据库连接的地方直接调用这个唯一的数据库链接即可。结果在运行时出现了错误:
Packet sequence number wrong - got 11 expected 1
pymysql.err.InterfaceError: (0, '')
经过在网上查询,发现是因为多线程之间共享了同一个数据库连接,线程之间可以共享模块,但是并不能共享连接。
解决这个问题的方法有三个:
- 线程间共享一个数据库连接,但是要在每个execute操作前加上互斥锁。
- 在每个线程中,都创建一个数据库连接,各用各的。
- 使用DBUtils+连接池的方式,每个线程使用数据库连接时在连接池中取用。
下面简单讲述一下这种解决方法。
方法一、加互斥锁,共享数据库连接
原理:线程间共享一个数据库连接,为了避免线程间竞争使用带来混乱,当一个线程在使用数据连接时,进行加锁处理,该线程使用完毕,就将数据库连接进行解锁,让其他需要使用的线程使用。这样做,可以将并发变成串行,从而保证操作的有序进行。
加锁示例,在每个execute操作前加上互斥锁:
import threading # 需要引入threding模块a
lock = threading.Lock()
...
lock.acquire() # 上锁
cursor.execute(sql)
lock.release() # 解锁
方法二、在每个线程中,都创建一个数据库连接
在每个线程中,都创建一个数据库连接,各个线程间互不干扰,使用完就close。比如说我所用的flask:
#!usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
from flask import Flask
app = Flask(__name__)
@app.route('/index')
def index():
# 创建自己的链接数据库,用完就close()
conn = pymysql.connect(host="127.0.0.1", port=3306, user='root', password='123', database='pooldb', charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from td where id=%s", [5, ])
result = cursor.fetchall() # 获取数据
cursor.close()
conn.close() # 关闭链接
print(result)
return "执行成功"
@app.route('/test')
def test():
# 创建自己的链接数据库,用完就close()
conn = pymysql.connect(host="127.0.0.1", port=3306, user='root', password='123', database='pooldb', charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from td where id=%s", [5, ])
result = cursor.fetchall() # 获取数据
cursor.close()
conn.close() # 关闭链接
print(result)
return "执行成功"
if __name__ == '__main__':
app.run(debug=True)
方法三、DBUtils+连接池
DBUtils安装方法:
pip install DBUtils
DBUtils有两种连接模式:
模式一:基于threaing.local实现为每一个线程创建一个连接,关闭是伪关闭,当前线程可以重复使用。
不过这种方式在线程比较多的情况下,还是会建立很多的数据连接,会影响性能。
代码示例:
import pymysql
from DBUtils.PersistentDB import PersistentDB
POOL = PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable=False,
# 如果为False时,conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host="127.0.0.1",
port=3306,
user="数据库用户名",
password="数据库密码",
database="你的数据库名字",
charset="utf8"
)
def func():
# 获取数据库连接
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute("select * from userinfo")
ret = cursor.fetchall()
print(ret)
cursor.close()
# 伪关闭,其实没有真正的关闭
conn.close()
func()
模式二:创建一批数据库连接到连接池,供所有线程共享使用,使用完毕后再放回到连接池,是一种比较常用的方法。(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。)
代码示例:
import pymysql
from DBUtils.PooledDB import PooledDB
db_pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用,如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host="127.0.0.1",
port=3306,
user="数据库用户名",
password="数据库密码",
database="数据库名",
)
def func():
# 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
conn = db_pool.connection()
cursor = conn.cursor()
cursor.execute('select * from userinfo')
result = cursor.fetchall()
print(result)
conn.close()
func()