You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在PEP249兼容Python数据库API中实现可复用连接的多游标类?

实现兼容PEP 249的多连接复用游标管理类

这确实是个很实际的场景——PEP 249作为Python数据库API的基础标准,确实没有规定连接的"忙碌状态"检测方法,它的核心目标是统一不同数据库驱动的基础操作接口,而非连接池、状态管理这类上层功能。不过咱们可以通过封装连接和游标的生命周期,完全基于PEP 249标准实现你的需求,不用依赖驱动的私有API。

核心思路:通过封装跟踪连接状态

既然PEP 249没有提供连接忙碌状态的原生方法,我们可以自己给连接附加状态标记,并通过自定义游标类来自动维护这个状态:

  • 给每个管理的连接添加一个_is_busy属性,标记它是否被某个游标占用
  • 封装游标类,在游标被创建时标记连接为忙碌,在游标被关闭/释放时标记连接为可用

完整实现示例

1. 封装管理连接类

首先我们写一个ManagedConnection类,包装原生的PEP 249连接,附加状态管理:

class ManagedConnection:
    def __init__(self, raw_connection):
        self.raw_connection = raw_connection
        self._is_busy = False

    def is_busy(self):
        return self._is_busy

    def mark_busy(self):
        self._is_busy = True

    def mark_free(self):
        self._is_busy = False

    # 转发所有PEP 249连接的原生方法
    def __getattr__(self, name):
        return getattr(self.raw_connection, name)

2. 封装跟踪状态的游标类

接着写一个TrackedCursor,包装原生游标,自动维护连接的忙碌状态,同时支持上下文管理器(方便自动释放):

class TrackedCursor:
    def __init__(self, managed_connection, raw_cursor):
        self.managed_connection = managed_connection
        self.raw_cursor = raw_cursor

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        # 关闭原生游标
        self.raw_cursor.close()
        # 标记连接为可用
        self.managed_connection.mark_free()

    # 转发所有PEP 249游标的原生方法
    def __getattr__(self, name):
        return getattr(self.raw_cursor, name)

3. 最终的Database类

现在我们可以实现你的Database类,复用连接并管理状态:

class Database:
    def __init__(self, connection_factory):
        # connection_factory是一个创建原生PEP249连接的函数
        # 比如lambda: psycopg2.connect(...) 或者 sqlite3.connect(...)
        self.connection_factory = connection_factory
        self.connections = []

    def _new_connection(self):
        raw_con = self.connection_factory()
        return ManagedConnection(raw_con)

    def get_cursor(self, query):
        selected_connection = None

        # 先查找可用连接,同时检查连接是否有效(防止被数据库主动断开)
        for con in self.connections:
            if not con.is_busy():
                # 可选:检查连接是否存活,不同数据库的检测方式略有不同,但都可通过PEP249方法实现
                try:
                    # 比如执行一个空查询或者ping(如果驱动支持,不行就用SELECT 1)
                    temp_cur = con.raw_connection.cursor()
                    temp_cur.execute("SELECT 1")
                    temp_cur.close()
                    selected_connection = con
                    break
                except:
                    # 连接失效,移除并跳过
                    self.connections.remove(con)

        # 无可用连接则新建
        if selected_connection is None:
            selected_connection = self._new_connection()
            self.connections.append(selected_connection)

        # 标记连接为忙碌,创建跟踪游标并执行查询
        selected_connection.mark_busy()
        raw_cursor = selected_connection.raw_connection.cursor()
        raw_cursor.execute(query)
        return TrackedCursor(selected_connection, raw_cursor)

使用示例

# 假设我们用SQLite作为示例
import sqlite3

# 创建Database实例,传入连接工厂函数
db = Database(lambda: sqlite3.connect(":memory:"))

# 获取两个并行游标
with db.get_cursor("CREATE TABLE test (id INTEGER)") as cur1:
    with db.get_cursor("INSERT INTO test VALUES (1)") as cur2:
        # 此时第二个游标会新建连接,因为第一个连接处于忙碌状态
        pass

# 再次获取游标,会复用之前的可用连接
with db.get_cursor("SELECT * FROM test") as cur3:
    print(cur3.fetchall())  # 输出 [(1,)]

关键注意事项

  • 必须正确关闭游标:不管是手动调用close()还是使用with上下文管理器,否则连接会一直处于忙碌状态无法复用。上面的TrackedCursor实现了上下文管理器,推荐用这种方式。
  • 连接有效性检查:数据库可能会自动断开长时间闲置的连接,所以在复用连接前最好做有效性检测,示例中用了SELECT 1的方式,这是PEP249兼容的通用方法。
  • 连接池大小限制:你可以给Database类添加最大连接数的限制,避免无限制创建连接导致数据库负载过高。

内容的提问来源于stack exchange,提问作者ttk203

火山引擎 最新活动