You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在Windows服务中集成RabbitMQ,实现基于事件触发的业务逻辑执行(以MapUpdated事件触发下载为例)

如何在Windows服务中集成RabbitMQ,实现基于事件触发的业务逻辑执行(以MapUpdated事件触发下载为例)

我完全理解你的需求——用RabbitMQ的发布/订阅模式,让你的Windows服务只在收到MapUpdated事件时才执行下载逻辑,而且要适配Windows服务的长期后台运行场景,不需要请求响应式的交互。下面我一步步给你拆解落地方案:


整体思路

我们会用RabbitMQ的**Topic Exchange(主题交换机)**实现事件路由:

  1. 事件生产者(比如负责地图更新的Windows服务)在地图更新完成时,将MapUpdated事件发布到RabbitMQ的主题交换机
  2. 你的下载服务作为消费者,绑定专属队列到交换机,监听MapUpdated事件
  3. 消费者收到事件后,异步触发DownloadInfoAsync逻辑,全程不需要同步等待响应

第一步:准备依赖

首先安装RabbitMQ的官方.NET客户端库,通过NuGet安装:

Install-Package RabbitMQ.Client

同时确保你的项目已经引入Microsoft.Extensions.Hosting(Windows服务的基础框架)。


第二步:封装RabbitMQ基础组件

为了让Windows服务长期稳定运行,我们需要封装RabbitMQ的连接、通道(避免频繁创建销毁连接),并注册为DI单例:

1. RabbitMQ配置类

先定义配置项,方便从appsettings.json读取:

public class RabbitMqSettings
{
    public string HostName { get; set; } = "localhost";
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string EventExchangeName { get; set; } = "ServiceEventExchange";
    public string MapUpdatedRoutingKey { get; set; } = "event.map.updated";
    public string DownloadServiceQueueName { get; set; } = "DownloadService.MapUpdated.Queue";
}

2. DI注入RabbitMQ连接

在服务注册时添加RabbitMQ的单例连接和工厂:

public static class RabbitMqServiceExtensions
{
    public static IServiceCollection AddRabbitMq(this IServiceCollection services, IConfiguration config)
    {
        var settings = config.GetSection("RabbitMq").Get<RabbitMqSettings>();
        services.AddSingleton(settings);

        // 注册连接工厂
        services.AddSingleton<IConnectionFactory>(_ => new ConnectionFactory
        {
            HostName = settings.HostName,
            UserName = settings.UserName,
            Password = settings.Password,
            DispatchConsumersAsync = true, // 支持异步消费者
            RequestedHeartbeat = TimeSpan.FromSeconds(30) // 保持连接心跳
        });

        // 注册单例连接(长期运行服务推荐复用一个连接)
        services.AddSingleton<IConnection>(sp =>
        {
            var factory = sp.GetRequiredService<IConnectionFactory>();
            var connection = factory.CreateConnection();
            
            // 监听连接断开事件,便于日志和重连
            connection.ConnectionShutdown += (_, args) =>
            {
                sp.GetRequiredService<ILogger<RabbitMqSettings>>()
                  .LogWarning("RabbitMQ连接断开: {Reason}", args.ReplyText);
            };
            return connection;
        });

        return services;
    }
}

第三步:实现事件生产者(发布MapUpdated事件)

假设你有一个负责地图更新的Windows服务,当地图更新完成时,发布事件到RabbitMQ:

public class MapEventPublisher
{
    private readonly IConnection _rabbitMqConn;
    private readonly RabbitMqSettings _settings;
    private readonly ILogger<MapEventPublisher> _logger;

    public MapEventPublisher(IConnection rabbitMqConn, RabbitMqSettings settings, ILogger<MapEventPublisher> logger)
    {
        _rabbitMqConn = rabbitMqConn;
        _settings = settings;
        _logger = logger;
    }

    public async Task PublishMapUpdatedAsync(string mapId = "", CancellationToken ct = default)
    {
        try
        {
            using var channel = _rabbitMqConn.CreateModel();
            // 声明持久化交换机(确保RabbitMQ重启后不丢失)
            channel.ExchangeDeclare(
                exchange: _settings.EventExchangeName,
                type: ExchangeType.Topic,
                durable: true);

            // 事件携带数据(比如地图ID),用JSON序列化
            var eventData = JsonSerializer.Serialize(new { MapId = mapId, UpdatedAt = DateTime.UtcNow });
            var body = Encoding.UTF8.GetBytes(eventData);

            // 发布持久化消息
            var props = channel.CreateBasicProperties();
            props.Persistent = true;

            await Task.Run(() =>
                channel.BasicPublish(
                    exchange: _settings.EventExchangeName,
                    routingKey: _settings.MapUpdatedRoutingKey,
                    basicProperties: props,
                    body: body), ct);

            _logger.LogInformation("已发布MapUpdated事件,地图ID: {MapId}", mapId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "发布MapUpdated事件失败");
            throw;
        }
    }
}

第四步:修改你的下载服务为RabbitMQ消费者

现在把你原来的WindowsBackgroundService改成事件驱动的消费者,只有收到MapUpdated事件时才执行下载:

1. 事件消息处理器

先把下载逻辑和RabbitMQ解耦,单独做一个处理器:

public class MapUpdatedEventHandler
{
    private readonly DownloadService _downloadService;
    private readonly ILogger<MapUpdatedEventHandler> _logger;

    public MapUpdatedEventHandler(DownloadService downloadService, ILogger<MapUpdatedEventHandler> logger)
    {
        _downloadService = downloadService;
        _logger = logger;
    }

    public async Task HandleAsync(byte[] messageBody, CancellationToken ct)
    {
        try
        {
            // 反序列化事件数据(如果有)
            var eventData = JsonSerializer.Deserialize<MapUpdatedEvent>(messageBody);
            _logger.LogInformation("收到MapUpdated事件,开始下载地图: {MapId}", eventData?.MapId);

            // 触发下载逻辑
            await _downloadService.DownloadInfoAsync(ct);
            
            _logger.LogInformation("地图下载完成(触发自MapUpdated事件)");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "处理MapUpdated事件失败");
            // 可根据需求配置:抛出异常让RabbitMQ重新投递,或者转死信队列
        }
    }

    // 事件数据模型
    private class MapUpdatedEvent
    {
        public string? MapId { get; set; }
        public DateTime UpdatedAt { get; set; }
    }
}

2. 修改WindowsBackgroundService

让它在后台持续监听RabbitMQ消息,处理重连逻辑:

public sealed class WindowsBackgroundService(
    IConnection rabbitMqConn,
    RabbitMqSettings settings,
    MapUpdatedEventHandler eventHandler,
    ILogger<WindowsBackgroundService> logger) 
    : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("启动RabbitMQ消费者,监听MapUpdated事件");

        // 重连循环:连接断开后自动重试
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                using var channel = rabbitMqConn.CreateModel();
                // 声明队列和绑定关系
                channel.QueueDeclare(
                    queue: settings.DownloadServiceQueueName,
                    durable: true, // 持久化队列
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                channel.QueueBind(
                    queue: settings.DownloadServiceQueueName,
                    exchange: settings.EventExchangeName,
                    routingKey: settings.MapUpdatedRoutingKey);

                // 配置QoS:每次只处理1条消息,避免压垮服务
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                // 异步消费者
                var consumer = new AsyncEventingBasicConsumer(channel);
                consumer.Received += async (_, ea) =>
                {
                    try
                    {
                        await eventHandler.HandleAsync(ea.Body.ToArray(), stoppingToken);
                        // 手动确认消息:只有处理成功才通知RabbitMQ删除
                        channel.BasicAck(ea.DeliveryTag, multiple: false);
                    }
                    catch (Exception ex)
                    {
                        logger.LogError(ex, "处理消息失败,拒绝消息");
                        // 拒绝消息,不重新入队(可根据需求调整为requeue: true)
                        channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
                    }
                };

                // 开始监听队列
                channel.BasicConsume(
                    queue: settings.DownloadServiceQueueName,
                    autoAck: false, // 禁用自动确认,确保消息不丢失
                    consumer: consumer);

                logger.LogInformation("RabbitMQ消费者已就绪,等待MapUpdated事件");
                // 阻塞直到服务停止
                await Task.Delay(Timeout.Infinite, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                logger.LogInformation("服务正在停止,关闭RabbitMQ消费者");
                break;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "RabbitMQ消费者异常,5秒后重试");
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }
    }
}

第五步:注册所有服务

Program.cs中完成DI注册,配置Windows服务:

var host = Host.CreateDefaultBuilder(args)
    .UseWindowsService() // 标记为Windows服务
    .ConfigureServices((context, services) =>
    {
        // 注册RabbitMQ
        services.AddRabbitMq(context.Configuration);
        // 注册事件处理器和业务服务
        services.AddScoped<MapUpdatedEventHandler>();
        services.AddScoped<DownloadService>();
        // 注册Windows服务
        services.AddHostedService<WindowsBackgroundService>();
        // 注册事件生产者(如果当前服务也需要发布事件的话)
        // services.AddScoped<MapEventPublisher>();
    })
    .Build();

await host.RunAsync();

关键注意事项

  1. 持久化配置:Exchange、Queue、Message都设置为持久化(代码中已配置),确保RabbitMQ重启后不丢失数据
  2. 重连逻辑:Windows服务长期运行,必须处理RabbitMQ连接断开的情况,代码中用循环+延迟重试实现
  3. 消息确认:使用手动确认机制,只有当下载逻辑成功完成后才删除消息,避免消息丢失
  4. 错误处理:处理消息时的异常要单独捕获,避免单个消息的错误导致整个消费者崩溃
  5. 序列化:如果事件需要携带业务数据,用JSON/Protobuf序列化,不要直接传字符串

内容来源于stack exchange

火山引擎 最新活动