MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,特别适用于物联网设备和低带宽、高延迟或不可靠的网络环境。在 C#开发中,MQTT 可以帮助我们实现高效的消息发布订阅模式。
MQTT 简介 核心特性
轻量级协议 - 最小化传输开销,适合资源受限的设备
发布/订阅模式 - 解耦消息生产者和消费者
QoS 支持 - 提供三种服务质量等级
持久会话 - 支持客户端断线重连后恢复消息
Last Will 遗嘱 - 客户端异常断开时自动发送指定消息
QoS 等级说明
QoS 等级
描述
使用场景
QoS 0
至多一次传递
传感器数据,允许丢失
QoS 1
至少一次传递
重要通知,可容忍重复
QoS 2
仅一次传递
金融交易,必须精确一次
C# MQTT 实现方案 环境准备 首先安装必要的 NuGet 包:
1 2 <PackageReference Include ="MQTTnet" Version ="4.3.6.1152" /> <PackageReference Include ="MQTTnet.Extensions.ManagedClient" Version ="4.3.6.1152" />
项目结构设计 1 2 3 4 5 6 7 8 Services/ ├── Interface/ │ ├── IMqttService.cs # MQTT服务接口 │ └── IMqttMessageHandler.cs # 消息处理器接口 ├── Impl/ │ └── MqttService.cs # MQTT服务实现 └── Models/ └── MqttConfig.cs # MQTT配置类
核心代码实现 🎪 配置类定义 1 2 3 4 5 6 7 8 9 10 11 public class MqttConfig { public string Broker { get ; set ; } = "broker.hivemq.com" ; public int Port { get ; set ; } = 1883 ; public string Username { get ; set ; } = "" ; public string Password { get ; set ; } = "" ; public string ClientId { get ; set ; } = $"mqtt_client_{Guid.NewGuid()} " ; public bool CleanSession { get ; set ; } = true ; public TimeSpan KeepAlive { get ; set ; } = TimeSpan.FromSeconds(60 ); }
🔌 服务接口设计 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public interface IMqttService { Task RegisterHandlerAsync (string topicFilter, Func<string , string , Task> handler ) ; Task PublishAsync (string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce ) ; Task SubscribeAsync (string topicFilter ) ; Task UnsubscribeAsync (string topicFilter ) ; bool IsConnected { get ; } } public interface IMqttMessageHandler { Task HandleMessageAsync (string topic, string payload ) ; }
MQTT 服务核心实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 public class MqttService : IMqttService , IDisposable { private readonly Dictionary<string , Func<string , string , Task>> _handlerDelegates = new (); private readonly IManagedMqttClient _mqttClient; private readonly MqttConfig _config; public bool IsConnected => _mqttClient?.IsConnected ?? false ; public MqttService () { _mqttClient = new MqttFactory().CreateManagedMqttClient(); _config = new MqttConfig(); var options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5 )) .WithClientOptions( new MqttClientOptionsBuilder() .WithTcpServer(_config.Broker, _config.Port) .WithCredentials(_config.Username, _config.Password) .WithClientId(_config.ClientId) .WithCleanSession(_config.CleanSession) .WithKeepAlivePeriod(_config.KeepAlive) .Build() ) .Build(); _mqttClient.ConnectedAsync += OnConnected; _mqttClient.DisconnectedAsync += OnDisconnected; _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived; Task.Run(async () => { await _mqttClient.StartAsync(options); }); } #region 事件处理器 private async Task OnConnected (MqttClientConnectedEventArgs e ) { Console.WriteLine($"[MQTT] 已连接到服务器: {_config.Broker} :{_config.Port} " ); Console.WriteLine($"[MQTT] 客户端ID: {_config.ClientId} " ); foreach (var topicFilter in _handlerDelegates.Keys) { await SubscribeAsync(topicFilter); } } private async Task OnDisconnected (MqttClientDisconnectedEventArgs e ) { Console.WriteLine($"[MQTT] 连接断开: {e.Reason} " ); if (e.Exception != null ) { Console.WriteLine($"[MQTT] 异常: {e.Exception.Message} " ); } await Task.CompletedTask; } private async Task OnMessageReceived (MqttApplicationMessageReceivedEventArgs e ) { var topic = e.ApplicationMessage.Topic; var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment); Console.WriteLine($"[MQTT] 收到消息 - 主题: {topic} , 内容: {payload} " ); foreach (var kvp in _handlerDelegates) { if (IsTopicMatch(kvp.Key, topic)) { try { await kvp.Value(topic, payload); } catch (Exception ex) { Console.WriteLine($"[MQTT] 处理消息时发生错误: {ex.Message} " ); } } } } #endregion #region 公共方法 public async Task RegisterHandlerAsync (string topicFilter, Func<string , string , Task> handler ) { _handlerDelegates[topicFilter] = handler; if (IsConnected) { await SubscribeAsync(topicFilter); } Console.WriteLine($"[MQTT] 已注册处理器: {topicFilter} " ); } public async Task PublishAsync (string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce ) { if (!IsConnected) { Console.WriteLine("[MQTT] 客户端未连接,无法发布消息" ); return ; } var message = new MqttApplicationMessageBuilder() .WithTopic(topic) .WithPayload(payload) .WithQualityOfServiceLevel(qos) .WithRetainFlag(false ) .Build(); await _mqttClient.EnqueueAsync(message); Console.WriteLine($"[MQTT] 消息已发布 - 主题: {topic} , 内容: {payload} " ); } public async Task SubscribeAsync (string topicFilter ) { if (!IsConnected) { Console.WriteLine("[MQTT] 客户端未连接,无法订阅主题" ); return ; } await _mqttClient.SubscribeAsync(topicFilter); Console.WriteLine($"[MQTT] 已订阅主题: {topicFilter} " ); } public async Task UnsubscribeAsync (string topicFilter ) { if (!IsConnected) { Console.WriteLine("[MQTT] 客户端未连接,无法取消订阅" ); return ; } await _mqttClient.UnsubscribeAsync(topicFilter); _handlerDelegates.Remove(topicFilter); Console.WriteLine($"[MQTT] 已取消订阅: {topicFilter} " ); } #endregion #region 辅助方法 private bool IsTopicMatch (string filter, string topic ) { var pattern = filter .Replace("+" , @"[^/]+" ) .Replace("#" , @".*" ); return Regex.IsMatch(topic, $"^{pattern} $" ); } #endregion public void Dispose () { _mqttClient?.StopAsync().Wait(); _mqttClient?.Dispose(); } }
实际应用示例 基础使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public static async Task MqttTest (){ var services = new ServiceCollection(); services.AddSingleton<IMqttService, MqttService>(); var serviceProvider = services.BuildServiceProvider(); var mqttService = serviceProvider.GetRequiredService<IMqttService>(); await mqttService.RegisterHandlerAsync( "sensors/#" , async (topic, payload) => { Console.WriteLine($"收到来自【{topic} 】的消息: {payload} " ); if (topic.Contains("temperature" )) { Console.WriteLine("处理温度数据..." ); } else if (topic.Contains("humidity" )) { Console.WriteLine("处理湿度数据..." ); } await Task.CompletedTask; } ); await mqttService.PublishAsync("sensors/device1/temperature" , "25.6°C" ); await mqttService.PublishAsync("sensors/device2/humidity" , "60%" ); await mqttService.PublishAsync("sensors/device3/pressure" , "1013.25hPa" ); await Task.Delay(2000 ); }
物联网设备监控场景 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class IoTDeviceMonitor { private readonly IMqttService _mqttService; public IoTDeviceMonitor (IMqttService mqttService ) { _mqttService = mqttService; } public async Task StartMonitoringAsync () { await _mqttService.RegisterHandlerAsync( "devices/+/status" , async (topic, payload) => { var deviceId = ExtractDeviceId(topic); var status = JsonSerializer.Deserialize<DeviceStatus>(payload); Console.WriteLine($"设备 {deviceId} 状态更新: {status.Status} " ); if (status.Status == "offline" ) { await HandleDeviceOffline(deviceId); } } ); await _mqttService.RegisterHandlerAsync( "alerts/+/+" , async (topic, payload) => { var parts = topic.Split('/' ); var deviceId = parts[1 ]; var alertType = parts[2 ]; Console.WriteLine($"告警: 设备 {deviceId} 发生 {alertType} 告警" ); await ProcessAlert(deviceId, alertType, payload); } ); } private string ExtractDeviceId (string topic ) { return topic.Split('/' )[1 ]; } private async Task HandleDeviceOffline (string deviceId ) { Console.WriteLine($"处理设备 {deviceId} 离线事件" ); await Task.CompletedTask; } private async Task ProcessAlert (string deviceId, string alertType, string payload ) { Console.WriteLine($"处理设备 {deviceId} 的 {alertType} 告警: {payload} " ); await Task.CompletedTask; } } public class DeviceStatus { public string Status { get ; set ; } public DateTime Timestamp { get ; set ; } public double CpuUsage { get ; set ; } public double MemoryUsage { get ; set ; } }
实用技巧与优化 连接管理技巧 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class MqttConnectionManager { private readonly IManagedMqttClient _client; private readonly SemaphoreSlim _connectionSemaphore = new (1 , 1 ); public async Task<bool > EnsureConnectedAsync () { await _connectionSemaphore.WaitAsync(); try { if (!_client.IsConnected) { Console.WriteLine("检测到连接断开,正在重新连接..." ); await _client.StartAsync(GetMqttOptions()); var timeout = TimeSpan.FromSeconds(10 ); var cancellationToken = new CancellationTokenSource(timeout).Token; while (!_client.IsConnected && !cancellationToken.IsCancellationRequested) { await Task.Delay(100 , cancellationToken); } return _client.IsConnected; } return true ; } finally { _connectionSemaphore.Release(); } } private ManagedMqttClientOptions GetMqttOptions () { return new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5 )) .WithMaxPendingMessages(100 ) .WithClientOptions( new MqttClientOptionsBuilder() .WithTcpServer("broker.hivemq.com" , 1883 ) .WithClientId($"client_{Environment.MachineName} _{Guid.NewGuid():N} " ) .WithCleanSession(false ) .WithKeepAlivePeriod(TimeSpan.FromSeconds(30 )) .WithWillMessage(new MqttApplicationMessageBuilder() .WithTopic("devices/status/last-will" ) .WithPayload($"Client {Environment.MachineName} disconnected unexpectedly" ) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) .WithRetainFlag(true ) .Build()) .Build() ) .Build(); } }
性能优化建议
消息批量处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private readonly Channel<MqttMessage> _messageChannel = Channel.CreateUnbounded<MqttMessage>();private async Task ProcessMessagesInBatches (){ var batch = new List<MqttMessage>(); await foreach (var message in _messageChannel.Reader.ReadAllAsync()) { batch.Add(message); if (batch.Count >= 100 ) { await ProcessBatch(batch); batch.Clear(); } } }
主题订阅优化
1 2 3 4 5 6 await mqttService.SubscribeAsync("sensors/temperature/device001" );
错误处理与重试机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class RobustMqttService : IMqttService { private readonly ILogger<RobustMqttService> _logger; private readonly MqttService _innerService; public async Task PublishWithRetryAsync (string topic, string payload, int maxRetries = 3 ) { for (int attempt = 1 ; attempt <= maxRetries; attempt++) { try { await _innerService.PublishAsync(topic, payload); return ; } catch (Exception ex) { _logger.LogWarning($"发布消息失败 (尝试 {attempt} /{maxRetries} ): {ex.Message} " ); if (attempt == maxRetries) { _logger.LogError($"消息发布最终失败: {ex} " ); throw ; } var delay = TimeSpan.FromMilliseconds(Math.Pow(2 , attempt) * 1000 ); await Task.Delay(delay); } } } }
常见问题与解决方案 常见问题排查
问题
可能原因
解决方案
连接失败
网络问题、认证失败
检查网络连接和认证信息
消息丢失
QoS 设置不当
使用合适的 QoS 等级
重复消息
QoS 1/2 的重传机制
在业务层实现幂等性
内存泄漏
未释放客户端资源
正确实现 IDisposable
调试技巧 1 2 3 4 5 6 7 8 9 10 var factory = new MqttFactory();factory.UseClientAdapterLogger = true ; private void TraceMessage (string direction, string topic, string payload ){ var timestamp = DateTime.Now.ToString("HH:mm:ss.fff" ); Console.WriteLine($"[{timestamp} ] {direction} | {topic} | {payload} " ); }
小结 MQTT 在 C#中的应用非常广泛,特别适合:
物联网设备通信 - 传感器数据收集和设备控制
实时消息推送 - 移动应用和 Web 应用的实时更新
工业自动化 - 设备监控和数据采集
车联网应用 - 车辆状态监控和远程控制
使用 MQTT 可以帮助我们构建高效、可靠的分布式消息系统。在实际开发中要根据具体的业务场景选择合适的 QoS 等级,并做好错误处理和连接管理。
使用建议 :
开发阶段 : 使用免费的公共 MQTT 服务器测试
生产环境 : 部署自己的 MQTT 服务器(如 Mosquitto、EMQ X)
安全考虑 : 启用 TLS 加密和身份验证
监控运维 : 实施完善的日志记录和性能监控
相关资源 :