Troubleshooting a TimeoutException in RabbitMQ after repeated Nacks
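First, a recap of your scenario. You applied a length-limit policy to the RabbitMQ queue two-messages that rejects publishes on overflow. You then wrote an asynchronous send method that retries with a growing backoff. After running for a while it hit a TimeoutException, and the logs show that the RabbitMQ model (channel) is closed after a PublishNackException is caught. Below is a breakdown of the cause and of your related questions: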
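I. Reviewing your configuration and code
1. RabbitMQ queue length limit policy
The policy command you ran:
```shell
rabbitmqctl set_policy my-pol "^two-messages$" '{"max-length":2,"overflow":"reject-publish"}' --apply-to queues
```
The policy works as intended. Once the queue holds its maximum of two messages, RabbitMQ rejects new publishes with a basic.nack, and MassTransit surfaces that nack as a PublishNackException. The nack only reaches the publisher when publisher confirms are enabled.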
2. Asynchronous send and retry code
Your asynchronous send method (tidied up):
```csharp
public static async Task SendMessagage<T>(Uri queueUri, T message, IBus bus, int interval = 10 * 1000)
{
    try
    {
        Task<ISendEndpoint> sendEndpointTask = bus.GetSendEndpoint(queueUri);
        Console.WriteLine($"Enter SendMessagage {queueUri.AbsolutePath} interval={interval}");
        var result = sendEndpointTask.Result; // This should be awaited, not read via .Result!
        await result.Send(message);
        Console.WriteLine($"AFTER SEND SendMessagage {queueUri.AbsolutePath} interval={interval}");
        interval = 10 * 1000;
    }
    catch (PublishNackException e)
    {
        Console.WriteLine($"PublishNack EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}");
        interval = interval * 2;
        Thread.Sleep(interval); // This blocking call is the key problem
        await SendMessagage(queueUri, message, bus, interval);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}");
        interval = interval * 2;
        Thread.Sleep(interval); // Same blocking problem
        await SendMessagage(queueUri, message, bus, interval);
        Console.WriteLine($"EXCEPTION2 SendMessagage {queueUri.AbsolutePath} interval={interval}");
    }
}
```
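The cost of the Thread.Sleep calls in this code can be worked out directly. Each failure doubles interval before calling Thread.Sleep(interval), so a message that keeps being nacked blocks the thread running the catch block for longer each time. That thread is typically a thread-pool thread. The sketch below only reproduces that arithmetic. SleepSchedule is a hypothetical helper, not part of MassTransit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Mirrors the interval arithmetic of the original SendMessagage:
// every failure doubles `interval` first, then sleeps for that long.
List<int> SleepSchedule(int initialInterval, int failures)
{
    var sleeps = new List<int>();
    int interval = initialInterval;
    for (int i = 0; i < failures; i++)
    {
        interval *= 2;        // interval = interval * 2;
        sleeps.Add(interval); // Thread.Sleep(interval);
    }
    return sleeps;
}

List<int> schedule = SleepSchedule(10 * 1000, 5);
Console.WriteLine(string.Join(", ", schedule));                 // 20000, 40000, 80000, 160000, 320000
Console.WriteLine($"Total blocked: {schedule.Sum() / 1000} s"); // Total blocked: 620 s
```

After five consecutive nacks, one message has blocked threads for 620 seconds in total, and the interval keeps doubling with no upper limit.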
3. The exception and log you encountered
TimeoutException stack trace:
```
The model usage threw an exception System.TimeoutException: The operation has timed out.
   at RabbitMQ.Util.BlockingCell.GetValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.ModelRpc(MethodBase method, ContentHeaderBase header, Byte[] body)
   at RabbitMQ.Client.Framing.Impl.Model._Private_ChannelOpen(String outOfBand)
   at RabbitMQ.Client.Framing.Impl.Connection.CreateModel()
   at System.Threading.Tasks.Task`1.InnerInvoke()
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot)
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MassTransit.RabbitMqTransport.Integration.RabbitMqModelCache.<>c__DisplayClass9_0.<<SendUsingNewModel>b__0>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MassTransit.RabbitMqTransport.Integration.RabbitMqModelCache.ModelScope.<Attach>d__7.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at MassTransit.RabbitMqTransport.Integration.RabbitMqModelCache.<SendUsingExistingModel>d__10.MoveNext()
```
Log entry showing the RabbitMQ model being closed:
```
13/5/2018 16:14:42.594 [Debug] (8) Closing model: 5 / "admin@192.168.153.103:5672/"
```
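II. Root cause of the TimeoutException
1. Resources blocked after the model is closed:
When RabbitMQ applies reject-publish and a PublishNackException is raised, MassTransit closes the RabbitMQ model (channel) it was using. That is the "Closing model" line in your log, and it is normal cleanup. The next send then has to open a new channel, and that is exactly the step that times out in your stack trace (Connection.CreateModel → _Private_ChannelOpen). Your retry logic, meanwhile, calls Thread.Sleep(interval). This blocks the current thread for the whole interval and can prevent MassTransit from tearing down the old model and creating a new one in time.
2. Synchronous blocking inside an async method:
Reading the task's value with sendEndpointTask.Result blocks the thread synchronously and breaks the async flow. Together with the later Thread.Sleep, this keeps thread-pool threads occupied for long periods. When a new RabbitMQ model has to be created, the work around its RPC request (ChannelOpen) most likely does not get a thread in time, and the call ends in a timeout.
3. Accumulation from recursive retries:
Every caught exception triggers a recursive call to SendMessagage, so nested calls keep piling up, and each retry requests a send endpoint again. While threads are blocked, these requests back up, which makes a timeout even more likely.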
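The thread-pool pressure described above can be observed without RabbitMQ. The sketch below uses two hypothetical helpers, TimeDelays and TimeSleeps, which start many concurrent waits and measure the wall-clock time. Task.Delay only registers a timer. Thread.Sleep inside Task.Run pins a thread-pool thread for the full duration. The sketch illustrates blocking in general and does not reproduce the ChannelOpen timeout itself.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Non-blocking: each wait is a timer, so no thread is held while it runs.
async Task<long> TimeDelays(int count, int delayMs)
{
    var sw = Stopwatch.StartNew();
    await Task.WhenAll(Enumerable.Range(0, count).Select(_ => Task.Delay(delayMs)));
    return sw.ElapsedMilliseconds;
}

// Blocking: each wait occupies a thread-pool thread for the full delay, so when
// count exceeds the available threads the waits run in batches while the pool grows.
async Task<long> TimeSleeps(int count, int delayMs)
{
    var sw = Stopwatch.StartNew();
    await Task.WhenAll(Enumerable.Range(0, count).Select(_ => Task.Run(() => Thread.Sleep(delayMs))));
    return sw.ElapsedMilliseconds;
}

Console.WriteLine($"Task.Delay:   {await TimeDelays(50, 100)} ms");
Console.WriteLine($"Thread.Sleep: {await TimeSleeps(50, 100)} ms");
```

Exact timings depend on the core count and the thread pool's thread-injection behaviour, so no fixed output is shown. On machines with few cores, the Thread.Sleep figure is usually well above the 100 ms that each individual wait takes.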
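III. About response time limits for consumers
There is no heartbeat-based deadline on how long a consumer may take, but a few related points are worth noting:
- Heartbeats operate at the connection level, not per channel, and RabbitMQ's default heartbeat timeout is 60 seconds. Client libraries such as the .NET client send heartbeat frames independently of message processing. A consumer that spends far longer than 60 seconds on one message therefore does not, by itself, get its connection dropped. That only happens if heartbeat frames actually stop arriving, for example because of a network failure or a frozen process. In any case, this is a different issue from the producer-side TimeoutException you are seeing.
- Newer RabbitMQ releases do enforce a delivery acknowledgement timeout (consumer_timeout, 30 minutes by default). If a consumer does not acknowledge a delivery within that window, the broker closes its channel.
- The root cause in your case is thread blocking during producer retries, not a consumer timeout. Slow consumers still matter, though. If they process messages slowly, the queue keeps hitting the max-length limit, which indirectly causes more publish rejections and retries. Speeding up the consumers therefore reduces this class of problem at the source.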
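The interplay between consumer speed and the max-length limit can be illustrated with a toy model. CountNacks is a hypothetical function, not RabbitMQ's implementation. On each tick the producer publishes one message, and the consumer removes one message every consumeEvery ticks. Any publish that finds the queue full counts as a nack, as with overflow = reject-publish.

```csharp
using System;

// Toy model of a queue with max-length + reject-publish (not RabbitMQ itself).
// Returns how many publishes would have been rejected (basic.nack).
int CountNacks(int maxLength, int consumeEvery, int ticks)
{
    int depth = 0, nacks = 0;
    for (int t = 1; t <= ticks; t++)
    {
        if (t % consumeEvery == 0 && depth > 0)
            depth--;      // consumer takes one message off the queue
        if (depth < maxLength)
            depth++;      // publish accepted
        else
            nacks++;      // queue full: publish rejected
    }
    return nacks;
}

Console.WriteLine(CountNacks(maxLength: 2, consumeEvery: 1, ticks: 10)); // 0
Console.WriteLine(CountNacks(maxLength: 2, consumeEvery: 2, ticks: 10)); // 3
```

With max-length 2, a consumer that keeps pace with the producer never causes a rejection. A consumer running at half the producer's rate causes a nack on every other tick once the queue has filled up.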
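IV. Suggested fixes
1. Replace Thread.Sleep with Task.Delay:
Never use Thread.Sleep in an async method. Use await Task.Delay(interval) instead. It waits without blocking a thread, so the async machinery can keep managing resources in the meantime. Likewise, await bus.GetSendEndpoint(queueUri) instead of reading .Result.
```csharp
// Instead of Thread.Sleep(interval):
await Task.Delay(interval);
```
2. Use a loop instead of recursive retries:
Recursive retries have no upper bound and nest one call inside another on every failure. A loop avoids that nesting and makes the number of retries easy to cap:
```csharp
public static async Task SendMessagage<T>(Uri queueUri, T message, IBus bus, int maxRetries = 5)
    where T : class
{
    int interval = 10 * 1000;
    int retryCount = 0;
    while (retryCount <= maxRetries)
    {
        try
        {
            var sendEndpoint = await bus.GetSendEndpoint(queueUri); // await instead of .Result
            Console.WriteLine($"Enter SendMessagage {queueUri.AbsolutePath} interval={interval}");
            await sendEndpoint.Send(message);
            Console.WriteLine($"AFTER SEND SendMessagage {queueUri.AbsolutePath} interval={interval}");
            return; // Sent successfully, leave the loop
        }
        catch (PublishNackException)
        {
            Console.WriteLine($"PublishNack EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}");
            if (retryCount >= maxRetries) throw; // Retry limit reached, rethrow
            interval = interval * 2;
            await Task.Delay(interval); // Non-blocking wait
            retryCount++;
        }
        catch (Exception)
        {
            Console.WriteLine($"EXCEPTION SendMessagage {queueUri.AbsolutePath} interval={interval}");
            if (retryCount >= maxRetries) throw;
            interval = interval * 2;
            await Task.Delay(interval);
            retryCount++;
        }
    }
}
```
3. Use MassTransit's built-in retry mechanism:
MassTransit provides built-in retry policies that fit its RabbitMQ resource management. Note, though, that UseRetry configured on the bus factory applies to the consume pipeline. It retries messages that your consumers fail to process and does not wrap a Send issued from your own code, so the producer-side loop above is still needed. For example:
```csharp
var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(new Uri("rabbitmq://localhost/"), h =>
    {
        h.Username("guest");
        h.Password("guest");
    });

    // Consumer retry: up to 5 retries, the first after 10 s,
    // each later interval 10 s longer than the previous one
    cfg.UseRetry(r => r.Incremental(5, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10)));
});
```
4. Speed up the consumers:
Check the consumer logic for slow operations. Consider adding consumer instances or optimizing the business logic so the queue backs up less often, which reduces publish rejections at the source.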
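The loop-based retry suggested above does not depend on MassTransit itself. Below is a minimal sketch of the same idea as a reusable helper, assuming the send operation can be passed in as a Func<Task>. SendWithRetryAsync and the simulated sender are hypothetical. The InvalidOperationException stands in for a PublishNackException so that the example runs without a broker.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: retries `send` in a loop with a doubling, non-blocking delay.
// Returns the number of retries that were needed; once maxRetries is exhausted,
// the last exception propagates unchanged (the filter no longer matches).
async Task<int> SendWithRetryAsync(Func<Task> send, int maxRetries, TimeSpan initialDelay)
{
    TimeSpan delay = initialDelay;
    for (int attempt = 0; ; attempt++)
    {
        try
        {
            await send();
            return attempt;
        }
        catch (Exception) when (attempt < maxRetries)
        {
            Console.WriteLine($"Attempt {attempt + 1} failed, retrying in {delay.TotalMilliseconds} ms");
            await Task.Delay(delay); // does not block a thread
            delay = delay + delay;   // double the delay for the next attempt
        }
    }
}

// Simulated sender: fails twice (standing in for a nack), then succeeds.
int calls = 0;
Func<Task> flakySend = () =>
{
    calls++;
    if (calls <= 2) throw new InvalidOperationException("simulated nack");
    return Task.CompletedTask;
};

int retries = await SendWithRetryAsync(flakySend, maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(10));
Console.WriteLine($"Succeeded after {retries} retries, {calls} calls"); // Succeeded after 2 retries, 3 calls
```

The exception filter (when) lets the final failure propagate with its original stack trace instead of being caught and rethrown.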
The question was originally posted on Stack Exchange by Max.Futerman.




