You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

使用Python retrying模块时,如何在重试失败后抛出自定义异常?

用Python retrying库应对DDoS保护状态下的下载重试

刚好之前用retrying库处理过类似的重试场景,我来帮你把逻辑理清楚,实现你要的需求:当download(symbol)抛出DDoSProtection异常时自动重试,最多尝试3次(含第一次),每次间隔3秒,如果3次都失败,就抛出自定义的downloadError异常。

首先,我们需要一个辅助函数,用来判断当前抛出的异常是否触发重试规则:

def retry_if_ddospro_error(exception):
    """Return True if we should retry (in this case when it's an DDoSProtection), False otherwise"""
    return isinstance(exception, DDoSProtection)

这个函数的作用很明确——检查异常是否为DDoSProtection类型,是就返回True,告诉retry装饰器“该执行重试了”。

接下来是核心的下载函数,我们给它加上retry装饰器并配置好重试参数:

from retrying import retry

# 先定义你用到的异常类(如果还没定义的话)
class DDoSProtection(Exception):
    pass

class downloadError(Exception):
    pass

@retry(
    retry_on_exception=retry_if_ddospro_error,  # 指定触发重试的异常判断逻辑
    stop_max_attempt_number=3,                 # 最大尝试次数(含首次请求)
    wait_fixed=3000                            # 每次重试前固定等待3000毫秒(3秒)
)
def download(symbol):
    # 执行下载操作,该操作可能抛出DDoSProtection异常
    ls = exchange.fetch_ohlcv(symbol)
    return ls

现在,当exchange.fetch_ohlcv(symbol)抛出DDoSProtection异常时,装饰器会自动触发重试,但这里有个细节:如果3次尝试都失败,装饰器会直接抛出最后一次的DDoSProtection异常,而不是我们想要的downloadError。所以我们需要给下载函数加一层包装,完成异常转换:

def safe_download(symbol):
    try:
        return download(symbol)
    except DDoSProtection:
        raise downloadError('Exchange is in DDoS protection')

这样一来,当3次重试都触发DDoS保护时,就会抛出自定义的downloadError,方便后续的错误日志记录和业务处理。

额外提几个注意点:

  • stop_max_attempt_number=3意味着总共会执行3次下载操作(首次失败后重试2次)
  • 如果你想让等待时间递增(比如指数退避,避免频繁请求加重服务器负担),可以替换wait_fixedwait_exponential_multiplierwait_exponential_max参数
  • 要确保exchange.fetch_ohlcv(symbol)只会在DDoS保护状态下抛出DDoSProtection异常,避免无关异常触发不必要的重试

内容的提问来源于stack exchange,提问作者Florent

火山引擎 最新活动