You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

MQTTnet客户端连接EMQX成功但无法订阅主题接收消息

.NET 6服务使用MQTTnet连接EMQX无法接收消息问题

基于.NET 6构建的服务,使用MQTTnet连接EMQX Broker。客户端已成功连接EMQX,但通过MQTTX(MQTT客户端工具)发送测试消息时,服务始终无法接收,尽管已调用SubscribeAsync方法,订阅似乎未生效。

代码结构

Program.cs(服务配置)

// Application entry point: wires the MQTT client wrapper and the background
// worker into the generic host, then runs until shutdown is requested.
var hostBuilder = Host.CreateApplicationBuilder(args);
var services = hostBuilder.Services;

// A single MQTTClient instance is shared with the hosted ServiceWorker.
services.AddSingleton<MQTTClient>();
services.AddHostedService<ServiceWorker>();

// Other configuration...

await hostBuilder.Build().RunAsync();

MQTTClient.cs

/// <summary>
/// Wraps an MQTTnet managed client: builds its options from configuration,
/// starts and stops it, registers the topic subscriptions, and dispatches
/// received application messages.
/// </summary>
/// <remarks>
/// Configuration keys read by this class: <c>MQTT:Connectionstring</c>,
/// <c>MQTT:Username</c>, <c>MQTT:Password</c>, <c>MQTT:Addtopic</c> and
/// <c>MQTT:Bulkaddtopic</c>.
/// </remarks>
public class MQTTClient : IDisposable
{
    private readonly ILogger<MQTTClient> _logger;
    private readonly IConfiguration _configuration;
    private readonly IManagedMqttClient _mqttClient;

    /// <summary>
    /// Creates the managed client and attaches the connection and message handlers.
    /// No network activity happens until <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="logger">Logger used for connection and message diagnostics.</param>
    /// <param name="configuration">Source of the <c>MQTT:*</c> settings.</param>
    public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;

        _mqttClient = new MqttFactory().CreateManagedMqttClient();

        // The handlers do no asynchronous work, so they return a completed task
        // instead of being declared async without an await.
        _mqttClient.ConnectedAsync += _ =>
        {
            _logger.LogInformation("Connected to MQTT server");
            return Task.CompletedTask;
        };
        _mqttClient.DisconnectedAsync += _ =>
        {
            _logger.LogInformation("Disconnected from MQTT server");
            return Task.CompletedTask;
        };
        _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived;
    }

    /// <summary>
    /// Builds the client options from configuration and starts the managed client.
    /// </summary>
    /// <param name="cancellationToken">
    /// Checked once before starting; the managed client's own start call does not
    /// take a token here.
    /// </param>
    /// <exception cref="InvalidOperationException">
    /// <c>MQTT:Connectionstring</c> is missing or blank.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was already cancelled.
    /// </exception>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();

        string connectionstring = GetRequiredSetting("MQTT:Connectionstring");
        string username = _configuration["MQTT:Username"];
        string password = _configuration["MQTT:Password"];

        // A fresh GUID per start gives every run a unique client id.
        string mqttClientId = $"FetchImportService-{Guid.NewGuid()}";

        // "ws://" and "wss://" endpoints use the WebSocket channel; everything
        // else is treated as a TCP host.
        bool ws = connectionstring.StartsWith("ws", StringComparison.Ordinal);

        var options = new MqttClientOptionsBuilder()
            .WithClientId(mqttClientId)
            .WithCredentials(username, password)
            .WithTls()
            .WithCleanSession();

        if (ws)
        {
            options.WithWebSocketServer(connectionstring);
        }
        else
        {
            options.WithTcpServer(connectionstring);
        }

        var builtOptions = options.Build();
        var mqttOptions = new ManagedMqttClientOptionsBuilder()
            .WithClientOptions(builtOptions)
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(3))
            .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage)
            .Build();

        await _mqttClient.StartAsync(mqttOptions);
    }

    /// <summary>
    /// Registers the subscriptions for the configured "add" and "bulk add" topics.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the managed client appears to queue subscriptions and send
    /// them once connected, so a failure reason code in the broker's SUBACK is
    /// not reported by this call. If no messages arrive, check the broker's
    /// authorization rules for the client - TODO confirm against the broker.
    /// </remarks>
    /// <exception cref="InvalidOperationException">
    /// <c>MQTT:Addtopic</c> or <c>MQTT:Bulkaddtopic</c> is missing or blank.
    /// </exception>
    public async Task SubscribeAsync()
    {
        string addTopic = GetRequiredSetting("MQTT:Addtopic");
        string bulkAddTopic = GetRequiredSetting("MQTT:Bulkaddtopic");

        await _mqttClient.SubscribeAsync(addTopic);
        await _mqttClient.SubscribeAsync(bulkAddTopic);
    }

    /// <summary>
    /// Stops the managed client.
    /// </summary>
    /// <param name="cancellationToken">
    /// Accepted so callers can pass their shutdown token; it is not forwarded
    /// because the underlying stop call is invoked without a token.
    /// </param>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return _mqttClient.StopAsync();
    }

    /// <summary>
    /// Disposes the underlying managed client.
    /// </summary>
    public void Dispose()
    {
        _mqttClient.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Logs every received message and deserializes the payload of messages
    /// whose topic ends with the configured <c>MQTT:Addtopic</c> value.
    /// Processing errors are logged and not rethrown.
    /// </summary>
    /// <param name="eventArgs">The received application message.</param>
    private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs)
    {
        var topic = eventArgs.ApplicationMessage.Topic;
        _logger.LogInformation("Message received: {topic}", topic);

        try
        {
            string addTopic = _configuration["MQTT:Addtopic"];

            // Skip when the setting is absent: EndsWith(null) throws, and
            // EndsWith("") would match every topic.
            if (!string.IsNullOrEmpty(addTopic) && topic.EndsWith(addTopic, StringComparison.Ordinal))
            {
                _logger.LogInformation("Processing single client add request");
                var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload);
                // Process message...
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing message");
        }
    }

    /// <summary>
    /// Reads a configuration value that must be present and non-blank.
    /// </summary>
    /// <param name="key">Configuration key, for example <c>MQTT:Addtopic</c>.</param>
    /// <returns>The configured value.</returns>
    /// <exception cref="InvalidOperationException">The value is missing or blank.</exception>
    private string GetRequiredSetting(string key)
    {
        string value = _configuration[key];
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Configuration value '{key}' is missing or empty.");
        }

        return value;
    }
}

ServiceWorker.cs

/// <summary>
/// Hosted background service that starts the MQTT client, registers its
/// subscriptions, and keeps it running until the host requests shutdown.
/// </summary>
public class ServiceWorker : BackgroundService
{
    private readonly ILogger<ServiceWorker> _logger;
    private readonly MQTTClient _mqttClient;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="logger">Logger for worker lifecycle messages.</param>
    /// <param name="mqttClient">The MQTT client wrapper this worker drives.</param>
    public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient)
    {
        _logger = logger;
        _mqttClient = mqttClient;
    }

    /// <summary>
    /// Starts the MQTT client, subscribes, waits for shutdown, then stops the client.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now);

        await _mqttClient.StartAsync(stoppingToken);

        try
        {
            await _mqttClient.SubscribeAsync();

            // Wait until shutdown is requested. Task.Delay throws when the token
            // is cancelled, so the stop call must live in the finally block.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected on shutdown; fall through to stop the client.
        }
        finally
        {
            // stoppingToken is already cancelled at this point, so it is not
            // passed on to the stop call.
            await _mqttClient.StopAsync(CancellationToken.None);
        }
    }
}

日志信息

上述代码运行时输出的日志如下:

[15:18:18 INF] Connection with server established.
[15:18:18 INF] Start receiving packets.
[15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True]
[15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False]
[15:18:18 INF] Authenticated MQTT connection with server established.
[15:18:18 INF] Connected.
[15:18:18 INF] Connected to MQTT server
[15:18:18 INF] Start sending keep alive packets.
[15:18:18 INF] Publishing subscriptions at reconnect
[15:18:18 INF] Publishing 2 added and 0 removed subscriptions
[15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce]
[15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]

已确认事项

  • 服务已成功连接EMQX(通过日志和EMQX控制台验证)
  • 连接状态稳定
  • 可通过MQTTX成功向目标主题发布测试消息
  • MQTTX发送的消息在其历史记录中可见
  • 服务的HandleMessageReceived方法从未被触发

测试环境配置

  • 使用MQTTX向目标主题发布测试消息
  • 配置中的主题与MQTTX发布的主题完全匹配
  • 消息发布成功(MQTTX订阅同一主题可接收)
  • 测试期间服务处于运行并连接状态

环境信息

  • .NET 6.0
  • MQTTnet Version="4.1.4.563"
  • EMQX Broker
  • MQTTX

问题问询

为何我的服务无法接收MQTTX发送的消息?客户端已成功连接,但MQTTX发送的消息从未触发HandleMessageReceived方法,是什么原因导致消息接收失败?


本文的提问内容来源于 Stack Exchange,提问作者:tisaconundrum

火山引擎 最新活动