FastAPI中LangGraph(psycopg)与SQLAlchemy(asyncpg)共用PostgreSQL时频繁出现连接异常问题求助
FastAPI中LangGraph(psycopg)与SQLAlchemy(asyncpg)共用PostgreSQL时频繁出现连接异常问题求助
问题背景
大家好,我在维护一个稳定运行的FastAPI项目,核心用SQLAlchemy ORM(基于asyncpg)做数据持久化,最近集成LangGraph实现Agent的状态和 checkpoint 持久化,用的是LangGraph的AsyncPostgresStore和AsyncPostgresSaver(基于psycopg),两者连接同一个PostgreSQL数据库。项目里的ORM仓库类也会在LangGraph的工具和Agent逻辑中调用。
但上线后发现,在并发请求或者长时请求场景下,频繁出现数据库连接异常,而且错误大多集中在两者同时关闭连接或回滚会话的时候。
频繁触发的错误信息
- 核心初始错误:
psycopg.OperationalError: consuming input failed: server closed the connection unexpectedly - 后续连锁错误:
asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operationsqlalchemy.exc.DBAPIError: connection was closed in the middle of operationdishka.exceptions.ExitError: Cleanup context errors
当前代码配置
from collections.abc import AsyncIterator from typing import cast from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver from langgraph.store.postgres.aio import AsyncPostgresStore from psycopg import AsyncConnection from psycopg.rows import DictRow from psycopg_pool import AsyncConnectionPool from sqlalchemy.ext.asyncio import ( AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine, ) async def get_async_engine( dsn: PostgresDsn, ) -> AsyncIterator[AsyncEngine]: async_engine = create_async_engine( url=dsn, pool_size=5, max_overflow=10, pool_pre_ping=True, ) yield async_engine await async_engine.dispose() def get_async_session_factory( engine: AsyncEngine, ) -> async_sessionmaker[AsyncSession]: return async_sessionmaker( bind=engine, class_=AsyncSession, autoflush=False, expire_on_commit=False, ) async def get_main_async_session( async_session_factory: async_sessionmaker[AsyncSession], ) -> AsyncIterator[MainAsyncSession]: async with async_session_factory() as session: yield cast(MainAsyncSession, session) async def get_connection_pool( settings: AppSettings, ) -> AsyncIterator[AsyncConnectionPool[AsyncConnection[DictRow]]]: pool: AsyncConnectionPool[AsyncConnection[DictRow]] = AsyncConnectionPool(conninfo=settings.postgres.psycopg_dsn) await pool.open() try: yield pool finally: await pool.close() async def get_store( connection_pool: AsyncConnectionPool[AsyncConnection[DictRow]], ) -> AsyncIterator[AsyncPostgresStore]: store = AsyncPostgresStore(connection_pool) yield store async def get_checkpointer( connection_pool: AsyncConnectionPool[AsyncConnection[DictRow]], ) -> AsyncIterator[AsyncPostgresSaver]: saver = AsyncPostgresSaver(connection_pool) yield saver # Dishka 依赖注入配置 # SQLAlchemy 持久化 provider.provide(source=get_async_engine, scope=Scope.APP) provider.provide(source=get_async_session_factory, scope=Scope.APP) provider.provide(source=get_main_async_session, scope=Scope.REQUEST) # LangGraph 持久化 provider.provide(source=get_connection_pool, scope=Scope.APP) provider.provide(source=get_store, scope=Scope.REQUEST) provider.provide(source=get_checkpointer, scope=Scope.REQUEST)
我的预期与疑问
原本以为两个库共用同一个数据库,只要配置正确就不会有问题,但现在频繁出现连接被意外关闭的异常。想请教大家:
- 这种问题的核心根源可能是什么?
- 有没有办法通过调整配置,让两者的连接/会话管理协调工作,避免冲突?
专家解决方案建议
一、先定位问题根源
从你的描述和配置来看,核心问题大概率是两个独立的连接池(SQLAlchemy自带池 + psycopg的AsyncConnectionPool)之间的资源管理冲突,具体可能是:
- 连接数超限:两个池的总连接数(SQLAlchemy的
pool_size+max_overflow=15 + psycopg池默认最大连接数10)已经接近甚至超过PostgreSQL默认的max_connections(一般是100,但小规格实例可能更小),导致数据库主动关闭空闲/超额连接。 - 连接生命周期不一致:SQLAlchemy的会话和LangGraph的连接在请求结束时的清理顺序混乱,比如一个库先关闭了底层数据库连接,另一个库还在使用该连接的引用,触发连接不存在的错误。
- 驱动差异导致的连接管理不兼容:SQLAlchemy用asyncpg,LangGraph用psycopg,两个驱动对PostgreSQL连接的封装逻辑不同,独立管理时容易出现资源回收的冲突。
二、具体修复方案
方案1:统一连接池(最优解)
让LangGraph复用SQLAlchemy的连接池,避免独立池的冲突。可以通过以下步骤实现:
- 让SQLAlchemy改用psycopg驱动:把SQLAlchemy的异步引擎切换为psycopg的async驱动,这样两者底层用同一个驱动,连接格式兼容:
async def get_async_engine( dsn: PostgresDsn, ) -> AsyncIterator[AsyncEngine]: async_engine = create_async_engine( url=dsn, driver="psycopg+asyncpg", # 切换为psycopg的asyncpg驱动 pool_size=5, max_overflow=10, pool_pre_ping=True, ) yield async_engine await async_engine.dispose() - 让LangGraph从SQLAlchemy会话获取连接:自定义LangGraph的连接获取逻辑,从SQLAlchemy的AsyncSession中提取底层的psycopg连接,替代独立的psycopg池:
这样LangGraph和SQLAlchemy完全共用同一个连接,避免了池之间的冲突。# 新增依赖:从SQLAlchemy会话获取psycopg连接 async def get_psycopg_connection(session: AsyncSession) -> AsyncIterator[AsyncConnection]: # 从SQLAlchemy会话中获取底层连接 async_conn = await session.connection() # 转换为psycopg的AsyncConnection psycopg_conn = cast(AsyncConnection, async_conn.connection) yield psycopg_conn # 修改LangGraph的store和checkpointer依赖,复用这个连接 async def get_store( conn: AsyncConnection, ) -> AsyncIterator[AsyncPostgresStore]: store = AsyncPostgresStore(conn) # 直接传单个连接,而非连接池 yield store async def get_checkpointer( conn: AsyncConnection, ) -> AsyncIterator[AsyncPostgresSaver]: saver = AsyncPostgresSaver(conn) yield saver # 更新Dishka配置 provider.provide(source=get_psycopg_connection, scope=Scope.REQUEST) provider.provide(source=get_store, scope=Scope.REQUEST) provider.provide(source=get_checkpointer, scope=Scope.REQUEST) # 移除原来的get_connection_pool依赖 # provider.provide(source=get_connection_pool, scope=Scope.APP)
方案2:协调两个独立连接池的配置
如果暂时不想切换驱动,可以调整两个池的配置,避免资源冲突:
- 控制总连接数:计算两个池的总连接数,确保远小于PostgreSQL的
max_connections:- 先查你的PostgreSQL实例的
max_connections:执行SHOW max_connections; - 调整SQLAlchemy的
pool_size=3、max_overflow=5,psycopg池设置max_size=7,总连接数控制在15以内(远小于默认100)
- 先查你的PostgreSQL实例的
- 给psycopg池添加健康检查:
async def get_connection_pool( settings: AppSettings, ) -> AsyncIterator[AsyncConnectionPool[AsyncConnection[DictRow]]]: pool: AsyncConnectionPool[AsyncConnection[DictRow]] = AsyncConnectionPool( conninfo=settings.postgres.psycopg_dsn, max_size=7, health_check_interval=30, # 每30秒检查一次连接健康 check=lambda conn: conn.is_closed, # 检查连接是否已关闭 ) await pool.open() try: yield pool finally: await pool.close() - 调整Dishka的清理顺序:让LangGraph的资源先于SQLAlchemy的会话被清理,避免会话还在使用时LangGraph关闭连接。可以通过Dishka的
depends_on或者调整依赖注册顺序实现。
方案3:添加异常重试机制
在数据库操作的关键路径添加重试,处理连接意外关闭的情况:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type # 给SQLAlchemy的数据库操作添加重试 @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10), retry=retry_if_exception_type((sqlalchemy.exc.DBAPIError, asyncpg.exceptions.ConnectionDoesNotExistError)) ) async def get_user(session: AsyncSession, user_id: int): return await session.get(User, user_id) # 给LangGraph的Agent操作也添加类似重试
方案4:排查PostgreSQL端的日志
开启PostgreSQL的详细日志,定位连接被关闭的具体原因:
- 修改PostgreSQL的
postgresql.conf:log_connections = on log_disconnections = on log_error_verbosity = verbose log_statement = 'all' # 临时开启,排查后关闭,避免日志过大 - 重启PostgreSQL,然后复现问题,查看日志中连接被关闭的具体触发条件(是连接数超限,还是某个SQL错误导致的)。
三、验证步骤
- 先尝试方案2的连接数调整和健康检查,快速验证是否是连接数超限的问题。
- 如果问题依然存在,切换到方案1的统一连接池,这是最彻底的解决方式。
- 最后结合方案3的重试机制,提升系统的容错性。
希望这些建议能帮你解决问题,如果还有细节需要调整,可以再补充具体的场景信息~




