You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

基于MQTTnet的双客户端MQTT消息收发故障排查求助

MQTTnet客户端消息收发故障排查与修复

问题背景

为理解MQTT工作机制,基于MQTTnet编写简易程序,用MQTTX、MQTT Explorer测试。创建两个代码完全相同的项目,期望实现client1与client2之间的消息收发,两个客户端均订阅同一主题,Mosquitto服务已正常运行,但消息发送后无任何效果。

现有代码问题分析

  • 异步连接处理不当:直接丢弃Connect()的返回Task,轮询IsConnected的逻辑无法准确等待连接完成,可能出现连接逻辑未执行完就进入循环的情况。
  • Will Topic格式错误:遗嘱主题包含空格,MQTT主题不允许存在空格,会导致遗嘱消息失效甚至影响客户端连接。
  • 消息发送硬编码错误send方法中把要发送的内容固定为字符串"message",传入的实际消息参数被覆盖。
  • 客户端ID重复:两个客户端使用相同的ClientId,MQTT Broker会强制踢掉先连接的客户端,导致仅一个客户端在线,无法实现双向通信。
  • 订阅时机错误:ManagedMqttClient在调用StartAsync前执行订阅,订阅可能无法生效,需由ManagedClient在连接成功后自动处理订阅。

修复后的代码示例

修正后的Connect方法

public async Task Connect()
{
    try
    {
        mqttClient1 = new MqttFactory().CreateManagedMqttClient();
        // 移除Will Topic中的空格
        string deathTopic = @"HXgN1.0/group_id/NDEATH/machine_id";
        // 生成唯一客户端ID,避免重复冲突
        string clientId = $"IoApp_{Guid.NewGuid().ToString("N")}";

        var builder = new MqttClientOptionsBuilder()
                        .WithClientId(clientId)
                        .WithTcpServer("127.0.0.1", 1883)
                        .WithProtocolVersion(MqttProtocolVersion.V311)
                        .WithWillTopic(deathTopic)
                        .WithTimeout(TimeSpan.FromSeconds(60))
                        .WithWillPayload("{\"timestamp\":\"" + DateTime.Now.ToShortTimeString() + "\"}")
                        .WithWillRetain(true);

        var options = new ManagedMqttClientOptionsBuilder()
                        .WithAutoReconnectDelay(TimeSpan.FromSeconds(60))
                        .WithClientOptions(builder.Build())
                        // 配置自动订阅,让ManagedClient连接成功后自动处理
                        .WithSubscriptions(new MqttTopicFilterBuilder()
                            .WithTopic("HXgN1.0/group_id/NCMD/machine_id")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Build())
                        .Build();

        // 绑定事件处理器
        mqttClient1.ConnectedAsync += MqttClient_ConnectedAsync;
        mqttClient1.ConnectingFailedAsync += MqttClient_ConnectingFailedAsync;
        mqttClient1.DisconnectedAsync += MqttClient_DisconnectedAsync;
        mqttClient1.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;

        // 启动Managed客户端并连接
        await mqttClient1.StartAsync(options);
    }
    catch (Exception ex)
    {
        // 添加异常日志便于排查
        Console.WriteLine($"连接失败:{ex.Message}");
        throw;
    }
}

修正后的消息发送方法

public async Task SendAsync(string topic, string message, bool retain)
{
    if (mqttClient1 != null && mqttClient1.IsConnected)
    {
        await mqttClient1.EnqueueAsync(topic, message, MqttQualityOfServiceLevel.AtLeastOnce, retain);
    }
}

修正后的连接调用逻辑

await Connect();
// 无需轮询,ConnectedAsync事件会通知连接成功状态

消息接收处理器示例

private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    string topic = arg.ApplicationMessage.Topic;
    string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
    Console.WriteLine($"收到消息:主题={topic},内容={payload}");
    return Task.CompletedTask;
}

额外测试建议

  • 在MQTTX/MQTT Explorer中订阅#主题,验证客户端的连接状态、遗嘱消息及Birth Certificate是否正常发送。
  • 发送消息前先判断客户端IsConnected状态,确保消息发送时机正确。
  • 启用MQTTnet日志,查看客户端与Broker的交互细节,排查隐藏问题。

内容的提问来源于stack exchange,提问作者Patrick

火山引擎 最新活动