遵循Google Gemini API速率限制仍触发429资源耗尽错误的排查与解决
看起来你已经做了不少速率限制的处理,但还是碰到了429错误,这种情况确实挺头疼的。我来帮你拆解可能的原因,以及对应的解决思路:
一、可能的原因分析
1. 忽略了token层面的速率限制
Google Gemini的速率限制通常有两个核心维度:调用次数和token总消耗(包含prompt和响应的token数)。你现在只按调用次数控制(1000次/分钟),但如果每个请求的token数较多(比如你的prompt加上模型响应),可能先触发了token配额的限制,而非调用次数限制。
举个例子:如果你的每个请求平均用100token,1000次调用就是10万token,而Gemini 1.5 Pro的部分账号配额可能每分钟token上限低于这个数,直接触发429。
2. 滚动窗口计算的偏差
你的TokenBucket实现是基于固定速率补充token,但Google的速率限制是滚动60秒窗口(即任意连续60秒内的请求数不能超过1000)。如果你的代码在短时间内(比如10秒)把1000个token全部用完,后续50秒本地逻辑允许发请求,但服务器的滚动窗口还没刷新,还是会触发429。
3. 网络延迟导致的时间差
本地计算的请求时间和Google服务器接收请求的时间可能存在偏差,比如请求因网络延迟晚到服务器,导致服务器看到的请求密度比你本地计算的更高,从而触发限制。
4. 同一IP的隐性限制
即使你用了多个API key,所有请求都来自同一个IP,Google可能对单IP的请求数/并发数有额外限制,这会导致即使每个key的调用次数没超,IP层面先触发了限制。
5. Chat Session复用的潜在问题
你每个worker线程创建一个独立的chat session,虽然复用session是推荐做法,但会不会存在会话层面的额外限制?或者session的send_message接口有特殊的速率控制逻辑?
二、解决建议与代码调整
1. 加入token消耗的监控与控制
首先计算每个请求的token总数(prompt + 预估响应token),然后针对token配额做限制。修改代码的话,可以在worker函数里添加token计数:
# 在worker函数中,发送请求前计算token数 prompt_token_count = genai.count_tokens(user_prompt) # 预估响应token数(你的配置是max_output_tokens=5,预留一点冗余) total_tokens = prompt_token_count + 10 # 修改TokenBucket类,支持按token数消耗,而非调用次数
之后调整TokenBucket的tokens_per_minute为对应key的token配额(可在Google Cloud Console查看)。
2. 改用滚动窗口计数器替代TokenBucket
滚动窗口的计算更贴合Google的限制逻辑,我们可以用一个列表记录最近60秒的请求时间,每次发送前检查请求数是否超限:
class RollingWindowRateLimiter: def __init__(self, max_requests=1000, window_minutes=1): self.max_requests = max_requests self.window = window_minutes * 60 self.request_timestamps = [] self.lock = threading.Lock() def wait_for_allowance(self): while True: with self.lock: now = time.time() # 清除窗口外的请求记录 self.request_timestamps = [ts for ts in self.request_timestamps if now - ts < self.window] if len(self.request_timestamps) < self.max_requests: self.request_timestamps.append(now) return # 短间隔重试,避免长时间阻塞 time.sleep(0.01)
在worker函数中,发送请求前调用rate_limiter.wait_for_allowance()即可。
3. 添加429错误的指数退避重试
即使做了完美的速率控制,偶尔还是会碰到429,这时候用指数退避重试比直接标记错误更合理:
import backoff @backoff.on_exception(backoff.expo, Exception, max_tries=3, giveup=lambda e: "429" not in str(e)) def safe_send_message(chat_session, prompt): response = chat_session.send_message(prompt) return response.text.strip()
之后在worker里用safe_send_message替代直接调用send_message。
4. 检查API key的实际配额
登录Google Cloud Console,查看每个API key对应的Gemini配额,确认是否真的有1000次/分钟的调用限制。部分免费额度或试用账号的配额可能更低,别想当然用官方文档的默认值。
5. 调整并发数与延迟
减少每个key的WORKERS_PER_KEY数量(比如从5降到2),避免同一key的并发请求过多。另外,把等待间隔从0.1秒调整为0.01秒,让等待逻辑更精细。
三、代码修改示例(滚动窗口+重试)
这里给你一个简化的修改后的核心代码片段,替换原来的TokenBucket逻辑:
import backoff import time import threading class RollingWindowRateLimiter: def __init__(self, max_requests=1000, window_minutes=1): self.max_requests = max_requests self.window = window_minutes * 60 self.request_timestamps = [] self.lock = threading.Lock() def wait_for_allowance(self): while True: with self.lock: now = time.time() self.request_timestamps = [ts for ts in self.request_timestamps if now - ts < self.window] if len(self.request_timestamps) < self.max_requests: self.request_timestamps.append(now) return time.sleep(0.01) @backoff.on_exception(backoff.expo, Exception, max_tries=3, giveup=lambda e: "429" not in str(e)) def safe_send_message(chat_session, prompt): response = chat_session.send_message(prompt) return response.text.strip() def worker(api_key, task_queue, results, results_lock, rate_limiter): genai.configure(api_key=api_key) model = genai.GenerativeModel( model_name="gemini-1.5-pro-002", system_instruction=system_instructions, generation_config=generation_config, ) chat_session = model.start_chat() while True: try: index, theme, category, topic_name, description = task_queue.get_nowait() except queue.Empty: break user_prompt = f""" We are a company focused on process optimization using digital solutions and artificial intelligence. Our goal is to streamline business operations and improve efficiency through advanced technologies. Topic Details: - **Theme**: {theme} - **Category**: {category} - **Topic Name**: {topic_name} - **Description**: {description} Does this topic directly align with our company's purpose? """ try: rate_limiter.wait_for_allowance() relevance = safe_send_message(chat_session, user_prompt) with results_lock: results[index] = relevance print(f"Zeile {index} klassifiziert: {relevance}") except Exception as e: with results_lock: results[index] = f"Error: {e}" print(f"Zeile {index} Fehler: {e}") finally: task_queue.task_done() # 替换原token_buckets为rate_limiters rate_limiters = [RollingWindowRateLimiter(max_requests=1000) for _ in api_keys] threads = [] for key, limiter in zip(api_keys, rate_limiters): for _ in range(WORKERS_PER_KEY): t = threading.Thread(target=worker, args=(key, task_queue, results, results_lock, limiter)) t.start() threads.append(t)
备注:内容来源于stack exchange,提问作者Markwardt Stahl




