You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

FastAPI结合Redis实现基于用户订阅的动态配额限流全方案咨询

FastAPI结合Redis实现基于用户订阅的动态配额限流全方案咨询

嘿,看起来你已经做了不少基础尝试了,针对你遇到的竞态条件、内存泄漏、分布式同步这些痛点,我给你一套完整的落地方案,一步步解决你的问题:

一、动态配额实时更新:避免竞态+订阅变更即时生效

首先得确保用户订阅变更时,配额能快速生效,同时解决你遇到的竞态问题:

  1. 缓存订阅信息+主动失效
    把用户的订阅级别缓存到Redis,设置合理的过期时间(比如5分钟),当用户更改订阅时,直接更新缓存键,同时主动删除该用户的限流统计键(比如滑动窗口的ZSET),这样下次请求时会重新初始化统计:

    # 获取用户订阅级别(优先从Redis缓存取,缓存失效再查DB)
    async def get_user_subscription(user_id: str) -> str:
        cache_key = f"user:subscription:{user_id}"
        subscription = await redis_client.get(cache_key)
        if not subscription:
            # 从数据库查询真实订阅信息
            subscription = await db_get_user_subscription(user_id)
            # 缓存5分钟,同时允许主动更新
            await redis_client.setex(cache_key, 300, subscription)
        return subscription.decode() if isinstance(subscription, bytes) else subscription
    
    # 当用户订阅变更时调用这个函数(比如后台管理接口)
    async def update_user_subscription(user_id: str, new_subscription: str):
        # 更新数据库
        await db_update_user_subscription(user_id, new_subscription)
        # 更新缓存
        cache_key = f"user:subscription:{user_id}"
        await redis_client.setex(cache_key, 300, new_subscription)
        # 删除用户的限流统计键,强制重新初始化
        await redis_client.delete(f"requests:{user_id}")
    
  2. 原子化配额校验
    把获取配额和限流检查的逻辑放在原子操作里(后面用Lua脚本实现),避免在获取配额和检查限流之间,订阅信息被修改导致的不一致。

二、滑动窗口正确实现:解决内存泄漏+原子性

你的滑动窗口思路是对的,但问题出在没有自动清理长期不活跃用户的ZSET,以及Pipeline不是原子操作。咱们用Lua脚本实现原子化的滑动窗口,同时给ZSET设置过期时间:

编写Redis Lua脚本(原子执行所有步骤)

创建一个sliding_window_limit.lua文件:

local key = KEYS[1]
local quota = tonumber(ARGV[1])
local window_size = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local window_start = now - window_size

-- 1. 添加当前请求时间到有序集合
redis.call('ZADD', key, now, now)
-- 2. 删除窗口外的旧请求记录
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
-- 3. 设置键的过期时间(窗口大小+10秒,避免刚删完就过期)
redis.call('EXPIRE', key, window_size + 10)
-- 4. 获取当前窗口内的请求数
local count = redis.call('ZCARD', key)
-- 5. 返回是否允许、已用次数、剩余次数
return {count <= quota, count, quota - count}

在FastAPI中调用Lua脚本

import time
from fastapi import Response, HTTPException, Depends

# 加载Lua脚本
with open("sliding_window_limit.lua", "r") as f:
    sliding_window_script = f.read()
    sliding_window_sha = await redis_client.script_load(sliding_window_script)

async def get_rate_limit(user_id: str) -> int:
    QUOTA_MAPPING = {
        'free': 100,
        'premium': 1000,
        'enterprise': 5000
    }
    subscription = await get_user_subscription(user_id)
    return QUOTA_MAPPING.get(subscription, 100)

async def check_sliding_window_rate_limit(user_id: str, window_size: int = 3600) -> tuple[bool, int, int]:
    quota = await get_rate_limit(user_id)
    now = int(time.time())
    key = f"requests:{user_id}"
    # 调用Lua脚本,原子执行所有操作
    result = await redis_client.evalsha(sliding_window_sha, 1, key, quota, window_size, now)
    allowed, used, remaining = result
    return allowed, used, remaining

这样一来:

  • 所有操作原子执行,避免了分布式环境下的竞态
  • 每次操作都会给ZSET设置过期时间,长期不活跃用户的ZSET会自动被Redis删除,解决内存泄漏问题

三、高效处理突发请求:结合令牌桶算法

滑动窗口是平滑限流,但如果需要支持突发请求(比如允许用户短时间内用完部分配额),咱们可以结合令牌桶算法。如果不想用第三方Redis模块(比如Redis-Cell),可以用Lua实现轻量级令牌桶:

令牌桶Lua脚本

local key = KEYS[1]
local capacity = tonumber(ARGV[1])  -- 令牌桶容量(突发上限)
local rate = tonumber(ARGV[2])      -- 每秒生成令牌数
local now = tonumber(ARGV[3])
local request_count = tonumber(ARGV[4]) or 1

-- 获取当前令牌桶状态(最后更新时间、剩余令牌数)
local current = redis.call('HMGET', key, 'last_refill', 'tokens')
local last_refill = tonumber(current[1]) or now
local tokens = tonumber(current[2]) or capacity

-- 计算从上次更新到现在生成的令牌数
local elapsed = now - last_refill
local new_tokens = math.min(capacity, tokens + elapsed * rate)

-- 判断是否足够请求
local allowed = new_tokens >= request_count
if allowed then
    new_tokens = new_tokens - request_count
end

-- 更新令牌桶状态,设置过期时间(比如1小时,避免长期不活跃占用内存)
redis.call('HMSET', key, 'last_refill', now, 'tokens', new_tokens)
redis.call('EXPIRE', key, 3600)

-- 返回结果
return {allowed, new_tokens, capacity}

结合滑动窗口和令牌桶

你可以根据需求选择:

  • 对严格限流的场景用滑动窗口
  • 对需要突发支持的场景用令牌桶
  • 或者双重校验:先过令牌桶(控制突发),再过滑动窗口(控制长期速率)

四、返回剩余配额信息:标准化响应头

在FastAPI接口中,把限流结果放到响应头里,符合HTTP标准:

@app.get("/api/data")
async def get_data(user_id: str = Depends(get_current_user), response: Response = Depends()):
    allowed, used, remaining = await check_sliding_window_rate_limit(user_id)
    quota = await get_rate_limit(user_id)
    # 设置响应头
    response.headers["X-RateLimit-Limit"] = str(quota)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    response.headers["X-RateLimit-Reset"] = str(int(time.time()) + 3600)  # 窗口重置时间
    if not allowed:
        raise HTTPException(status_code=429, detail="请求超过配额限制")
    # 处理业务逻辑
    return {"data": "your data here"}

五、分布式环境适配:确保统一数据源+原子操作

  1. 用Redis集群/哨兵模式:生产环境不要用单节点Redis,改用Redis集群或者哨兵模式,保证高可用和数据一致性。
  2. 所有实例连接同一Redis集群:让所有FastAPI实例都连接同一个Redis集群,这样所有请求的限流统计都统一存储,避免节点间数据不一致。
  3. 全程用Lua脚本:所有限流相关操作都用Lua脚本原子执行,避免分布式环境下的竞态条件,因为Lua脚本在Redis中是单线程执行的,不会被其他请求打断。

额外优化建议

  • 批量清理过期键:如果你的Redis版本支持,可以启用lazyfree-lazy-expire配置,让Redis异步删除过期键,减少性能开销。
  • 监控Redis性能:用Redis的INFO命令或者监控工具(比如Prometheus+Grafana)监控ZSET的数量、内存占用、命令执行时间,及时发现问题。
  • 降级处理:当Redis不可用时,可以临时启用本地内存限流(比如每个实例自己统计),或者返回503服务不可用,避免雪崩。

备注:内容来源于stack exchange,提问作者Nariman Jafari

火山引擎 最新活动