You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

多Worker场景下Granian部署FastAPI服务日志丢失问题排查求助

排查思路与解决方案

首先,你的问题核心是多进程(6个Granian Worker)共享同一个文件日志处理器导致的日志丢失,这是Python标准库logging在多进程场景下的典型问题,主要有两个触发点:多进程写文件的竞态冲突、轮转时部分进程的文件句柄未更新。

一、排查步骤

先快速定位问题根源:

  1. 验证多进程文件句柄泄漏
    在日志轮转后,登录容器执行lsof | grep my_app.log,查看各个Worker进程的文件句柄指向。如果发现某个进程仍在写入已经被重命名的旧日志文件(比如my_app.log.2024-05-20),那就是核心问题——轮转时只有触发轮转的进程更新了文件句柄,其他进程还在往旧文件写,而旧文件被压缩后这些日志就丢失了。
  2. 检查轮转逻辑的异常
    _gzip_rotator方法里临时添加调试输出(比如打印到stdout),看是否有os.replace失败的情况。如果你的logs目录是挂载的宿主机卷,而tempfile.mkstemp默认在容器内部的/tmp创建临时文件,跨文件系统的os.replace会报错,导致旧文件没被删除,后续日志会继续写入旧文件。
  3. 单Worker验证
    临时把Granian的--workers改成1,运行1-2个轮转周期(比如一天),如果日志不再丢失,就100%确认是多进程写同一个文件的问题。

二、具体解决方案

根据你的场景,推荐以下三种方案,按优先级排序:

方案1:用Docker日志驱动接管日志(最省心)

放弃Python代码里的文件轮转,让Docker统一收集和管理日志,完全避免多进程写文件的冲突:

  1. 修改Docker Compose配置,添加日志驱动设置:
    services:
      my-app:
        build: .
        container_name: my-app
        network_mode: "host"
        restart: unless-stopped
        logging:
          driver: "json-file"
          options:
            max-size: "100m"  # 单个日志文件最大100M
            max-file: "30"    # 保留最近30个日志文件
            compress: "true"  # 自动压缩旧日志
        # 移除原来的日志卷挂载,因为日志由Docker管理
        # volumes:
        #   - ./logs:/src/my_app/logs
    
  2. 修改Python日志配置,只保留StreamHandler(输出到stdout/stderr):
    setup_logger方法里删除file_handler相关代码,只保留stream_handler。这样所有Worker的日志都会输出到容器的标准输出,由Docker统一收集、轮转和压缩。

方案2:每个Worker写独立日志文件

让每个Granian Worker进程生成自己的日志文件,避免共享文件的冲突:
修改日志配置里的log_file参数,加入进程ID:

@classmethod
def setup_logger(
    cls,
    name: str = "my_app",
    level: int = logging.INFO,
    log_file: str = "logs/my_app.log",
    backup_count: int = 365,
    when: str = "midnight",
    interval: int = 1,
) -> logging.Logger:
    # 替换log_file为带进程ID的路径
    log_file = f"logs/my_app_{os.getpid()}.log"
    # 后续代码保持不变...

这样每个Worker会生成类似my_app_123.log的独立日志文件,各自处理轮转,不会互相干扰。缺点是日志文件分散,需要用工具(比如cat logs/my_app_*.log)合并查看。

方案3:用QueueHandler+QueueListener实现进程安全日志

通过队列让所有Worker把日志发送到主进程,由主进程统一写入文件,这是Python官方推荐的多进程日志解决方案:

  1. 修改LoggerConfig类,添加全局队列和监听器:
    import multiprocessing
    from logging.handlers import QueueHandler, QueueListener
    
    class LoggerConfig:
        _LOG_FMT: Final[str] = "%(asctime)s - %(levelname)s - %(message)s"
        _COPY_BUFSIZE: Final[int] = 1024 * 1024 # 1 MiB
        _queue = None
        _listener = None
    
        @classmethod
        def setup_main_process_logger(cls, name="my_app", level=logging.INFO, log_file="logs/my_app.log"):
            """主进程(Granian启动前)调用,初始化队列和监听器"""
            if cls._listener is not None:
                return logging.getLogger(name)
            
            # 创建全局队列
            cls._queue = multiprocessing.Queue(-1)
    
            # 创建日志处理器(和原来的逻辑一致)
            formatter = logging.Formatter(cls._LOG_FMT)
            stream_handler = logging.StreamHandler()
            stream_handler.setLevel(level)
            stream_handler.setFormatter(formatter)
    
            log_path = Path(log_file)
            log_path.parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.handlers.TimedRotatingFileHandler(
                filename=str(log_path),
                when="midnight",
                interval=1,
                backupCount=365,
                encoding="utf-8",
                errors="backslashreplace",
                delay=True,
            )
            file_handler.setLevel(level)
            file_handler.suffix = "%Y-%m-%d"
            file_handler.namer = cls._gzip_namer
            file_handler.rotator = cls._gzip_rotator
            file_handler.setFormatter(formatter)
    
            # 启动队列监听器
            cls._listener = QueueListener(cls._queue, stream_handler, file_handler)
            cls._listener.start()
    
            # 主进程的日志处理器用QueueHandler
            logger = logging.getLogger(name)
            logger.setLevel(level)
            logger.propagate = False
            logger.addHandler(QueueHandler(cls._queue))
            return logger
    
        @classmethod
        def setup_worker_process_logger(cls, name="my_app", level=logging.INFO):
            """Worker进程调用,只添加QueueHandler"""
            logger = logging.getLogger(name)
            if not logger.handlers:
                logger.setLevel(level)
                logger.propagate = False
                logger.addHandler(QueueHandler(cls._queue))
            return logger
    
        # 保留原来的_gzip_namer和_gzip_rotator方法...
    
  2. 在FastAPI入口文件src/my_app/main.py里,判断进程类型并初始化日志:
    import multiprocessing
    from .logger_config import LoggerConfig
    
    # 主进程初始化日志
    if multiprocessing.parent_process() is None:
        LoggerConfig.setup_main_process_logger()
    else:
        # Worker进程初始化日志
        LoggerConfig.setup_worker_process_logger()
    
    from fastapi import FastAPI
    app = FastAPI()
    
    # 后续路由逻辑...
    
    这样所有Worker的日志都会通过队列发送到主进程,由主进程统一写入文件,完全避免多进程写文件的竞态问题。

三、额外注意点

如果坚持使用原来的文件轮转逻辑,还要修复_gzip_rotator里的跨文件系统问题:
tempfile.mkstempdir参数指定为日志文件所在目录,确保临时文件和目标文件在同一个文件系统,这样os.replace才能原子执行:

fd, tmp = tempfile.mkstemp(prefix=".logrotate-", suffix=".gz", dir=str(dest_dir))

内容的提问来源于stack exchange,提问作者Geo48

火山引擎 最新活动