如何在FastAPI中通过SlowAPI实现全局路由速率限制(支持差异化规则配置)
如何在FastAPI中通过SlowAPI实现全局路由速率限制(支持差异化规则配置)
嘿,我懂你不想一个个给接口加@limiter.limit装饰器的麻烦——手动重复加真的很繁琐,尤其是接口多的时候!刚好我之前也折腾过类似的需求,给你梳理下怎么用SlowAPI在FastAPI里实现全局限流,还能灵活配置不同路由、不同用户订阅的差异化规则。
首先先解决你最核心的疑问:SlowAPI本身是通过抛出RateLimitExceeded异常来标记限流触发的,而不是返回布尔值。我们不用自己去写计数和判断逻辑,直接复用它的内置方法就行,这样能省掉很多底层的坑。
第一步:基础全局限流(中间件实现)
先给你一个能直接跑起来的基础版本——用全局HTTP中间件统一处理所有请求的限流,默认给所有接口设一个通用规则:
from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from slowapi import Limiter from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded app = FastAPI() # 初始化内存版的限流器(符合你不用Redis的要求) limiter = Limiter(key_func=get_remote_address) @app.middleware("http") async def global_rate_limit_middleware(request: Request, call_next): # 1. 定义默认的限流规则(比如5次/分钟) default_limit = "5/minute" try: # 2. 调用SlowAPI的内置检查方法,传入请求、规则、用户标识(这里用IP) await limiter.check(request, default_limit, get_remote_address(request)) # 3. 检查通过,继续处理请求 response = await call_next(request) # 可选:给响应头加限流信息,方便客户端查看剩余额度 limit_details = limiter._get_limit(default_limit) remaining = await limiter.storage.get_remaining( limiter._get_cache_key(get_remote_address(request), default_limit) ) response.headers.update({ "X-RateLimit-Limit": str(limit_details), "X-RateLimit-Remaining": str(remaining) }) return response # 4. 捕获限流异常,返回429响应 except RateLimitExceeded: return JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={"detail": "请求过于频繁,请稍后再试"} ) # 测试用接口 @app.get("/test") async def test_endpoint(): return {"message": "请求成功"}
第二步:扩展到差异化规则(路由+用户订阅)
你提到要支持不同路由、不同用户订阅(免费/付费)的差异化规则,这只需要在中间件里根据请求的属性动态匹配对应的规则就行。
比如我们先定义一个规则映射,再结合用户订阅信息来动态选择:
from fastapi import Depends from typing import Dict # 假设有一个获取当前用户订阅信息的依赖(你可以替换成自己的认证逻辑) async def get_current_user_subscription(request: Request) -> Dict: # 这里模拟从请求头/Token中获取用户订阅信息 # 实际项目中可以从JWT、数据库查询等方式获取 auth_header = request.headers.get("Authorization") if auth_header and "premium" in auth_header: return {"subscription": "premium", "user_id": "user_123"} return {"subscription": "free", "user_id": None} # 定义路由-规则映射(支持通配符前缀匹配) route_rate_rules = { "/api/free/*": {"free": "5/minute", "premium": "15/minute"}, "/api/premium/*": {"free": "0/minute", "premium": "30/minute"}, # 免费用户禁止访问付费接口 "/admin/*": {"free": "10/minute", "premium": "10/minute"}, # 管理员接口统一规则 "/health": {"free": "100/minute", "premium": "100/minute"} # 健康检查宽松限流 } @app.middleware("http") async def dynamic_rate_limit_middleware(request: Request, call_next): path = request.url.path user_sub = await get_current_user_subscription(request) subscription = user_sub["subscription"] # 优先用用户ID作为标识(比IP更准确,适合登录用户),未登录用IP user_identifier = user_sub["user_id"] or get_remote_address(request) # 1. 匹配当前请求对应的规则 current_limit = "3/minute" # 兜底规则 for route_pattern, rules in route_rate_rules.items(): if path.startswith(route_pattern.rstrip("*")): current_limit = rules.get(subscription, current_limit) break # 2. 如果是免费用户访问付费接口,直接返回403 if path.startswith("/api/premium/") and subscription == "free": return JSONResponse( status_code=status.HTTP_403_FORBIDDEN, content={"detail": "仅付费用户可访问此接口"} ) try: # 3. 执行限流检查 await limiter.check(request, current_limit, user_identifier) response = await call_next(request) # 补充响应头信息 limit_val = limiter._get_limit(current_limit) remaining_val = await limiter.storage.get_remaining( limiter._get_cache_key(user_identifier, current_limit) ) response.headers.update({ "X-RateLimit-Limit": str(limit_val), "X-RateLimit-Remaining": str(remaining_val), "X-RateLimit-Subscription": subscription }) return response except RateLimitExceeded: return JSONResponse( status_code=status.HTTP_429_TOO_MANY_REQUESTS, content={"detail": f"请求过于频繁,{subscription}用户额度为{current_limit}"} )
补充:批量给路由组加限流(不用中间件的另一种方案)
如果你不想用中间件,SlowAPI也支持给整个路由组批量加限流规则,适合按模块划分的接口:
from fastapi import APIRouter # 定义一个免费接口路由组 free_router = APIRouter(prefix="/api/free") @free_router.get("/resource1") async def free_resource1(): return {"data": "免费资源1"} @free_router.get("/resource2") async def free_resource2(): return {"data": "免费资源2"} # 给整个路由组批量加限流 limited_free_router = limiter.limit("5/minute")(free_router) app.include_router(limited_free_router) # 付费接口路由组同理 premium_router = APIRouter(prefix="/api/premium") # 可以结合依赖判断用户订阅后再加限流 limited_premium_router = limiter.limit("30/minute")(premium_router) app.include_router(limited_premium_router, dependencies=[Depends(get_current_user_subscription)])
最后给你确认下可行性
你最开始想的中间件方案是完全可行的,只是SlowAPI没有设计成返回布尔值,而是用异常来处理——这种方式更符合FastAPI/Starlette的异常处理逻辑,也能和框架的全局异常处理器结合。只要你在中间件里正确匹配规则、调用limiter.check,就能实现你要的全局+差异化限流效果。
备注:内容来源于stack exchange,提问作者Jan




