You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在FastAPI中正确集成AIOKafka生产者并解决性能问题?

如何在FastAPI中正确集成AIOKafka生产者并解决性能问题?

兄弟,你这问题我之前也踩过坑!核心原因一目了然:你现在的get_producer依赖每次请求都会重新创建并启动一个AIOKafkaProducer实例——启动Kafka生产者需要和集群建立连接、初始化各种资源,这个过程开销不小,反复执行肯定会拖慢接口响应速度。

咱们得换个思路:用FastAPI的生命周期事件只启动一次生产者,全局复用这个实例,彻底解决重复初始化的问题。下面是具体的修改方案:

1. 核心问题分析

你的get_producer是一个AsyncGenerator,每次请求进入时都会调用await startup_producer(),也就是每次请求都新建一个生产者并启动,这完全是没必要的浪费。正确的姿势是:生产者实例在应用启动时创建一次,所有请求共享,应用关闭时再停止

2. 修改后的完整代码实现

第一步:调整生产者的生命周期管理(全局复用实例)

修改你的producer相关代码,用全局变量存储生产者实例,然后通过FastAPI的启动/关闭事件来初始化和销毁它:

import json
import ssl
import time
from fastapi import Depends
from aiokafka import AIOKafkaProducer
from aiokafka.helpers import create_ssl_context
from app.config import settings

# 全局变量:存储唯一的生产者实例
_kafka_producer: AIOKafkaProducer | None = None

def producer_serializer(message):
    return json.dumps(message).encode('utf-8')

def get_ssl_context():
    """Get SSL context for consumer and producer."""
    if settings.kafka_service.security_protocol == "SSL":
        return create_ssl_context(
            cafile=settings.kafka_service.ssl_cafile,
            certfile=settings.kafka_service.ssl_certfile,
            keyfile=settings.kafka_service.ssl_keyfile
            # 补充你的SSL配置参数
        )
    return None

async def startup_producer():
    """FastAPI启动事件:初始化Kafka生产者(仅执行一次)"""
    global _kafka_producer
    start_time = time.time()
    
    ssl_ctx = get_ssl_context()
    _kafka_producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_service.bootstrap_servers,
        security_protocol=settings.kafka_service.security_protocol,
        ssl_context=ssl_ctx,
        value_serializer=producer_serializer
        # 其他你的Kafka配置参数
    )
    
    await _kafka_producer.start()
    elapsed_time = time.time() - start_time
    print(f"KAFKA_PRODUCER_STARTUP_TIME: {elapsed_time:.4f} seconds")

async def shutdown_producer():
    """FastAPI关闭事件:停止Kafka生产者(仅执行一次)"""
    global _kafka_producer
    if _kafka_producer:
        await _kafka_producer.stop()

async def get_producer() -> AIOKafkaProducer:
    """依赖:获取全局复用的Kafka生产者实例"""
    if not _kafka_producer:
        raise RuntimeError("Kafka producer has not been initialized yet!")
    return _kafka_producer

第二步:在FastAPI应用中注册生命周期事件

找到你创建FastAPI实例的地方,把启动/关闭事件注册进去:

from fastapi import FastAPI
from app.kafka.producer import startup_producer, shutdown_producer

app = FastAPI(title="Your App Name")

# 注册Kafka生产者的启动和关闭事件
app.add_event_handler("startup", startup_producer)
app.add_event_handler("shutdown", shutdown_producer)

第三步:你的接口代码可以保持不变

现在你的接口依赖get_producer时,只会拿到全局已经初始化好的生产者实例,不会再每次请求都启动新的了:

from fastapi import APIRouter, Request, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_db
from app.kafka.producer import get_producer
from aiokafka import AIOKafkaProducer

my_router = APIRouter()

@my_router.post("/{data_type}")
async def create_task(
    data_type: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
    producer: AIOKafkaProducer = Depends(get_producer)
):
    start_time = time.time()
    
    # 这里可以正常使用producer发送消息,比如:
    # await producer.send_and_wait(f"topic_{data_type}", {"key": "value"})
    
    elapsed_time = time.time() - start_time
    print(f"REQUEST_PROCESS_TIME: {elapsed_time:.4f} seconds")
    return {"status": "OK"}

3. 关键说明

  • 全局实例的安全性:在单进程FastAPI应用(比如直接用uvicorn启动)中,全局变量是安全的;如果是多进程部署(uvicorn --workers N),每个worker会有自己的全局生产者实例,这其实是合理的——每个worker一个生产者实例,比每次请求创建高效得多。
  • 为什么原来的写法慢:之前的get_producer是AsyncGenerator,每次请求都会触发startup_producer,也就是每次都要和Kafka集群建立新连接、初始化资源,这个过程通常要几百毫秒甚至更久,直接导致接口响应变慢。
  • 发送消息的建议:如果你的业务允许,用producer.send()(非阻塞)比send_and_wait()(阻塞等待确认)性能更好,前者会把消息放入队列异步发送。

这样改完之后,你再测一下接口响应时间,应该会和去掉生产者依赖时一样快了!

火山引擎 最新活动