如何在Windows服务中集成RabbitMQ,实现基于事件触发的业务逻辑执行(以MapUpdated事件触发下载为例)
如何在Windows服务中集成RabbitMQ,实现基于事件触发的业务逻辑执行(以MapUpdated事件触发下载为例)
我完全理解你的需求——用RabbitMQ的发布/订阅模式,让你的Windows服务只在收到MapUpdated事件时才执行下载逻辑,而且要适配Windows服务的长期后台运行场景,不需要请求响应式的交互。下面我一步步给你拆解落地方案:
整体思路
我们会用RabbitMQ的**Topic Exchange(主题交换机)**实现事件路由:
- 事件生产者(比如负责地图更新的Windows服务)在地图更新完成时,将
MapUpdated事件发布到RabbitMQ的主题交换机 - 你的下载服务作为消费者,绑定专属队列到交换机,监听
MapUpdated事件 - 消费者收到事件后,异步触发
DownloadInfoAsync逻辑,全程不需要同步等待响应
第一步:准备依赖
首先安装RabbitMQ的官方.NET客户端库,通过NuGet安装:
Install-Package RabbitMQ.Client
同时确保你的项目已经引入Microsoft.Extensions.Hosting(Windows服务的基础框架)。
第二步:封装RabbitMQ基础组件
为了让Windows服务长期稳定运行,我们需要封装RabbitMQ的连接、通道(避免频繁创建销毁连接),并注册为DI单例:
1. RabbitMQ配置类
先定义配置项,方便从appsettings.json读取:
public class RabbitMqSettings { public string HostName { get; set; } = "localhost"; public string UserName { get; set; } = "guest"; public string Password { get; set; } = "guest"; public string EventExchangeName { get; set; } = "ServiceEventExchange"; public string MapUpdatedRoutingKey { get; set; } = "event.map.updated"; public string DownloadServiceQueueName { get; set; } = "DownloadService.MapUpdated.Queue"; }
2. DI注入RabbitMQ连接
在服务注册时添加RabbitMQ的单例连接和工厂:
public static class RabbitMqServiceExtensions { public static IServiceCollection AddRabbitMq(this IServiceCollection services, IConfiguration config) { var settings = config.GetSection("RabbitMq").Get<RabbitMqSettings>(); services.AddSingleton(settings); // 注册连接工厂 services.AddSingleton<IConnectionFactory>(_ => new ConnectionFactory { HostName = settings.HostName, UserName = settings.UserName, Password = settings.Password, DispatchConsumersAsync = true, // 支持异步消费者 RequestedHeartbeat = TimeSpan.FromSeconds(30) // 保持连接心跳 }); // 注册单例连接(长期运行服务推荐复用一个连接) services.AddSingleton<IConnection>(sp => { var factory = sp.GetRequiredService<IConnectionFactory>(); var connection = factory.CreateConnection(); // 监听连接断开事件,便于日志和重连 connection.ConnectionShutdown += (_, args) => { sp.GetRequiredService<ILogger<RabbitMqSettings>>() .LogWarning("RabbitMQ连接断开: {Reason}", args.ReplyText); }; return connection; }); return services; } }
第三步:实现事件生产者(发布MapUpdated事件)
假设你有一个负责地图更新的Windows服务,当地图更新完成时,发布事件到RabbitMQ:
public class MapEventPublisher { private readonly IConnection _rabbitMqConn; private readonly RabbitMqSettings _settings; private readonly ILogger<MapEventPublisher> _logger; public MapEventPublisher(IConnection rabbitMqConn, RabbitMqSettings settings, ILogger<MapEventPublisher> logger) { _rabbitMqConn = rabbitMqConn; _settings = settings; _logger = logger; } public async Task PublishMapUpdatedAsync(string mapId = "", CancellationToken ct = default) { try { using var channel = _rabbitMqConn.CreateModel(); // 声明持久化交换机(确保RabbitMQ重启后不丢失) channel.ExchangeDeclare( exchange: _settings.EventExchangeName, type: ExchangeType.Topic, durable: true); // 事件携带数据(比如地图ID),用JSON序列化 var eventData = JsonSerializer.Serialize(new { MapId = mapId, UpdatedAt = DateTime.UtcNow }); var body = Encoding.UTF8.GetBytes(eventData); // 发布持久化消息 var props = channel.CreateBasicProperties(); props.Persistent = true; await Task.Run(() => channel.BasicPublish( exchange: _settings.EventExchangeName, routingKey: _settings.MapUpdatedRoutingKey, basicProperties: props, body: body), ct); _logger.LogInformation("已发布MapUpdated事件,地图ID: {MapId}", mapId); } catch (Exception ex) { _logger.LogError(ex, "发布MapUpdated事件失败"); throw; } } }
第四步:修改你的下载服务为RabbitMQ消费者
现在把你原来的WindowsBackgroundService改成事件驱动的消费者,只有收到MapUpdated事件时才执行下载:
1. 事件消息处理器
先把下载逻辑和RabbitMQ解耦,单独做一个处理器:
public class MapUpdatedEventHandler { private readonly DownloadService _downloadService; private readonly ILogger<MapUpdatedEventHandler> _logger; public MapUpdatedEventHandler(DownloadService downloadService, ILogger<MapUpdatedEventHandler> logger) { _downloadService = downloadService; _logger = logger; } public async Task HandleAsync(byte[] messageBody, CancellationToken ct) { try { // 反序列化事件数据(如果有) var eventData = JsonSerializer.Deserialize<MapUpdatedEvent>(messageBody); _logger.LogInformation("收到MapUpdated事件,开始下载地图: {MapId}", eventData?.MapId); // 触发下载逻辑 await _downloadService.DownloadInfoAsync(ct); _logger.LogInformation("地图下载完成(触发自MapUpdated事件)"); } catch (Exception ex) { _logger.LogError(ex, "处理MapUpdated事件失败"); // 可根据需求配置:抛出异常让RabbitMQ重新投递,或者转死信队列 } } // 事件数据模型 private class MapUpdatedEvent { public string? MapId { get; set; } public DateTime UpdatedAt { get; set; } } }
2. 修改WindowsBackgroundService
让它在后台持续监听RabbitMQ消息,处理重连逻辑:
public sealed class WindowsBackgroundService( IConnection rabbitMqConn, RabbitMqSettings settings, MapUpdatedEventHandler eventHandler, ILogger<WindowsBackgroundService> logger) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("启动RabbitMQ消费者,监听MapUpdated事件"); // 重连循环:连接断开后自动重试 while (!stoppingToken.IsCancellationRequested) { try { using var channel = rabbitMqConn.CreateModel(); // 声明队列和绑定关系 channel.QueueDeclare( queue: settings.DownloadServiceQueueName, durable: true, // 持久化队列 exclusive: false, autoDelete: false, arguments: null); channel.QueueBind( queue: settings.DownloadServiceQueueName, exchange: settings.EventExchangeName, routingKey: settings.MapUpdatedRoutingKey); // 配置QoS:每次只处理1条消息,避免压垮服务 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // 异步消费者 var consumer = new AsyncEventingBasicConsumer(channel); consumer.Received += async (_, ea) => { try { await eventHandler.HandleAsync(ea.Body.ToArray(), stoppingToken); // 手动确认消息:只有处理成功才通知RabbitMQ删除 channel.BasicAck(ea.DeliveryTag, multiple: false); } catch (Exception ex) { logger.LogError(ex, "处理消息失败,拒绝消息"); // 拒绝消息,不重新入队(可根据需求调整为requeue: true) channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false); } }; // 开始监听队列 channel.BasicConsume( queue: settings.DownloadServiceQueueName, autoAck: false, // 禁用自动确认,确保消息不丢失 consumer: consumer); logger.LogInformation("RabbitMQ消费者已就绪,等待MapUpdated事件"); // 阻塞直到服务停止 await Task.Delay(Timeout.Infinite, stoppingToken); } catch (OperationCanceledException) { logger.LogInformation("服务正在停止,关闭RabbitMQ消费者"); break; } catch (Exception ex) { logger.LogError(ex, "RabbitMQ消费者异常,5秒后重试"); await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); } } } }
第五步:注册所有服务
在Program.cs中完成DI注册,配置Windows服务:
var host = Host.CreateDefaultBuilder(args) .UseWindowsService() // 标记为Windows服务 .ConfigureServices((context, services) => { // 注册RabbitMQ services.AddRabbitMq(context.Configuration); // 注册事件处理器和业务服务 services.AddScoped<MapUpdatedEventHandler>(); services.AddScoped<DownloadService>(); // 注册Windows服务 services.AddHostedService<WindowsBackgroundService>(); // 注册事件生产者(如果当前服务也需要发布事件的话) // services.AddScoped<MapEventPublisher>(); }) .Build(); await host.RunAsync();
关键注意事项
- 持久化配置:Exchange、Queue、Message都设置为持久化(代码中已配置),确保RabbitMQ重启后不丢失数据
- 重连逻辑:Windows服务长期运行,必须处理RabbitMQ连接断开的情况,代码中用循环+延迟重试实现
- 消息确认:使用手动确认机制,只有当下载逻辑成功完成后才删除消息,避免消息丢失
- 错误处理:处理消息时的异常要单独捕获,避免单个消息的错误导致整个消费者崩溃
- 序列化:如果事件需要携带业务数据,用JSON/Protobuf序列化,不要直接传字符串
内容来源于stack exchange




