You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在FastAPI中通过SlowAPI实现全局路由速率限制(支持差异化规则配置)

如何在FastAPI中通过SlowAPI实现全局路由速率限制(支持差异化规则配置)

嘿,我懂你不想一个个给接口加@limiter.limit装饰器的麻烦——手动重复加真的很繁琐,尤其是接口多的时候!刚好我之前也折腾过类似的需求,给你梳理下怎么用SlowAPI在FastAPI里实现全局限流,还能灵活配置不同路由、不同用户订阅的差异化规则。

首先先解决你最核心的疑问:SlowAPI本身是通过抛出RateLimitExceeded异常来标记限流触发的,而不是返回布尔值。我们不用自己去写计数和判断逻辑,直接复用它的内置方法就行,这样能省掉很多底层的坑。

第一步:基础全局限流(中间件实现)

先给你一个能直接跑起来的基础版本——用全局HTTP中间件统一处理所有请求的限流,默认给所有接口设一个通用规则:

from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

app = FastAPI()
# 初始化内存版的限流器(符合你不用Redis的要求)
limiter = Limiter(key_func=get_remote_address)

@app.middleware("http")
async def global_rate_limit_middleware(request: Request, call_next):
    # 1. 定义默认的限流规则(比如5次/分钟)
    default_limit = "5/minute"
    
    try:
        # 2. 调用SlowAPI的内置检查方法,传入请求、规则、用户标识(这里用IP)
        await limiter.check(request, default_limit, get_remote_address(request))
        
        # 3. 检查通过,继续处理请求
        response = await call_next(request)
        
        # 可选:给响应头加限流信息,方便客户端查看剩余额度
        limit_details = limiter._get_limit(default_limit)
        remaining = await limiter.storage.get_remaining(
            limiter._get_cache_key(get_remote_address(request), default_limit)
        )
        response.headers.update({
            "X-RateLimit-Limit": str(limit_details),
            "X-RateLimit-Remaining": str(remaining)
        })
        return response
    
    # 4. 捕获限流异常,返回429响应
    except RateLimitExceeded:
        return JSONResponse(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content={"detail": "请求过于频繁,请稍后再试"}
        )

# 测试用接口
@app.get("/test")
async def test_endpoint():
    return {"message": "请求成功"}

第二步:扩展到差异化规则(路由+用户订阅)

你提到要支持不同路由、不同用户订阅(免费/付费)的差异化规则,这只需要在中间件里根据请求的属性动态匹配对应的规则就行。

比如我们先定义一个规则映射,再结合用户订阅信息来动态选择:

from fastapi import Depends
from typing import Dict

# 假设有一个获取当前用户订阅信息的依赖(你可以替换成自己的认证逻辑)
async def get_current_user_subscription(request: Request) -> Dict:
    # 这里模拟从请求头/Token中获取用户订阅信息
    # 实际项目中可以从JWT、数据库查询等方式获取
    auth_header = request.headers.get("Authorization")
    if auth_header and "premium" in auth_header:
        return {"subscription": "premium", "user_id": "user_123"}
    return {"subscription": "free", "user_id": None}

# 定义路由-规则映射(支持通配符前缀匹配)
route_rate_rules = {
    "/api/free/*": {"free": "5/minute", "premium": "15/minute"},
    "/api/premium/*": {"free": "0/minute", "premium": "30/minute"},  # 免费用户禁止访问付费接口
    "/admin/*": {"free": "10/minute", "premium": "10/minute"},  # 管理员接口统一规则
    "/health": {"free": "100/minute", "premium": "100/minute"}  # 健康检查宽松限流
}

@app.middleware("http")
async def dynamic_rate_limit_middleware(request: Request, call_next):
    path = request.url.path
    user_sub = await get_current_user_subscription(request)
    subscription = user_sub["subscription"]
    # 优先用用户ID作为标识(比IP更准确,适合登录用户),未登录用IP
    user_identifier = user_sub["user_id"] or get_remote_address(request)
    
    # 1. 匹配当前请求对应的规则
    current_limit = "3/minute"  # 兜底规则
    for route_pattern, rules in route_rate_rules.items():
        if path.startswith(route_pattern.rstrip("*")):
            current_limit = rules.get(subscription, current_limit)
            break
    
    # 2. 如果是免费用户访问付费接口,直接返回403
    if path.startswith("/api/premium/") and subscription == "free":
        return JSONResponse(
            status_code=status.HTTP_403_FORBIDDEN,
            content={"detail": "仅付费用户可访问此接口"}
        )
    
    try:
        # 3. 执行限流检查
        await limiter.check(request, current_limit, user_identifier)
        
        response = await call_next(request)
        # 补充响应头信息
        limit_val = limiter._get_limit(current_limit)
        remaining_val = await limiter.storage.get_remaining(
            limiter._get_cache_key(user_identifier, current_limit)
        )
        response.headers.update({
            "X-RateLimit-Limit": str(limit_val),
            "X-RateLimit-Remaining": str(remaining_val),
            "X-RateLimit-Subscription": subscription
        })
        return response
    
    except RateLimitExceeded:
        return JSONResponse(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            content={"detail": f"请求过于频繁,{subscription}用户额度为{current_limit}"}
        )

补充:批量给路由组加限流(不用中间件的另一种方案)

如果你不想用中间件,SlowAPI也支持给整个路由组批量加限流规则,适合按模块划分的接口:

from fastapi import APIRouter

# 定义一个免费接口路由组
free_router = APIRouter(prefix="/api/free")

@free_router.get("/resource1")
async def free_resource1():
    return {"data": "免费资源1"}

@free_router.get("/resource2")
async def free_resource2():
    return {"data": "免费资源2"}

# 给整个路由组批量加限流
limited_free_router = limiter.limit("5/minute")(free_router)
app.include_router(limited_free_router)

# 付费接口路由组同理
premium_router = APIRouter(prefix="/api/premium")
# 可以结合依赖判断用户订阅后再加限流
limited_premium_router = limiter.limit("30/minute")(premium_router)
app.include_router(limited_premium_router, dependencies=[Depends(get_current_user_subscription)])

最后给你确认下可行性

你最开始想的中间件方案是完全可行的,只是SlowAPI没有设计成返回布尔值,而是用异常来处理——这种方式更符合FastAPI/Starlette的异常处理逻辑,也能和框架的全局异常处理器结合。只要你在中间件里正确匹配规则、调用limiter.check,就能实现你要的全局+差异化限流效果。

备注:内容来源于stack exchange,提问作者Jan

火山引擎 最新活动