MQTTnet客户端连接EMQX成功但无法订阅主题接收消息
.NET 6服务使用MQTTnet连接EMQX无法接收消息问题
基于.NET 6构建的服务,使用MQTTnet连接EMQX Broker。客户端已成功连接EMQX,但通过MQTTX(MQTT客户端工具)发送测试消息时,服务始终无法接收,尽管已调用SubscribeAsync方法,订阅似乎未生效。
代码结构
Program.cs(服务配置)
var builder = Host.CreateApplicationBuilder(args); // Configure services builder.Services.AddSingleton<MQTTClient>(); builder.Services.AddHostedService<ServiceWorker>(); // Other configuration... var host = builder.Build(); await host.RunAsync();
MQTTClient.cs
public class MQTTClient { private readonly ILogger<MQTTClient> _logger; private readonly IConfiguration _configuration; private readonly IManagedMqttClient _mqttClient; public MQTTClient(ILogger<MQTTClient> logger, IConfiguration configuration) { _logger = logger; _configuration = configuration; _mqttClient = new MqttFactory().CreateManagedMqttClient(); _mqttClient.ConnectedAsync += async (MqttClientConnectedEventArgs eventArgs) => { _logger.LogInformation("Connected to MQTT server"); }; _mqttClient.DisconnectedAsync += async (MqttClientDisconnectedEventArgs eventArgs) => { _logger.LogInformation("Disconnected from MQTT server"); }; _mqttClient.ApplicationMessageReceivedAsync += HandleMessageReceived; } public async Task StartAsync(CancellationToken cancellationToken) { string connectionstring = _configuration["MQTT:Connectionstring"]; string username = _configuration["MQTT:Username"]; string password = _configuration["MQTT:Password"]; string mqttClientId = $"FetchImportService-{Guid.NewGuid()}"; bool ws = connectionstring.StartsWith("ws"); var options = new MqttClientOptionsBuilder() .WithClientId(mqttClientId) .WithCredentials(username, password) .WithTls() .WithCleanSession(); if (ws) options.WithWebSocketServer(connectionstring); else options.WithTcpServer(connectionstring); var builtOptions = options.Build(); var mqttOptions = new ManagedMqttClientOptionsBuilder() .WithClientOptions(builtOptions) .WithAutoReconnectDelay(TimeSpan.FromSeconds(3)) .WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy.DropNewMessage) .Build(); await _mqttClient.StartAsync(mqttOptions); } public async Task SubscribeAsync() { await _mqttClient.SubscribeAsync(_configuration["MQTT:Addtopic"]); await _mqttClient.SubscribeAsync(_configuration["MQTT:Bulkaddtopic"]); } private async Task HandleMessageReceived(MqttApplicationMessageReceivedEventArgs eventArgs) { var topic = eventArgs.ApplicationMessage.Topic; _logger.LogInformation("Message received: {topic}", topic); try { if (topic.EndsWith(_configuration["MQTT:Addtopic"])) { _logger.LogInformation("Processing single client add request"); var data = JsonSerializer.Deserialize<RequestData>(eventArgs.ApplicationMessage.Payload); // Process message... } } catch (Exception ex) { _logger.LogError(ex, "Error processing message"); } } }
ServiceWorker.cs
public class ServiceWorker : BackgroundService { private readonly ILogger<ServiceWorker> _logger; private readonly MQTTClient _mqttClient; public ServiceWorker(ILogger<ServiceWorker> logger, MQTTClient mqttClient) { _logger = logger; _mqttClient = mqttClient; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("ServiceWorker running at: {time}", DateTimeOffset.Now); await _mqttClient.StartAsync(stoppingToken); await _mqttClient.SubscribeAsync(); while (!stoppingToken.IsCancellationRequested) { await Task.Delay(1000, stoppingToken); } await _mqttClient.StopAsync(stoppingToken); } }
日志信息
根据方法输出日志如下:
[15:18:18 INF] Connection with server established. [15:18:18 INF] Start receiving packets. [15:18:18 INF] TX (81 bytes) >>> Connect: [ClientId=FetchImportService-488b512d-0b99-4073-9d59-93675cb7b682] [Username=test] [Password=****] [KeepAlivePeriod=15] [CleanSession=True] [15:18:18 INF] RX (4 bytes) <<< ConnAck: [ReturnCode=ConnectionAccepted] [ReasonCode=Success] [IsSessionPresent=False] [15:18:18 INF] Authenticated MQTT connection with server established. [15:18:18 INF] Connected. [15:18:18 INF] Connected to MQTT server [15:18:18 INF] Start sending keep alive packets. [15:18:18 INF] Publishing subscriptions at reconnect [15:18:18 INF] Publishing 2 added and 0 removed subscriptions [15:18:18 INF] TX (74 bytes) >>> Subscribe: [PacketIdentifier=1] [TopicFilters=test/lookupservice/clients/add@AtLeastOnce,test/lookupservice/clients/bulkadd@AtLeastOnce] [15:18:18 INF] RX (6 bytes) <<< SubAck: [PacketIdentifier=1] [ReasonCode=UnspecifiedError,UnspecifiedError]
已确认事项
- 服务已成功连接EMQX(通过日志和EMQX控制台验证)
- 连接状态稳定
- 可通过MQTTX成功向目标主题发布测试消息
- MQTTX发送的消息在其历史记录中可见
- 服务的HandleMessageReceived方法从未被触发
测试环境配置
- 使用MQTTX向目标主题发布测试消息
- 配置中的主题与MQTTX发布的主题完全匹配
- 消息发布成功(MQTTX订阅同一主题可接收)
- 测试期间服务处于运行并连接状态
环境信息
- .NET 6.0
- MQTTnet Version="4.1.4.563"
- EMQX Broker
- MQTTX
问题问询
为何我的服务无法接收MQTTX发送的消息?客户端已成功连接,但MQTTX发送的消息从未触发HandleMessageReceived方法,是什么原因导致消息接收失败?
内容的提问来源于stack exchange,提问作者tisaconundrum




