You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

FastAPI中LangGraph(psycopg)与SQLAlchemy(asyncpg)共用PostgreSQL时频繁出现连接异常问题求助

FastAPI中LangGraph(psycopg)与SQLAlchemy(asyncpg)共用PostgreSQL时频繁出现连接异常问题求助

问题背景

大家好,我在维护一个稳定运行的FastAPI项目,核心用SQLAlchemy ORM(基于asyncpg)做数据持久化,最近集成LangGraph实现Agent的状态和 checkpoint 持久化,用的是LangGraph的AsyncPostgresStoreAsyncPostgresSaver(基于psycopg),两者连接同一个PostgreSQL数据库。项目里的ORM仓库类也会在LangGraph的工具和Agent逻辑中调用。

但上线后发现,在并发请求或者长时请求场景下,频繁出现数据库连接异常,而且错误大多集中在两者同时关闭连接或回滚会话的时候。

频繁触发的错误信息

  • 核心初始错误:
    psycopg.OperationalError: consuming input failed: server closed the connection unexpectedly
    
  • 后续连锁错误:
    • asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
    • sqlalchemy.exc.DBAPIError: connection was closed in the middle of operation
    • dishka.exceptions.ExitError: Cleanup context errors

当前代码配置

from collections.abc import AsyncIterator
from typing import cast
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.store.postgres.aio import AsyncPostgresStore
from psycopg import AsyncConnection
from psycopg.rows import DictRow
from psycopg_pool import AsyncConnectionPool
from sqlalchemy.ext.asyncio import (
    AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine,
)

async def get_async_engine(
    dsn: PostgresDsn,
) -> AsyncIterator[AsyncEngine]:
    async_engine = create_async_engine(
        url=dsn,
        pool_size=5,
        max_overflow=10,
        pool_pre_ping=True,
    )
    yield async_engine
    await async_engine.dispose()

def get_async_session_factory(
    engine: AsyncEngine,
) -> async_sessionmaker[AsyncSession]:
    return async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )

async def get_main_async_session(
    async_session_factory: async_sessionmaker[AsyncSession],
) -> AsyncIterator[MainAsyncSession]:
    async with async_session_factory() as session:
        yield cast(MainAsyncSession, session)

async def get_connection_pool(
    settings: AppSettings,
) -> AsyncIterator[AsyncConnectionPool[AsyncConnection[DictRow]]]:
    pool: AsyncConnectionPool[AsyncConnection[DictRow]] = AsyncConnectionPool(conninfo=settings.postgres.psycopg_dsn)
    await pool.open()
    try:
        yield pool
    finally:
        await pool.close()

async def get_store(
    connection_pool: AsyncConnectionPool[AsyncConnection[DictRow]],
) -> AsyncIterator[AsyncPostgresStore]:
    store = AsyncPostgresStore(connection_pool)
    yield store

async def get_checkpointer(
    connection_pool: AsyncConnectionPool[AsyncConnection[DictRow]],
) -> AsyncIterator[AsyncPostgresSaver]:
    saver = AsyncPostgresSaver(connection_pool)
    yield saver

# Dishka 依赖注入配置
# SQLAlchemy 持久化
provider.provide(source=get_async_engine, scope=Scope.APP)
provider.provide(source=get_async_session_factory, scope=Scope.APP)
provider.provide(source=get_main_async_session, scope=Scope.REQUEST)

# LangGraph 持久化
provider.provide(source=get_connection_pool, scope=Scope.APP)
provider.provide(source=get_store, scope=Scope.REQUEST)
provider.provide(source=get_checkpointer, scope=Scope.REQUEST)

我的预期与疑问

原本以为两个库共用同一个数据库,只要配置正确就不会有问题,但现在频繁出现连接被意外关闭的异常。想请教大家:

  1. 这种问题的核心根源可能是什么?
  2. 有没有办法通过调整配置,让两者的连接/会话管理协调工作,避免冲突?

专家解决方案建议

一、先定位问题根源

从你的描述和配置来看,核心问题大概率是两个独立的连接池(SQLAlchemy自带池 + psycopg的AsyncConnectionPool)之间的资源管理冲突,具体可能是:

  1. 连接数超限:两个池的总连接数(SQLAlchemy的pool_size+max_overflow=15 + psycopg池默认最大连接数10)已经接近甚至超过PostgreSQL默认的max_connections(一般是100,但小规格实例可能更小),导致数据库主动关闭空闲/超额连接。
  2. 连接生命周期不一致:SQLAlchemy的会话和LangGraph的连接在请求结束时的清理顺序混乱,比如一个库先关闭了底层数据库连接,另一个库还在使用该连接的引用,触发连接不存在的错误。
  3. 驱动差异导致的连接管理不兼容:SQLAlchemy用asyncpg,LangGraph用psycopg,两个驱动对PostgreSQL连接的封装逻辑不同,独立管理时容易出现资源回收的冲突。

二、具体修复方案

方案1:统一连接池(最优解)

让LangGraph复用SQLAlchemy的连接池,避免独立池的冲突。可以通过以下步骤实现:

  1. 让SQLAlchemy改用psycopg驱动:把SQLAlchemy的异步引擎切换为psycopg的async驱动,这样两者底层用同一个驱动,连接格式兼容:
    async def get_async_engine(
        dsn: PostgresDsn,
    ) -> AsyncIterator[AsyncEngine]:
        async_engine = create_async_engine(
            url=dsn,
            driver="psycopg+asyncpg",  # 切换为psycopg的asyncpg驱动
            pool_size=5,
            max_overflow=10,
            pool_pre_ping=True,
        )
        yield async_engine
        await async_engine.dispose()
    
  2. 让LangGraph从SQLAlchemy会话获取连接:自定义LangGraph的连接获取逻辑,从SQLAlchemy的AsyncSession中提取底层的psycopg连接,替代独立的psycopg池:
    # 新增依赖:从SQLAlchemy会话获取psycopg连接
    async def get_psycopg_connection(session: AsyncSession) -> AsyncIterator[AsyncConnection]:
        # 从SQLAlchemy会话中获取底层连接
        async_conn = await session.connection()
        # 转换为psycopg的AsyncConnection
        psycopg_conn = cast(AsyncConnection, async_conn.connection)
        yield psycopg_conn
    
    # 修改LangGraph的store和checkpointer依赖,复用这个连接
    async def get_store(
        conn: AsyncConnection,
    ) -> AsyncIterator[AsyncPostgresStore]:
        store = AsyncPostgresStore(conn)  # 直接传单个连接,而非连接池
        yield store
    
    async def get_checkpointer(
        conn: AsyncConnection,
    ) -> AsyncIterator[AsyncPostgresSaver]:
        saver = AsyncPostgresSaver(conn)
        yield saver
    
    # 更新Dishka配置
    provider.provide(source=get_psycopg_connection, scope=Scope.REQUEST)
    provider.provide(source=get_store, scope=Scope.REQUEST)
    provider.provide(source=get_checkpointer, scope=Scope.REQUEST)
    # 移除原来的get_connection_pool依赖
    # provider.provide(source=get_connection_pool, scope=Scope.APP)
    
    这样LangGraph和SQLAlchemy完全共用同一个连接,避免了池之间的冲突。
方案2:协调两个独立连接池的配置

如果暂时不想切换驱动,可以调整两个池的配置,避免资源冲突:

  1. 控制总连接数:计算两个池的总连接数,确保远小于PostgreSQL的max_connections
    • 先查你的PostgreSQL实例的max_connections:执行SHOW max_connections;
    • 调整SQLAlchemy的pool_size=3max_overflow=5,psycopg池设置max_size=7,总连接数控制在15以内(远小于默认100)
  2. 给psycopg池添加健康检查
    async def get_connection_pool(
        settings: AppSettings,
    ) -> AsyncIterator[AsyncConnectionPool[AsyncConnection[DictRow]]]:
        pool: AsyncConnectionPool[AsyncConnection[DictRow]] = AsyncConnectionPool(
            conninfo=settings.postgres.psycopg_dsn,
            max_size=7,
            health_check_interval=30,  # 每30秒检查一次连接健康
            check=lambda conn: conn.is_closed,  # 检查连接是否已关闭
        )
        await pool.open()
        try:
            yield pool
        finally:
            await pool.close()
    
  3. 调整Dishka的清理顺序:让LangGraph的资源先于SQLAlchemy的会话被清理,避免会话还在使用时LangGraph关闭连接。可以通过Dishka的depends_on或者调整依赖注册顺序实现。
方案3:添加异常重试机制

在数据库操作的关键路径添加重试,处理连接意外关闭的情况:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# 给SQLAlchemy的数据库操作添加重试
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((sqlalchemy.exc.DBAPIError, asyncpg.exceptions.ConnectionDoesNotExistError))
)
async def get_user(session: AsyncSession, user_id: int):
    return await session.get(User, user_id)

# 给LangGraph的Agent操作也添加类似重试
方案4:排查PostgreSQL端的日志

开启PostgreSQL的详细日志,定位连接被关闭的具体原因:

  1. 修改PostgreSQL的postgresql.conf
    log_connections = on
    log_disconnections = on
    log_error_verbosity = verbose
    log_statement = 'all'  # 临时开启,排查后关闭,避免日志过大
    
  2. 重启PostgreSQL,然后复现问题,查看日志中连接被关闭的具体触发条件(是连接数超限,还是某个SQL错误导致的)。

三、验证步骤

  1. 先尝试方案2的连接数调整和健康检查,快速验证是否是连接数超限的问题。
  2. 如果问题依然存在,切换到方案1的统一连接池,这是最彻底的解决方式。
  3. 最后结合方案3的重试机制,提升系统的容错性。

希望这些建议能帮你解决问题,如果还有细节需要调整,可以再补充具体的场景信息~

火山引擎 最新活动