如何用Python multiprocessing控制每秒API调用执行次数?
控制多进程API调用的每秒执行次数方案
我之前处理过类似的大批量API调用任务,结合你用Python 3.6 + multiprocessing的场景,给你分享几个实用的控频实现思路,既能保证并行提速,又不会触发API的限流机制:
方法1:共享计数器+锁实现精准控频
这个方案通过多进程共享的计数器来统一调度请求时机,能精准控制整体的每秒请求次数,适合对QPS要求严格的场景:
import time import pandas as pd from multiprocessing import Pool, Value, Lock # 初始化全局共享变量(供子进程使用) def init_globals(counter, lock): global request_counter, request_lock request_counter = counter request_lock = lock def rate_limited_api_call(api_param): # 设定每秒允许的最大请求数 MAX_CALLS_PER_SEC = 20 interval = 1.0 / MAX_CALLS_PER_SEC # 安全更新共享计数器 with request_lock: current_request_idx = request_counter.value request_counter.value += 1 # 计算当前请求需要等待的时间,保证按固定间隔发起请求 wait_time = (current_request_idx * interval) - time.time() if wait_time > 0: time.sleep(wait_time) # 替换成你的实际API调用逻辑 response = your_api_call_function(api_param) # 返回能直接存入DataFrame的数据格式(比如字典) return response def parallel_requests(api_params_list): # 创建共享计数器(初始值0)和锁 with Value('i', 0) as counter, Lock() as lock: # 初始化进程池,传入共享变量 with Pool(initializer=init_globals, initargs=(counter, lock)) as pool: # 批量执行API调用 results = pool.map(rate_limited_api_call, api_params_list) # 转换为DataFrame df = pd.DataFrame(results) # 写入SQL数据库(替换成你的实际配置) df.to_sql( name='your_table_name', con='your_sql_connection', if_exists='append', index=False )
原理说明:
- 用
multiprocessing.Value创建跨进程共享的计数器,Lock保证计数更新的线程安全 - 每个请求根据自己的序号计算等待时间,确保所有请求严格按设定的时间间隔发起,完美控制QPS
- 不受API调用耗时波动的影响,整体速率非常平稳
方法2:信号量+固定延迟(简单易实现)
如果对QPS的精准度要求没那么高,这个方案更简单,通过限制并发数+每个请求固定延迟来控制整体速率:
import time import pandas as pd from multiprocessing import Pool, Semaphore def api_call_with_control(api_param, sem): # 获取信号量,限制并发数 sem.acquire() try: # 设定每个请求的延迟,比如每秒20次则延迟0.05秒 time.sleep(1/20) # 实际API调用 response = your_api_call_function(api_param) return response finally: # 释放信号量 sem.release() def parallel_requests(api_params_list): # 设定最大并发数 MAX_CONCURRENT = 5 sem = Semaphore(MAX_CONCURRENT) with Pool(MAX_CONCURRENT) as pool: # 用starmap传递信号量参数 results = pool.starmap(api_call_with_control, [(param, sem) for param in api_params_list]) # 后续DataFrame转换和SQL写入逻辑同方法1 df = pd.DataFrame(results) df.to_sql(...)
注意点:
- 并发数和延迟的乘积要接近目标QPS(比如5并发×0.05秒延迟≈20次/秒)
- 如果API调用本身耗时较长,实际QPS会略低于预期,需要根据测试结果调整延迟时间
额外建议
- 小批量测试:先用100个请求测试控频效果,确认QPS符合预期后再跑全量9000次
- 异常重试:建议给API调用加上重试逻辑(比如用
tenacity库,Python3.6支持),避免因网络波动导致失败 - 连接池复用:如果API调用需要建立HTTP连接,建议用
requests.Session复用连接,减少连接开销
内容的提问来源于stack exchange,提问作者jb007




