You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何用Python multiprocessing控制每秒API调用执行次数?

控制多进程API调用的每秒执行次数方案

我之前处理过类似的大批量API调用任务,结合你用Python 3.6 + multiprocessing的场景,给你分享几个实用的控频实现思路,既能保证并行提速,又不会触发API的限流机制:

方法1:共享计数器+锁实现精准控频

这个方案通过多进程共享的计数器来统一调度请求时机,能精准控制整体的每秒请求次数,适合对QPS要求严格的场景:

import time
import pandas as pd
from multiprocessing import Pool, Value, Lock

# 初始化全局共享变量(供子进程使用)
def init_globals(counter, lock):
    global request_counter, request_lock
    request_counter = counter
    request_lock = lock

def rate_limited_api_call(api_param):
    # 设定每秒允许的最大请求数
    MAX_CALLS_PER_SEC = 20
    interval = 1.0 / MAX_CALLS_PER_SEC

    # 安全更新共享计数器
    with request_lock:
        current_request_idx = request_counter.value
        request_counter.value += 1

    # 计算当前请求需要等待的时间,保证按固定间隔发起请求
    wait_time = (current_request_idx * interval) - time.time()
    if wait_time > 0:
        time.sleep(wait_time)

    # 替换成你的实际API调用逻辑
    response = your_api_call_function(api_param)
    # 返回能直接存入DataFrame的数据格式(比如字典)
    return response

def parallel_requests(api_params_list):
    # 创建共享计数器(初始值0)和锁
    with Value('i', 0) as counter, Lock() as lock:
        # 初始化进程池,传入共享变量
        with Pool(initializer=init_globals, initargs=(counter, lock)) as pool:
            # 批量执行API调用
            results = pool.map(rate_limited_api_call, api_params_list)
    
    # 转换为DataFrame
    df = pd.DataFrame(results)
    # 写入SQL数据库(替换成你的实际配置)
    df.to_sql(
        name='your_table_name',
        con='your_sql_connection',
        if_exists='append',
        index=False
    )

原理说明:

  • multiprocessing.Value创建跨进程共享的计数器,Lock保证计数更新的线程安全
  • 每个请求根据自己的序号计算等待时间,确保所有请求严格按设定的时间间隔发起,完美控制QPS
  • 不受API调用耗时波动的影响,整体速率非常平稳

方法2:信号量+固定延迟(简单易实现)

如果对QPS的精准度要求没那么高,这个方案更简单,通过限制并发数+每个请求固定延迟来控制整体速率:

import time
import pandas as pd
from multiprocessing import Pool, Semaphore

def api_call_with_control(api_param, sem):
    # 获取信号量,限制并发数
    sem.acquire()
    try:
        # 设定每个请求的延迟,比如每秒20次则延迟0.05秒
        time.sleep(1/20)
        # 实际API调用
        response = your_api_call_function(api_param)
        return response
    finally:
        # 释放信号量
        sem.release()

def parallel_requests(api_params_list):
    # 设定最大并发数
    MAX_CONCURRENT = 5
    sem = Semaphore(MAX_CONCURRENT)
    
    with Pool(MAX_CONCURRENT) as pool:
        # 用starmap传递信号量参数
        results = pool.starmap(api_call_with_control, [(param, sem) for param in api_params_list])
    
    # 后续DataFrame转换和SQL写入逻辑同方法1
    df = pd.DataFrame(results)
    df.to_sql(...)

注意点:

  • 并发数和延迟的乘积要接近目标QPS(比如5并发×0.05秒延迟≈20次/秒)
  • 如果API调用本身耗时较长,实际QPS会略低于预期,需要根据测试结果调整延迟时间

额外建议

  1. 小批量测试:先用100个请求测试控频效果,确认QPS符合预期后再跑全量9000次
  2. 异常重试:建议给API调用加上重试逻辑(比如用tenacity库,Python3.6支持),避免因网络波动导致失败
  3. 连接池复用:如果API调用需要建立HTTP连接,建议用requests.Session复用连接,减少连接开销

内容的提问来源于stack exchange,提问作者jb007

火山引擎 最新活动