MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,特别适用于物联网设备和低带宽、高延迟或不可靠的网络环境。在 C#开发中,MQTT 可以帮助我们实现高效的消息发布订阅模式。

MQTT 简介

核心特性

  • 轻量级协议 - 最小化传输开销,适合资源受限的设备
  • 发布/订阅模式 - 解耦消息生产者和消费者
  • QoS 支持 - 提供三种服务质量等级
  • 持久会话 - 支持客户端断线重连后恢复消息
  • Last Will 遗嘱 - 客户端异常断开时自动发送指定消息

QoS 等级说明

QoS 等级 描述 使用场景
QoS 0 至多一次传递 传感器数据,允许丢失
QoS 1 至少一次传递 重要通知,可容忍重复
QoS 2 仅一次传递 金融交易,必须精确一次

C# MQTT 实现方案

环境准备

首先安装必要的 NuGet 包:

1
2
<PackageReference Include="MQTTnet" Version="4.3.6.1152" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="4.3.6.1152" />

项目结构设计

1
2
3
4
5
6
7
8
Services/
├── Interface/
│ ├── IMqttService.cs # MQTT服务接口
│ └── IMqttMessageHandler.cs # 消息处理器接口
├── Impl/
│ └── MqttService.cs # MQTT服务实现
└── Models/
└── MqttConfig.cs # MQTT配置类

核心代码实现

🎪 配置类定义

1
2
3
4
5
6
7
8
9
10
11
// Models/MqttConfig.cs
public class MqttConfig
{
public string Broker { get; set; } = "broker.hivemq.com"; // 免费测试服务器
public int Port { get; set; } = 1883;
public string Username { get; set; } = "";
public string Password { get; set; } = "";
public string ClientId { get; set; } = $"mqtt_client_{Guid.NewGuid()}";
public bool CleanSession { get; set; } = true;
public TimeSpan KeepAlive { get; set; } = TimeSpan.FromSeconds(60);
}

🔌 服务接口设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Interface/IMqttService.cs
public interface IMqttService
{
/// <summary>
/// 注册消息处理器
/// </summary>
Task RegisterHandlerAsync(string topicFilter, Func<string, string, Task> handler);

/// <summary>
/// 发布消息
/// </summary>
Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce);

/// <summary>
/// 订阅主题
/// </summary>
Task SubscribeAsync(string topicFilter);

/// <summary>
/// 取消订阅
/// </summary>
Task UnsubscribeAsync(string topicFilter);

/// <summary>
/// 检查连接状态
/// </summary>
bool IsConnected { get; }
}

// Interface/IMqttMessageHandler.cs
public interface IMqttMessageHandler
{
Task HandleMessageAsync(string topic, string payload);
}

MQTT 服务核心实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Services/Impl/MqttService.cs
public class MqttService : IMqttService, IDisposable
{
private readonly Dictionary<string, Func<string, string, Task>> _handlerDelegates = new();
private readonly IManagedMqttClient _mqttClient;
private readonly MqttConfig _config;

public bool IsConnected => _mqttClient?.IsConnected ?? false;

public MqttService()
{
_mqttClient = new MqttFactory().CreateManagedMqttClient();
_config = new MqttConfig();

var options = new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithClientOptions(
new MqttClientOptionsBuilder()
.WithTcpServer(_config.Broker, _config.Port)
.WithCredentials(_config.Username, _config.Password)
.WithClientId(_config.ClientId)
.WithCleanSession(_config.CleanSession)
.WithKeepAlivePeriod(_config.KeepAlive)
.Build()
)
.Build();

// 注册事件处理器
_mqttClient.ConnectedAsync += OnConnected;
_mqttClient.DisconnectedAsync += OnDisconnected;
_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceived;

// 启动连接
Task.Run(async () =>
{
await _mqttClient.StartAsync(options);
});
}

#region 事件处理器

private async Task OnConnected(MqttClientConnectedEventArgs e)
{
Console.WriteLine($"[MQTT] 已连接到服务器: {_config.Broker}:{_config.Port}");
Console.WriteLine($"[MQTT] 客户端ID: {_config.ClientId}");

// 重新订阅所有主题
foreach (var topicFilter in _handlerDelegates.Keys)
{
await SubscribeAsync(topicFilter);
}
}

private async Task OnDisconnected(MqttClientDisconnectedEventArgs e)
{
Console.WriteLine($"[MQTT] 连接断开: {e.Reason}");
if (e.Exception != null)
{
Console.WriteLine($"[MQTT] 异常: {e.Exception.Message}");
}
await Task.CompletedTask;
}

private async Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
var topic = e.ApplicationMessage.Topic;
var payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);

Console.WriteLine($"[MQTT] 收到消息 - 主题: {topic}, 内容: {payload}");

// 查找匹配的处理器
foreach (var kvp in _handlerDelegates)
{
if (IsTopicMatch(kvp.Key, topic))
{
try
{
await kvp.Value(topic, payload);
}
catch (Exception ex)
{
Console.WriteLine($"[MQTT] 处理消息时发生错误: {ex.Message}");
}
}
}
}

#endregion

#region 公共方法

public async Task RegisterHandlerAsync(string topicFilter, Func<string, string, Task> handler)
{
_handlerDelegates[topicFilter] = handler;

if (IsConnected)
{
await SubscribeAsync(topicFilter);
}

Console.WriteLine($"[MQTT] 已注册处理器: {topicFilter}");
}

public async Task PublishAsync(string topic, string payload, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtMostOnce)
{
if (!IsConnected)
{
Console.WriteLine("[MQTT] 客户端未连接,无法发布消息");
return;
}

var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qos)
.WithRetainFlag(false)
.Build();

await _mqttClient.EnqueueAsync(message);
Console.WriteLine($"[MQTT] 消息已发布 - 主题: {topic}, 内容: {payload}");
}

public async Task SubscribeAsync(string topicFilter)
{
if (!IsConnected)
{
Console.WriteLine("[MQTT] 客户端未连接,无法订阅主题");
return;
}

await _mqttClient.SubscribeAsync(topicFilter);
Console.WriteLine($"[MQTT] 已订阅主题: {topicFilter}");
}

public async Task UnsubscribeAsync(string topicFilter)
{
if (!IsConnected)
{
Console.WriteLine("[MQTT] 客户端未连接,无法取消订阅");
return;
}

await _mqttClient.UnsubscribeAsync(topicFilter);
_handlerDelegates.Remove(topicFilter);
Console.WriteLine($"[MQTT] 已取消订阅: {topicFilter}");
}

#endregion

#region 辅助方法

/// <summary>
/// 检查主题是否匹配过滤器(支持通配符)
/// </summary>
private bool IsTopicMatch(string filter, string topic)
{
// 将 MQTT 通配符转换为正则表达式
var pattern = filter
.Replace("+", @"[^/]+") // + 匹配单级
.Replace("#", @".*"); // # 匹配多级

return Regex.IsMatch(topic, $"^{pattern}$");
}

#endregion

public void Dispose()
{
_mqttClient?.StopAsync().Wait();
_mqttClient?.Dispose();
}
}

实际应用示例

基础使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static async Task MqttTest()
{
var services = new ServiceCollection();
services.AddSingleton<IMqttService, MqttService>();

var serviceProvider = services.BuildServiceProvider();
var mqttService = serviceProvider.GetRequiredService<IMqttService>();

// 注册消息处理器 - 使用通配符订阅传感器数据
await mqttService.RegisterHandlerAsync(
"sensors/#",
async (topic, payload) =>
{
Console.WriteLine($"收到来自【{topic}】的消息: {payload}");

// 这里可以添加具体的业务处理逻辑
// 比如数据解析、存储、转发等
if (topic.Contains("temperature"))
{
Console.WriteLine("处理温度数据...");
}
else if (topic.Contains("humidity"))
{
Console.WriteLine("处理湿度数据...");
}

await Task.CompletedTask;
}
);

// 发布不同类型的传感器数据
await mqttService.PublishAsync("sensors/device1/temperature", "25.6°C");
await mqttService.PublishAsync("sensors/device2/humidity", "60%");
await mqttService.PublishAsync("sensors/device3/pressure", "1013.25hPa");

// 等待消息处理
await Task.Delay(2000);
}

物联网设备监控场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class IoTDeviceMonitor
{
private readonly IMqttService _mqttService;

public IoTDeviceMonitor(IMqttService mqttService)
{
_mqttService = mqttService;
}

public async Task StartMonitoringAsync()
{
// 监控设备状态
await _mqttService.RegisterHandlerAsync(
"devices/+/status",
async (topic, payload) =>
{
var deviceId = ExtractDeviceId(topic);
var status = JsonSerializer.Deserialize<DeviceStatus>(payload);

Console.WriteLine($"设备 {deviceId} 状态更新: {status.Status}");

if (status.Status == "offline")
{
await HandleDeviceOffline(deviceId);
}
}
);

// 监控告警信息
await _mqttService.RegisterHandlerAsync(
"alerts/+/+",
async (topic, payload) =>
{
var parts = topic.Split('/');
var deviceId = parts[1];
var alertType = parts[2];

Console.WriteLine($"告警: 设备 {deviceId} 发生 {alertType} 告警");
await ProcessAlert(deviceId, alertType, payload);
}
);
}

private string ExtractDeviceId(string topic)
{
return topic.Split('/')[1];
}

private async Task HandleDeviceOffline(string deviceId)
{
Console.WriteLine($"处理设备 {deviceId} 离线事件");
// 发送通知、记录日志等
await Task.CompletedTask;
}

private async Task ProcessAlert(string deviceId, string alertType, string payload)
{
Console.WriteLine($"处理设备 {deviceId}{alertType} 告警: {payload}");
// 告警处理逻辑
await Task.CompletedTask;
}
}

public class DeviceStatus
{
public string Status { get; set; }
public DateTime Timestamp { get; set; }
public double CpuUsage { get; set; }
public double MemoryUsage { get; set; }
}

实用技巧与优化

连接管理技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class MqttConnectionManager
{
private readonly IManagedMqttClient _client;
private readonly SemaphoreSlim _connectionSemaphore = new(1, 1);

public async Task<bool> EnsureConnectedAsync()
{
await _connectionSemaphore.WaitAsync();
try
{
if (!_client.IsConnected)
{
Console.WriteLine("检测到连接断开,正在重新连接...");
await _client.StartAsync(GetMqttOptions());

// 等待连接建立
var timeout = TimeSpan.FromSeconds(10);
var cancellationToken = new CancellationTokenSource(timeout).Token;

while (!_client.IsConnected && !cancellationToken.IsCancellationRequested)
{
await Task.Delay(100, cancellationToken);
}

return _client.IsConnected;
}
return true;
}
finally
{
_connectionSemaphore.Release();
}
}

private ManagedMqttClientOptions GetMqttOptions()
{
return new ManagedMqttClientOptionsBuilder()
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.WithMaxPendingMessages(100) // 限制待发送消息数量
.WithClientOptions(
new MqttClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com", 1883)
.WithClientId($"client_{Environment.MachineName}_{Guid.NewGuid():N}")
.WithCleanSession(false) // 持久会话
.WithKeepAlivePeriod(TimeSpan.FromSeconds(30))
.WithWillMessage(new MqttApplicationMessageBuilder()
.WithTopic("devices/status/last-will")
.WithPayload($"Client {Environment.MachineName} disconnected unexpectedly")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.WithRetainFlag(true)
.Build())
.Build()
)
.Build();
}
}

性能优化建议

  1. 消息批量处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private readonly Channel<MqttMessage> _messageChannel = Channel.CreateUnbounded<MqttMessage>();

private async Task ProcessMessagesInBatches()
{
var batch = new List<MqttMessage>();
await foreach (var message in _messageChannel.Reader.ReadAllAsync())
{
batch.Add(message);

if (batch.Count >= 100) // 批量大小
{
await ProcessBatch(batch);
batch.Clear();
}
}
}
  1. 主题订阅优化
1
2
3
4
5
6
// 避免过多的通配符订阅,影响性能
// 推荐:明确的主题路径
await mqttService.SubscribeAsync("sensors/temperature/device001");

// 避免:过于宽泛的通配符
// await mqttService.SubscribeAsync("#"); // 接收所有消息

错误处理与重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class RobustMqttService : IMqttService
{
private readonly ILogger<RobustMqttService> _logger;
private readonly MqttService _innerService;

public async Task PublishWithRetryAsync(string topic, string payload, int maxRetries = 3)
{
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
await _innerService.PublishAsync(topic, payload);
return; // 成功发布,退出重试循环
}
catch (Exception ex)
{
_logger.LogWarning($"发布消息失败 (尝试 {attempt}/{maxRetries}): {ex.Message}");

if (attempt == maxRetries)
{
_logger.LogError($"消息发布最终失败: {ex}");
throw;
}

// 指数退避
var delay = TimeSpan.FromMilliseconds(Math.Pow(2, attempt) * 1000);
await Task.Delay(delay);
}
}
}
}

常见问题与解决方案

常见问题排查

问题 可能原因 解决方案
连接失败 网络问题、认证失败 检查网络连接和认证信息
消息丢失 QoS 设置不当 使用合适的 QoS 等级
重复消息 QoS 1/2 的重传机制 在业务层实现幂等性
内存泄漏 未释放客户端资源 正确实现 IDisposable

调试技巧

1
2
3
4
5
6
7
8
9
10
// 启用详细日志
var factory = new MqttFactory();
factory.UseClientAdapterLogger = true;

// 消息追踪
private void TraceMessage(string direction, string topic, string payload)
{
var timestamp = DateTime.Now.ToString("HH:mm:ss.fff");
Console.WriteLine($"[{timestamp}] {direction} | {topic} | {payload}");
}

小结

MQTT 在 C#中的应用非常广泛,特别适合:

  • 物联网设备通信 - 传感器数据收集和设备控制
  • 实时消息推送 - 移动应用和 Web 应用的实时更新
  • 工业自动化 - 设备监控和数据采集
  • 车联网应用 - 车辆状态监控和远程控制

使用 MQTT 可以帮助我们构建高效、可靠的分布式消息系统。在实际开发中要根据具体的业务场景选择合适的 QoS 等级,并做好错误处理和连接管理。


使用建议:

  • 开发阶段: 使用免费的公共 MQTT 服务器测试
  • 生产环境: 部署自己的 MQTT 服务器(如 Mosquitto、EMQ X)
  • 安全考虑: 启用 TLS 加密和身份验证
  • 监控运维: 实施完善的日志记录和性能监控

相关资源: