You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

RabbitMQ多次Nack后触发Timeout Exception的原因排查

分析你的RabbitMQ TimeoutException问题

先梳理下你的场景:你给RabbitMQ队列two-messages设置了长度限制策略,溢出时拒绝发布,然后写了一个带增量退避重试的异步发送方法,运行一段时间后遇到了TimeoutException,同时日志显示捕获PublishNackException后RabbitMQ会关闭model。下面来拆解问题原因和相关疑问:

一、你的配置与代码回顾

1. RabbitMQ队列长度限制策略

你执行的策略命令:

rabbitmqctl set_policy my-pol "^two-messages$" '{"max-length":2,"overflow":"reject-publish"}' --apply-to queues

这个策略是生效的,当队列消息数达到上限时,RabbitMQ会拒绝新的发布请求,触发PublishNackException

2. 异步发送与重试代码

你的异步发送方法(整理后):

public static async Task SendMessagage<T>(Uri queueUri, T message, IBus bus, int interval = 10 * 1000) 
{ 
    try 
    { 
        Task<ISendEndpoint> sendEndpointTask = bus.GetSendEndpoint(queueUri); 
        Console.WriteLine($"Enter SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
        var result = sendEndpointTask.Result; // 这里应该用await,而非直接取Result!
        await result.Send(message); 
        Console.WriteLine($"AFTER SEND SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
        interval = 10 * 1000; 
    } 
    catch (PublishNackException e) 
    { 
        Console.WriteLine($"PublishNack EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
        interval = interval * 2; 
        Thread.Sleep(interval); // 这里的阻塞是关键问题
        await SendMessagage(queueUri, message, bus, interval); 
    } 
    catch(Exception ex) 
    { 
        Console.WriteLine($"EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
        interval = interval * 2; 
        Thread.Sleep(interval); // 同样的阻塞问题
        await SendMessagage(queueUri, message, bus, interval); 
        Console.WriteLine($"EXCEPTION2 SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
    } 
}

3. 遇到的异常与日志

TimeoutException栈信息:

The model usage threw an exception System.TimeoutException: The operation has timed out. 
at RabbitMQ.Util.BlockingCell.GetValue(TimeSpan timeout) 
at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout) 
at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body) 
at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand) 
at RabbitMQ.Client.Framing.Impl.Connection.CreateModel() 
at System.Threading.Tasks.Task`1.InnerInvoke() 
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) 
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot) 
--- End of stack trace from previous location where exception was thrown --- 
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() 
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
at MassTransit.RabbitMqTransport.Integration.RabbitMqModelCache.<>c__DisplayClass9_0.<<SendUsingNewModel>b__0>d.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() 
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
at MassTransit.RabbitMqTransport.Integration.RabbitMqModelCache.ModelScope.<Attach>d__7.MoveNext() 
--- End of stack trace from previous location where exception was thrown --- 
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() 
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) 
at MassTransit.RabbitMqTransport.Integration.RabbitMqModelCache.<SendUsingExistingModel>d__10.MoveNext()

RabbitMQ model关闭日志:

13/5/2018 16:14:42.594 [Debug] (8) Closing model: 5 / "admin@192.168.153.103:5672/"


二、TimeoutException的根本原因

  1. Model关闭后的资源阻塞问题
    当RabbitMQ触发reject-publish并抛出PublishNackException时,MassTransit会自动关闭当前使用的RabbitMQ model(也就是日志里看到的Closing model),这是正常的资源清理行为。但你的重试逻辑里用了Thread.Sleep(interval),这会阻塞当前异步线程,导致MassTransit无法及时完成模型的销毁和新模型的创建流程。

  2. 异步方法中的同步阻塞错误
    你在异步方法里用了sendEndpointTask.Result直接获取Task结果,这会导致同步阻塞线程,破坏了异步上下文的执行流程。加上后续的Thread.Sleep,会让线程池中的线程被长时间占用,当需要创建新的RabbitMQ model时,RPC请求(比如ChannelOpen)无法及时得到线程资源处理,最终触发超时。

  3. 递归重试的累积问题
    每次捕获异常后递归调用SendMessagage,会导致调用栈不断累积,同时每次重试都会尝试获取新的SendEndpoint,如果线程被阻塞,这些请求会堆积,进一步加剧超时的概率。


三、关于消费者的响应时限

消费者本身没有强制的响应时限,但有几个相关点需要注意:

  • RabbitMQ的channel默认有60秒的心跳机制,如果消费者长时间不发送心跳(比如处理消息耗时远超60秒且没有配置心跳超时),RabbitMQ会认为连接失效并关闭channel,但这和你遇到的生产者端TimeoutException不是一回事。
  • 你的问题根源是生产者重试时的线程阻塞,而非消费者响应超时,但如果消费者处理消息过慢,导致队列频繁触发max-length限制,会间接引发后续的publish reject和重试,所以优化消费者处理速度能从根源减少这类问题。

四、优化建议

  1. 替换Thread.Sleep为Task.Delay
    异步方法中永远不要用Thread.Sleep,改用await Task.Delay(interval),这样不会阻塞线程,让异步上下文正常处理资源管理:

    // 替换Thread.Sleep(interval)为
    await Task.Delay(interval);
    
  2. 用循环代替递归重试
    递归重试会导致调用栈溢出,改用循环结构更安全,也更容易控制重试次数:

    public static async Task SendMessagage<T>(Uri queueUri, T message, IBus bus, int maxRetries = 5) 
    { 
        int interval = 10 * 1000;
        int retryCount = 0;
        
        while(retryCount <= maxRetries)
        {
            try 
            { 
                var sendEndpoint = await bus.GetSendEndpoint(queueUri); 
                Console.WriteLine($"Enter SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
                await sendEndpoint.Send(message); 
                Console.WriteLine($"AFTER SEND SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
                return; // 发送成功,退出循环
            } 
            catch (PublishNackException e) 
            { 
                Console.WriteLine($"PublishNack EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
                if(retryCount >= maxRetries) throw; // 达到最大重试次数,抛出异常
                interval = interval * 2; 
                await Task.Delay(interval); 
                retryCount++;
            } 
            catch(Exception ex) 
            { 
                Console.WriteLine($"EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}"); 
                if(retryCount >= maxRetries) throw;
                interval = interval * 2; 
                await Task.Delay(interval); 
                retryCount++;
            } 
        }
    }
    
  3. 使用MassTransit自带的重试机制
    MassTransit提供了内置的重试策略,能更优雅地处理异常,并且适配RabbitMQ的资源管理,比如:

    var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host(new Uri("rabbitmq://localhost/"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });
        
        // 配置发送重试:5次重试,初始间隔10秒,每次增加10秒
        cfg.UseRetry(r => r.Incremental(5, 10000, 10000)); 
    });
    
  4. 优化消费者处理速度
    检查消费者的逻辑,是否有耗时操作,考虑增加消费者实例、优化业务逻辑,减少队列堆积的概率,从根源减少publish reject的情况。


内容的提问来源于stack exchange,提问作者Max.Futerman

火山引擎 最新活动