You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

按类限制线程:.NET Web应用多API调用线程耗尽问题咨询

这是典型的资源饥饿问题——单一类型的慢请求耗尽了全局线程池,导致其他请求无法获取资源。在.NET生态里,我们可以通过「按API类别做并发隔离」来解决,核心就是给不同的API分组分配独立的并发配额,避免某一组拖垮整个系统。下面是几种实用的实现方案:

方案1:使用SemaphoreSlim实现分组并发限制

这是最轻量化的实现方式,适合快速改造。我们可以为每个API类别创建一个独立的SemaphoreSlim实例,限制该类请求的最大并发数,确保某一个API的慢请求不会耗尽所有线程。

实现步骤:

  • 定义一个全局字典,存储每个API对应的信号量,根据业务优先级分配配额(比如全局线程池最大100,我们可以给API-1分配20,API-2分配30,剩下50给其他API或全局共享)
  • 在处理请求前,先获取对应API的信号量,处理完成后释放
// 全局静态类,存储各API的信号量
public static class ApiConcurrencyLimits
{
    private static readonly Dictionary<string, SemaphoreSlim> _semaphores = new()
    {
        ["API-1"] = new SemaphoreSlim(20, 20), // 最多20个并发
        ["API-2"] = new SemaphoreSlim(30, 30),
        ["API-3"] = new SemaphoreSlim(30, 30),
        ["Default"] = new SemaphoreSlim(20, 20) // 兜底的其他API
    };

    public static SemaphoreSlim GetSemaphore(string apiKey)
    {
        return _semaphores.TryGetValue(apiKey, out var semaphore) ? semaphore : _semaphores["Default"];
    }
}

// 在请求处理逻辑中使用
public async Task<IActionResult> HandleRequest(string apiType)
{
    var semaphore = ApiConcurrencyLimits.GetSemaphore(apiType);
    try
    {
        // 等待获取信号量,设置超时避免无限等待
        if (!await semaphore.WaitAsync(TimeSpan.FromSeconds(5)))
        {
            return StatusCode(StatusCodes.Status503ServiceUnavailable, "系统繁忙,请稍后再试");
        }

        // 调用对应的API并处理逻辑
        var response = await CallExternalApi(apiType);
        return Ok(response);
    }
    finally
    {
        semaphore.Release();
    }
}

方案2:使用自定义TaskScheduler实现独立线程调度

如果需要更严格的线程隔离(比如不想和全局线程池共享资源),可以给每个API类别创建自定义的任务调度器,限制并发执行的任务数,相当于给每个API分配了独立的“小线程池”。

实现示例:

// 自定义并发限制的任务调度器
public class LimitedConcurrencyTaskScheduler : TaskScheduler
{
    private readonly Queue<Task> _tasks = new Queue<Task>();
    private readonly int _maxConcurrency;
    private int _currentConcurrency;
    private readonly object _lock = new object();

    public LimitedConcurrencyTaskScheduler(int maxConcurrency)
    {
        _maxConcurrency = maxConcurrency;
    }

    protected override void QueueTask(Task task)
    {
        lock (_lock)
        {
            _tasks.Enqueue(task);
            if (_currentConcurrency < _maxConcurrency)
            {
                _currentConcurrency++;
                TryExecuteTask(task);
            }
        }
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false; // 不允许内联执行,保证隔离性
    }

    protected override IEnumerable<Task>? GetScheduledTasks()
    {
        lock (_lock)
        {
            return _tasks.ToList();
        }
    }

    protected override bool TryExecuteTask(Task task)
    {
        try
        {
            base.TryExecuteTask(task);
        }
        finally
        {
            lock (_lock)
            {
                _currentConcurrency--;
                if (_tasks.Count > 0)
                {
                    var nextTask = _tasks.Dequeue();
                    _currentConcurrency++;
                    TryExecuteTask(nextTask);
                }
            }
        }
        return true;
    }
}

// 全局存储各API的调度器
public static class ApiTaskSchedulers
{
    public static readonly Dictionary<string, TaskScheduler> Schedulers = new()
    {
        ["API-1"] = new LimitedConcurrencyTaskScheduler(20),
        ["API-2"] = new LimitedConcurrencyTaskScheduler(30),
        ["API-3"] = new LimitedConcurrencyTaskScheduler(30)
    };
}

// 使用方式
public async Task<IActionResult> HandleRequest(string apiType)
{
    if (!ApiTaskSchedulers.Schedulers.TryGetValue(apiType, out var scheduler))
    {
        scheduler = TaskScheduler.Default;
    }

    // 使用指定调度器执行API调用任务
    var response = await Task.Factory.StartNew(async () =>
    {
        return await CallExternalApi(apiType);
    }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler).Unwrap();

    return Ok(response);
}

方案3:使用.NET 7+自带的RateLimiter组件(推荐)

如果你的项目是.NET 7及以上版本,官方提供的RateLimiter组件是更优雅的选择,它原生支持多种限流策略,包括并发限制,而且可以轻松集成到ASP.NET Core的请求管道中。

实现步骤:

  1. 注册不同API组的限流器
builder.Services.AddRateLimiter(options =>
{
    // 给API-1设置并发限制
    options.AddConcurrencyLimiter("API-1", limiterOptions =>
    {
        limiterOptions.PermitLimit = 20;
        limiterOptions.QueueLimit = 10; // 等待队列长度
        limiterOptions.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
    });

    // 给API-2设置并发限制
    options.AddConcurrencyLimiter("API-2", limiterOptions =>
    {
        limiterOptions.PermitLimit = 30;
        limiterOptions.QueueLimit = 15;
    });

    // 兜底的全局限流器
    options.AddConcurrencyLimiter("Default", limiterOptions =>
    {
        limiterOptions.PermitLimit = 20;
        limiterOptions.QueueLimit = 5;
    });
});
  1. 在中间件中根据请求选择对应的限流器
app.UseRateLimiter();

app.Map("/api/*", appBuilder =>
{
    appBuilder.Use(async (context, next) =>
    {
        // 根据请求路径或标识判断API类型
        string apiType = context.Request.Path.StartsWithSegments("/api/api1") ? "API-1" :
                         context.Request.Path.StartsWithSegments("/api/api2") ? "API-2" :
                         "Default";

        // 应用对应的限流器
        context.Items["RateLimiterPolicy"] = apiType;
        await next();
    }).RequireRateLimiter(context => context.Items["RateLimiterPolicy"]?.ToString() ?? "Default");
});

关键注意事项

  • 配额分配要贴合业务:核心API可以分配更多并发配额,非核心API少分配,避免资源浪费
  • 处理超时和队列溢出:不管用哪种方案,都要设置合理的等待超时和队列长度,避免请求无限等待导致资源泄漏
  • 监控和调优:实时监控每个API组的并发数、等待队列长度、响应时间,根据实际运行情况调整配额

这样改造后,即使API-1的响应时长突然增加,最多只会占满它自己分配的20个线程,其他API的请求依然可以正常获取线程资源,不会导致整个系统瘫痪。

内容的提问来源于stack exchange,提问作者shubhamr

火山引擎 最新活动