You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

遵循Google Gemini API速率限制仍触发429资源耗尽错误的排查与解决

Google Gemini API速率限制仍触发429资源耗尽错误的排查与解决

看起来你已经做了不少速率限制的处理,但还是碰到了429错误,这种情况确实挺头疼的。我来帮你拆解可能的原因,以及对应的解决思路:

一、可能的原因分析

1. 忽略了token层面的速率限制

Google Gemini的速率限制通常有两个核心维度:调用次数token总消耗(包含prompt和响应的token数)。你现在只按调用次数控制(1000次/分钟),但如果每个请求的token数较多(比如你的prompt加上模型响应),可能先触发了token配额的限制,而非调用次数限制。

举个例子:如果你的每个请求平均用100token,1000次调用就是10万token,而Gemini 1.5 Pro的部分账号配额可能每分钟token上限低于这个数,直接触发429。

2. 滚动窗口计算的偏差

你的TokenBucket实现是基于固定速率补充token,但Google的速率限制是滚动60秒窗口(即任意连续60秒内的请求数不能超过1000)。如果你的代码在短时间内(比如10秒)把1000个token全部用完,后续50秒本地逻辑允许发请求,但服务器的滚动窗口还没刷新,还是会触发429。

3. 网络延迟导致的时间差

本地计算的请求时间和Google服务器接收请求的时间可能存在偏差,比如请求因网络延迟晚到服务器,导致服务器看到的请求密度比你本地计算的更高,从而触发限制。

4. 同一IP的隐性限制

即使你用了多个API key,所有请求都来自同一个IP,Google可能对单IP的请求数/并发数有额外限制,这会导致即使每个key的调用次数没超,IP层面先触发了限制。

5. Chat Session复用的潜在问题

你每个worker线程创建一个独立的chat session,虽然复用session是推荐做法,但会不会存在会话层面的额外限制?或者session的send_message接口有特殊的速率控制逻辑?

二、解决建议与代码调整

1. 加入token消耗的监控与控制

首先计算每个请求的token总数(prompt + 预估响应token),然后针对token配额做限制。修改代码的话,可以在worker函数里添加token计数:

# 在worker函数中,发送请求前计算token数
prompt_token_count = genai.count_tokens(user_prompt)
# 预估响应token数(你的配置是max_output_tokens=5,预留一点冗余)
total_tokens = prompt_token_count + 10
# 修改TokenBucket类,支持按token数消耗,而非调用次数

之后调整TokenBucket的tokens_per_minute为对应key的token配额(可在Google Cloud Console查看)。

2. 改用滚动窗口计数器替代TokenBucket

滚动窗口的计算更贴合Google的限制逻辑,我们可以用一个列表记录最近60秒的请求时间,每次发送前检查请求数是否超限:

class RollingWindowRateLimiter:
    def __init__(self, max_requests=1000, window_minutes=1):
        self.max_requests = max_requests
        self.window = window_minutes * 60
        self.request_timestamps = []
        self.lock = threading.Lock()

    def wait_for_allowance(self):
        while True:
            with self.lock:
                now = time.time()
                # 清除窗口外的请求记录
                self.request_timestamps = [ts for ts in self.request_timestamps if now - ts < self.window]
                if len(self.request_timestamps) < self.max_requests:
                    self.request_timestamps.append(now)
                    return
            # 短间隔重试,避免长时间阻塞
            time.sleep(0.01)

在worker函数中,发送请求前调用rate_limiter.wait_for_allowance()即可。

3. 添加429错误的指数退避重试

即使做了完美的速率控制,偶尔还是会碰到429,这时候用指数退避重试比直接标记错误更合理:

import backoff

@backoff.on_exception(backoff.expo, Exception, max_tries=3, giveup=lambda e: "429" not in str(e))
def safe_send_message(chat_session, prompt):
    response = chat_session.send_message(prompt)
    return response.text.strip()

之后在worker里用safe_send_message替代直接调用send_message

4. 检查API key的实际配额

登录Google Cloud Console,查看每个API key对应的Gemini配额,确认是否真的有1000次/分钟的调用限制。部分免费额度或试用账号的配额可能更低,别想当然用官方文档的默认值。

5. 调整并发数与延迟

减少每个key的WORKERS_PER_KEY数量(比如从5降到2),避免同一key的并发请求过多。另外,把等待间隔从0.1秒调整为0.01秒,让等待逻辑更精细。

三、代码修改示例(滚动窗口+重试)

这里给你一个简化的修改后的核心代码片段,替换原来的TokenBucket逻辑:

import backoff
import time
import threading

class RollingWindowRateLimiter:
    def __init__(self, max_requests=1000, window_minutes=1):
        self.max_requests = max_requests
        self.window = window_minutes * 60
        self.request_timestamps = []
        self.lock = threading.Lock()

    def wait_for_allowance(self):
        while True:
            with self.lock:
                now = time.time()
                self.request_timestamps = [ts for ts in self.request_timestamps if now - ts < self.window]
                if len(self.request_timestamps) < self.max_requests:
                    self.request_timestamps.append(now)
                    return
            time.sleep(0.01)

@backoff.on_exception(backoff.expo, Exception, max_tries=3, giveup=lambda e: "429" not in str(e))
def safe_send_message(chat_session, prompt):
    response = chat_session.send_message(prompt)
    return response.text.strip()

def worker(api_key, task_queue, results, results_lock, rate_limiter):
    genai.configure(api_key=api_key)
    model = genai.GenerativeModel(
        model_name="gemini-1.5-pro-002",
        system_instruction=system_instructions,
        generation_config=generation_config,
    )
    chat_session = model.start_chat()

    while True:
        try:
            index, theme, category, topic_name, description = task_queue.get_nowait()
        except queue.Empty:
            break

        user_prompt = f"""
We are a company focused on process optimization using digital solutions and artificial intelligence. Our goal is to streamline business operations and improve efficiency through advanced technologies.

Topic Details:
- **Theme**: {theme}
- **Category**: {category}
- **Topic Name**: {topic_name}
- **Description**: {description}

Does this topic directly align with our company's purpose?
"""

        try:
            rate_limiter.wait_for_allowance()
            relevance = safe_send_message(chat_session, user_prompt)

            with results_lock:
                results[index] = relevance

            print(f"Zeile {index} klassifiziert: {relevance}")

        except Exception as e:
            with results_lock:
                results[index] = f"Error: {e}"
            print(f"Zeile {index} Fehler: {e}")

        finally:
            task_queue.task_done()

# 替换原token_buckets为rate_limiters
rate_limiters = [RollingWindowRateLimiter(max_requests=1000) for _ in api_keys]

threads = []
for key, limiter in zip(api_keys, rate_limiters):
    for _ in range(WORKERS_PER_KEY):
        t = threading.Thread(target=worker, args=(key, task_queue, results, results_lock, limiter))
        t.start()
        threads.append(t)

备注:内容来源于stack exchange,提问作者Markwardt Stahl

火山引擎 最新活动