You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多进程池:执行任务时动态调整进程数的技术问询

解决方案:Python 2.7动态调整CPU密集型多进程任务的并发数

针对你提到的白天限制CPU使用、夜间最大化运行效率的需求,结合Python 2.7的特性,我整理了三个经过实践验证的方案,你可以根据现有代码的复杂度和运维成本选择:


方案1:自定义任务调度逻辑,动态控制进程数

既然multiprocessing.Pool无法动态调整大小,我们可以放弃直接使用Pool,改用主进程+任务队列+动态进程管理的模式,核心思路是定时检查当前时段,调整同时运行的进程数量。

代码示例:

import multiprocessing
import time
from datetime import datetime

def cpu_intensive_task(task_id):
    # 替换成你的实际CPU密集型任务逻辑
    print(f"Started task {task_id} at {datetime.now()}")
    # 模拟耗时操作,实际任务中替换为你的业务代码
    for _ in range(10**8):
        pass
    print(f"Finished task {task_id} at {datetime.now()}")

def get_allowed_concurrency():
    """根据当前时间返回允许的并发进程数"""
    current_hour = datetime.now().hour
    # 自定义时段规则:白天9:00-18:00限制为4进程,夜间用满CPU
    if 9 <= current_hour < 18:
        return 4
    else:
        return multiprocessing.cpu_count()

def main():
    # 初始化任务队列,这里假设你有100个独立任务
    task_queue = multiprocessing.Queue()
    for task_id in range(100):
        task_queue.put(task_id)
    
    running_processes = []
    check_interval = 60  # 每分钟检查一次并发数和任务状态

    while not task_queue.empty() or running_processes:
        allowed_concurrency = get_allowed_concurrency()
        
        # 1. 清理已完成的进程
        running_processes = [p for p in running_processes if p.is_alive()]
        
        # 2. 终止超出限制的进程(如果当前运行数超过允许值)
        while len(running_processes) > allowed_concurrency:
            proc_to_kill = running_processes.pop()
            proc_to_kill.terminate()
            proc_to_kill.join()
            print(f"Terminated a process to keep concurrency under {allowed_concurrency}")
        
        # 3. 启动新进程直到达到允许的并发数(且还有任务未执行)
        while len(running_processes) < allowed_concurrency and not task_queue.empty():
            task_id = task_queue.get()
            new_proc = multiprocessing.Process(target=cpu_intensive_task, args=(task_id,))
            new_proc.start()
            running_processes.append(new_proc)
            print(f"Started task {task_id}, current running processes: {len(running_processes)}")
        
        time.sleep(check_interval)

if __name__ == "__main__":
    main()

优缺点:

  • ✅ 完全自定义控制逻辑,适配你的时段规则
  • ✅ 不需要引入第三方依赖
  • ❌ 需要修改现有任务的启动逻辑,适合还在迭代的代码

方案2:操作系统级CPU亲和性限制(最小代码改动)

如果不想大幅修改现有代码,推荐用CPU亲和性绑定的方式:白天将进程绑定到部分CPU核心,夜间放开绑定。这种方式不需要调整进程数量,而是通过系统层面限制进程能使用的CPU资源,从而避免机器响应变慢。

实现方式:

方式A:通过taskset命令启动脚本

白天启动时绑定到4个核心(假设CPU有8核):

taskset -c 0-3 python your_large_task.py

夜间启动时绑定到所有核心:

taskset -c 0-7 python your_large_task.py

你可以用cron定时切换:比如9点执行taskset调整亲和性,或者重启脚本(如果任务是持续运行的)。

方式B:在Python代码中动态设置(依赖psutil

Python 2.7可以用psutil库(支持2.7版本)动态修改当前进程的CPU亲和性:

import psutil
from datetime import datetime
import time

def adjust_cpu_affinity():
    current_hour = datetime.now().hour
    cpu_count = psutil.cpu_count()
    if 9 <= current_hour < 18:
        # 白天限制使用前4个核心
        psutil.Process().cpu_affinity(list(range(4)))
    else:
        # 夜间使用所有核心
        psutil.Process().cpu_affinity(list(range(cpu_count)))
    print(f"Adjusted CPU affinity to {psutil.Process().cpu_affinity()}")

# 启动时先设置一次
adjust_cpu_affinity()

# 定时检查并调整(比如每小时检查一次)
while True:
    time.sleep(3600)
    adjust_cpu_affinity()

安装psutil

pip install psutil==5.8.0  # 5.8.0是支持Python2.7的最后一个版本

优缺点:

  • ✅ 几乎不需要修改现有任务代码,适合已经成型的大型任务
  • ✅ 系统级限制,效果直接
  • ❌ 只是限制CPU使用,进程数量还是原来的,不过白天机器响应会因为CPU被限制而恢复

方案3:用Celery实现分布式任务调度(适合长期维护)

如果你的任务是完全独立的,可以引入Celery做任务队列,通过动态调整Worker数量来实现白天/夜间的并发控制。这种方式扩展性更好,甚至可以扩展到多台机器。

步骤:

  1. 安装依赖(Celery 4.4.x支持Python2.7):
pip install celery==4.4.7 redis==2.10.6  # 用Redis做消息队列
  1. 编写任务文件tasks.py
from celery import Celery
import time

app = Celery('cpu_tasks', broker='redis://localhost:6379/0')

@app.task
def cpu_intensive_task(task_id):
    # 替换成你的实际任务逻辑
    print(f"Processing task {task_id}")
    for _ in range(10**8):
        pass
    return f"Task {task_id} completed"
  1. 用Supervisor管理Worker进程
    编写Supervisor配置文件celery_worker.conf
[program:celery_worker]
command=celery -A tasks worker --loglevel=info --concurrency=%(process_num)s
directory=/path/to/your/project
autostart=true
autorestart=true
user=your_username
stdout_logfile=/var/log/celery-worker.log
stderr_logfile=/var/log/celery-worker-error.log
  1. 定时调整Worker并发数
    cron添加定时任务:
  • 每天9点(白天):调整Worker并发数为4
0 9 * * * supervisorctl update celery_worker process_num=4
  • 每天18点(夜间):调整Worker并发数为CPU核心数
0 18 * * * supervisorctl update celery_worker process_num=$(nproc)

优缺点:

  • ✅ 解耦任务提交和执行,扩展性强,支持多机器部署
  • ✅ 不需要修改任务逻辑,只需封装成Celery任务
  • ❌ 需要引入消息队列(如Redis)和Supervisor,增加运维成本

内容的提问来源于stack exchange,提问作者KieranL

火山引擎 最新活动