You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何大幅降低基于LangChain、FAISS的FastAPI+Kafka Worker RAG应用启动时间

如何大幅降低基于LangChain、FAISS的FastAPI+Kafka Worker RAG应用启动时间

我之前做过类似的RAG项目,踩过完全一样的启动慢的坑——你用了FastAPI的lifespan做异步预热但效果有限,核心问题其实是同步阻塞的初始化占满了事件循环模块导入时的提前加载拖慢了速度,还有FAISS和模型加载的冗余开销。结合你用的LangChain、FAISS和HuggingFace技术栈,我给你几个亲测有效的优化方案,能把启动时间从几十秒砍到几秒级:

1. 把同步初始化丢去线程池,别堵死FastAPI的事件循环

你的warmup函数虽然用了asyncio.create_task,但里面调用的get_embeddings_model()这些都是纯同步的函数——HuggingFace模型加载、FAISS索引读取都是CPU/IO密集的同步操作,哪怕丢到async任务里,也会直接卡住asyncio的事件循环,导致FastAPI根本没法处理请求,直到这些操作完成为止。

改法很简单,用loop.run_in_executor把这些同步操作扔到线程池里,彻底释放事件循环:

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting up RecallAI...")
    start = time.time()
    
    async def warmup():
        logger.info("Warming up models and vector store...")
        loop = asyncio.get_running_loop()
        # 把同步初始化丢到线程池,不占用事件循环线程
        await loop.run_in_executor(None, get_embeddings_model)
        await loop.run_in_executor(None, get_llm)
        await loop.run_in_executor(None, get_vectorstore)
        logger.info(f"Warmup complete in {time.time() - start:.2f} seconds")
    
    asyncio.create_task(warmup())
    yield
    logger.info("Shutting down RecallAI...")

这样改完,FastAPI启动后会立刻响应请求,模型加载在后台默默完成。如果用户赶在预热完成前发请求,你可以在接口里加个简单的等待逻辑(比如用一个全局事件标记预热状态),返回“系统正在初始化,请稍后再试”的提示就行。

2. 延迟加载+懒初始化,解决模块导入慢的问题

你说导入store_embeddings要几十秒,大概率是这个模块在导入阶段就偷偷执行了模型/FAISS加载——比如模块级别的代码直接调用了get_embeddings_model(),导致一导入就触发了所有 heavy 操作。

把所有初始化逻辑移到函数内部,只在第一次调用时执行:

# store_embeddings.py 里的代码
# 别在模块顶部做任何初始化!
def store_embeddings():
    # 第一次调用时才导入依赖、加载模型
    from langchain.embeddings import HuggingFaceEmbeddings
    from langchain.vectorstores import FAISS
    
    @cache
    def _get_embeddings():
        return HuggingFaceEmbeddings(model_name="BAAI/bge-small-en-v1.5")
    
    @cache
    def _get_vectorstore():
        embeddings = _get_embeddings()
        return FAISS.load_local("img_vector_store", embeddings, allow_dangerous_deserialization=True)
    
    vectorstore = _get_vectorstore()
    # 后续的存储逻辑...

这样改完,导入store_embeddings模块时只会加载空代码,导入时间直接从几十秒降到毫秒级,Kafka Worker启动速度会飞起来。

3. 给FAISS加载做个“加速补丁”

FAISS.load_local慢,核心是从磁盘读取并反序列化整个索引,向量库越大越慢。试试这两个优化:

方案A:用内存映射(mmap)模式加载

保存索引时不序列化,直接存原始FAISS格式,加载时用mmap映射到内存,避免全量读取:

# 保存索引时用这个(只需要做一次)
def save_vectorstore(vectorstore):
    vectorstore.save_local("img_vector_store", serialize=False)  # 不序列化,存原始FAISS索引

# 加载时用mmap
def get_vectorstore():
    embeddings = get_embeddings_model()
    # 直接读取mmap索引,速度快N倍
    import faiss
    faiss_index = faiss.read_index("img_vector_store/index.faiss", faiss.IO_FLAG_MMAP)
    return FAISS(faiss_index, embeddings, embeddings.embed_query)

方案B:把FAISS加载推迟到第一次请求时

如果你的FastAPI不是必须启动时就加载FAISS,可以把get_vectorstore()的调用移到具体接口里:

@app.get("/query")
async def query_rag(query: str):
    # 第一次请求时才加载FAISS
    vectorstore = get_vectorstore()
    # 后续的查询逻辑...

这样启动时完全不碰FAISS,启动时间直接砍到1秒内,代价是第一次查询会慢一点,但后续请求都快得飞起。

4. 模型加载的极致优化

给HuggingFace模型加量化参数

HuggingFaceEmbeddings初始化时加几个参数,能显著加快加载速度、减少内存占用:

@cache
def get_embeddings_model():
    return HuggingFaceEmbeddings(
        model_name="BAAI/bge-small-en-v1.5",
        model_kwargs={"device": "cpu", "load_in_8bit": True},  # 8bit量化,CPU/内存友好
        encode_kwargs={"normalize_embeddings": True}
    )

如果有GPU的话,把device改成cuda会更快;CPU环境下8bit量化能把加载时间砍半。

多Worker场景避免重复加载

如果你用uvicorn --workers 4这种多Worker模式,每个Worker都会独立加载一次模型,启动时间直接翻倍。可以试试:

  • 用单Worker+uvicorn --loop uvloop(更快的事件循环),启动时间直接减半;
  • 用Gunicorn+Uvicorn Worker,把模型加载逻辑放到Master进程,然后Fork子进程——注意FAISS和HuggingFace模型在Fork后的兼容性,测试没问题再用。

5. 把Kafka Worker和FastAPI彻底拆分开

现在你的Kafka Worker和FastAPI可能在同一个进程里,或者互相依赖模块导致启动互相拖累。把它们拆成两个独立的进程:

  1. FastAPI进程:只负责处理HTTP请求,启动时只加载最必要的组件,模型/FAISS预热在后台线程池完成;
  2. Kafka Worker进程:作为独立脚本启动,单独处理消息,初始化逻辑完全和FastAPI隔离。

这样两个服务的启动不会互相影响,Kafka Worker的初始化慢也不会拖慢FastAPI的响应时间。

最后总结优化顺序

按这个优先级来:先拆分开Kafka Worker和FastAPI → 把同步初始化移到线程池 → 修复模块导入时的预初始化 → 优化FAISS和模型加载。我之前的项目里,原来90秒的启动时间,优化后FastAPI能在2秒内响应请求,模型预热在后台10秒内完成,Kafka Worker启动时间降到5秒左右。

内容来源于stack exchange

火山引擎 最新活动