如何在PEP249兼容Python数据库API中实现可复用连接的多游标类?
实现兼容PEP 249的多连接复用游标管理类
这确实是个很实际的场景——PEP 249作为Python数据库API的基础标准,确实没有规定连接的"忙碌状态"检测方法,它的核心目标是统一不同数据库驱动的基础操作接口,而非连接池、状态管理这类上层功能。不过咱们可以通过封装连接和游标的生命周期,完全基于PEP 249标准实现你的需求,不用依赖驱动的私有API。
核心思路:通过封装跟踪连接状态
既然PEP 249没有提供连接忙碌状态的原生方法,我们可以自己给连接附加状态标记,并通过自定义游标类来自动维护这个状态:
- 给每个管理的连接添加一个
_is_busy属性,标记它是否被某个游标占用 - 封装游标类,在游标被创建时标记连接为忙碌,在游标被关闭/释放时标记连接为可用
完整实现示例
1. 封装管理连接类
首先我们写一个ManagedConnection类,包装原生的PEP 249连接,附加状态管理:
class ManagedConnection: def __init__(self, raw_connection): self.raw_connection = raw_connection self._is_busy = False def is_busy(self): return self._is_busy def mark_busy(self): self._is_busy = True def mark_free(self): self._is_busy = False # 转发所有PEP 249连接的原生方法 def __getattr__(self, name): return getattr(self.raw_connection, name)
2. 封装跟踪状态的游标类
接着写一个TrackedCursor,包装原生游标,自动维护连接的忙碌状态,同时支持上下文管理器(方便自动释放):
class TrackedCursor: def __init__(self, managed_connection, raw_cursor): self.managed_connection = managed_connection self.raw_cursor = raw_cursor def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def close(self): # 关闭原生游标 self.raw_cursor.close() # 标记连接为可用 self.managed_connection.mark_free() # 转发所有PEP 249游标的原生方法 def __getattr__(self, name): return getattr(self.raw_cursor, name)
3. 最终的Database类
现在我们可以实现你的Database类,复用连接并管理状态:
class Database: def __init__(self, connection_factory): # connection_factory是一个创建原生PEP249连接的函数 # 比如lambda: psycopg2.connect(...) 或者 sqlite3.connect(...) self.connection_factory = connection_factory self.connections = [] def _new_connection(self): raw_con = self.connection_factory() return ManagedConnection(raw_con) def get_cursor(self, query): selected_connection = None # 先查找可用连接,同时检查连接是否有效(防止被数据库主动断开) for con in self.connections: if not con.is_busy(): # 可选:检查连接是否存活,不同数据库的检测方式略有不同,但都可通过PEP249方法实现 try: # 比如执行一个空查询或者ping(如果驱动支持,不行就用SELECT 1) temp_cur = con.raw_connection.cursor() temp_cur.execute("SELECT 1") temp_cur.close() selected_connection = con break except: # 连接失效,移除并跳过 self.connections.remove(con) # 无可用连接则新建 if selected_connection is None: selected_connection = self._new_connection() self.connections.append(selected_connection) # 标记连接为忙碌,创建跟踪游标并执行查询 selected_connection.mark_busy() raw_cursor = selected_connection.raw_connection.cursor() raw_cursor.execute(query) return TrackedCursor(selected_connection, raw_cursor)
使用示例
# 假设我们用SQLite作为示例 import sqlite3 # 创建Database实例,传入连接工厂函数 db = Database(lambda: sqlite3.connect(":memory:")) # 获取两个并行游标 with db.get_cursor("CREATE TABLE test (id INTEGER)") as cur1: with db.get_cursor("INSERT INTO test VALUES (1)") as cur2: # 此时第二个游标会新建连接,因为第一个连接处于忙碌状态 pass # 再次获取游标,会复用之前的可用连接 with db.get_cursor("SELECT * FROM test") as cur3: print(cur3.fetchall()) # 输出 [(1,)]
关键注意事项
- 必须正确关闭游标:不管是手动调用
close()还是使用with上下文管理器,否则连接会一直处于忙碌状态无法复用。上面的TrackedCursor实现了上下文管理器,推荐用这种方式。 - 连接有效性检查:数据库可能会自动断开长时间闲置的连接,所以在复用连接前最好做有效性检测,示例中用了
SELECT 1的方式,这是PEP249兼容的通用方法。 - 连接池大小限制:你可以给
Database类添加最大连接数的限制,避免无限制创建连接导致数据库负载过高。
内容的提问来源于stack exchange,提问作者ttk203




