RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(Advanced Message Queuing Protocol)协议实现。它为应用程序提供了可靠的消息传递机制,是构建分布式系统和微服务架构的重要组件。

RabbitMQ 核心概念

基础组件

  • Producer(生产者) - 发送消息的应用程序
  • Consumer(消费者) - 接收消息的应用程序
  • Queue(队列) - 存储消息的缓冲区
  • Exchange(交换机) - 接收生产者消息并路由到队列
  • Routing Key(路由键) - 交换机路由消息的规则
  • Binding(绑定) - 交换机和队列之间的链接关系

交换机类型

交换机类型 路由规则 使用场景
Direct 精确匹配路由键 点对点消息传递
Fanout 广播到所有绑定队列 广播通知、日志收集
Topic 模式匹配路由键 复杂路由规则
Headers 基于消息头属性 复杂过滤条件

消息确认机制

1
2
3
4
消息确认类型:
- Auto Ack: 消息发送后立即确认
- Manual Ack: 消费者手动确认处理完成
- Nack: 消费者拒绝消息,可选择重新排队

环境准备与配置

Docker 快速部署 RabbitMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# docker-compose.yml
version: "3.8"
services:
rabbitmq:
image: rabbitmq:3.12-management
container_name: rabbitmq
hostname: rabbitmq
ports:
- "5672:5672" # AMQP端口
- "15672:15672" # 管理界面端口
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin123
RABBITMQ_DEFAULT_VHOST: /
volumes:
- rabbitmq_data:/var/lib/rabbitmq
networks:
- app_network

volumes:
rabbitmq_data:

networks:
app_network:
driver: bridge
1
2
3
4
5
# 启动RabbitMQ服务
docker-compose up -d

# 访问管理界面: http://localhost:15672
# 用户名/密码: admin/admin123

NuGet 包安装

1
2
3
4
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />

项目架构设计

项目结构

1
2
3
4
5
6
7
RabbitMqTest/
├── Models/
│ └── Order.cs # 订单数据模型
├── OrderPublisher.cs # 订单发布者
├── InventoryConsumer.cs # 库存消费者
├── NotificationConsumer.cs # 通知消费者
└── RabbitMqConfig.cs # RabbitMQ配置

配置管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// RabbitMqConfig.cs
/// <summary>
/// RabbitMQ 配置常量
/// </summary>
public static class RabbitMqConfig
{
// 基础连接配置
public const string HostName = "localhost";
public const int Port = 5672;
public const string UserName = "admin";
public const string Password = "admin123";
public const string VirtualHost = "/";

// 业务交换机和队列配置
public const string ExchangeName = "order.exchange";
public const string OrderRoutingKey = "order.created";
public const string QueueInventory = "order.inventory.queue";
public const string QueueNotification = "order.notification.queue";

// 死信队列配置
public const string DlxExchangeName = "dlx.exchange";
public const string DlxQueueName = "dlx.order.queue";
public const string DlxRoutingKey = "dlx.routing.key";

// 消息TTL配置(毫秒)
public const int MessageTtl = 30000; // 30秒
public const int QueueTtl = 60000; // 60秒
}

核心功能实现

订单发布者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// OrderPublisher.cs
public class OrderPublisher : IDisposable
{
private readonly IConnection _connection;
private readonly IChannel _channel;
private bool _disposed = false;

public OrderPublisher()
{
// 创建连接工厂
var factory = new ConnectionFactory
{
HostName = RabbitMqConfig.HostName,
Port = RabbitMqConfig.Port,
UserName = RabbitMqConfig.UserName,
Password = RabbitMqConfig.Password,
VirtualHost = RabbitMqConfig.VirtualHost
};

// 建立连接和通道
_connection = factory.CreateConnectionAsync().Result;
_channel = _connection.CreateChannelAsync().Result;

// 初始化基础设施
InitializeInfrastructure().Wait();
}

/// <summary>
/// 初始化RabbitMQ基础设施
/// </summary>
private async Task InitializeInfrastructure()
{
// 声明主交换机(持久化)
await _channel.ExchangeDeclareAsync(
exchange: RabbitMqConfig.ExchangeName,
type: ExchangeType.Direct,
durable: true, // 持久化
autoDelete: false, // 不自动删除
arguments: null
);

// 声明死信交换机
await _channel.ExchangeDeclareAsync(
exchange: RabbitMqConfig.DlxExchangeName,
type: ExchangeType.Direct,
durable: true,
autoDelete: false,
arguments: null
);

// 配置队列参数(支持死信队列)
var queueArguments = new Dictionary<string, object>
{
{ "x-dead-letter-exchange", RabbitMqConfig.DlxExchangeName },
{ "x-dead-letter-routing-key", RabbitMqConfig.DlxRoutingKey },
{ "x-message-ttl", RabbitMqConfig.MessageTtl }, // 消息TTL
{ "x-max-length", 1000 }, // 队列最大长度
{ "x-overflow", "reject-publish" } // 队列满时拒绝发布
};

// 声明业务队列
await _channel.QueueDeclareAsync(
queue: RabbitMqConfig.QueueInventory,
durable: true,
exclusive: false,
autoDelete: false,
arguments: queueArguments
);

await _channel.QueueDeclareAsync(
queue: RabbitMqConfig.QueueNotification,
durable: true,
exclusive: false,
autoDelete: false,
arguments: queueArguments
);

// 声明死信队列
await _channel.QueueDeclareAsync(
queue: RabbitMqConfig.DlxQueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null
);

// 绑定队列到交换机
await _channel.QueueBindAsync(
queue: RabbitMqConfig.QueueInventory,
exchange: RabbitMqConfig.ExchangeName,
routingKey: RabbitMqConfig.OrderRoutingKey
);

await _channel.QueueBindAsync(
queue: RabbitMqConfig.QueueNotification,
exchange: RabbitMqConfig.ExchangeName,
routingKey: RabbitMqConfig.OrderRoutingKey
);

// 绑定死信队列
await _channel.QueueBindAsync(
queue: RabbitMqConfig.DlxQueueName,
exchange: RabbitMqConfig.DlxExchangeName,
routingKey: RabbitMqConfig.DlxRoutingKey
);
}

/// <summary>
/// 发布订单消息
/// </summary>
public async Task<bool> PublishOrder(Order order)
{
try
{
// 序列化订单数据
var messageBody = JsonConvert.SerializeObject(order);
var bodyBytes = Encoding.UTF8.GetBytes(messageBody);

// 设置消息属性
var properties = new BasicProperties
{
Persistent = true, // 消息持久化
MessageId = Guid.NewGuid().ToString(),
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
ContentType = "application/json",
DeliveryMode = DeliveryModes.Persistent,
Headers = new Dictionary<string, object>
{
{ "OrderType", "Standard" },
{ "Priority", order.Quantity > 10 ? "High" : "Normal" },
{ "PublishTime", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") }
}
};

// 发布消息到交换机
await _channel.BasicPublishAsync(
exchange: RabbitMqConfig.ExchangeName,
routingKey: RabbitMqConfig.OrderRoutingKey,
mandatory: true, // 如果没有匹配的队列,返回消息
basicProperties: properties,
body: bodyBytes
);

Console.WriteLine($"订单消息已发布: {order.OrderId} - 产品: {order.ProductId} - 数量: {order.Quantity}");
return true;
}
catch (Exception ex)
{
Console.WriteLine($"发布订单消息失败: {ex.Message}");
return false;
}
}

/// <summary>
/// 批量发布订单
/// </summary>
public async Task<int> PublishOrdersBatch(IEnumerable<Order> orders)
{
int successCount = 0;
var batch = _channel.CreateBasicPublishBatch();

try
{
foreach (var order in orders)
{
var messageBody = JsonConvert.SerializeObject(order);
var bodyBytes = Encoding.UTF8.GetBytes(messageBody);

var properties = new BasicProperties
{
Persistent = true,
MessageId = Guid.NewGuid().ToString(),
Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds()),
ContentType = "application/json"
};

batch.Add(
exchange: RabbitMqConfig.ExchangeName,
routingKey: RabbitMqConfig.OrderRoutingKey,
mandatory: true,
properties: properties,
body: bodyBytes
);
}

// 批量发送
await batch.PublishAsync();
successCount = orders.Count();

Console.WriteLine($"批量发布 {successCount} 条订单消息");
}
catch (Exception ex)
{
Console.WriteLine($"批量发布失败: {ex.Message}");
}

return successCount;
}

public async Task Close()
{
if (_channel?.IsOpen == true)
{
await _channel.CloseAsync();
}

if (_connection?.IsOpen == true)
{
await _connection.CloseAsync();
}
}

public void Dispose()
{
if (!_disposed)
{
Close().Wait();
_channel?.Dispose();
_connection?.Dispose();
_disposed = true;
}
}
}

库存消费者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// InventoryConsumer.cs
public class InventoryConsumer : IDisposable
{
private readonly IConnection _connection;
private readonly IChannel _channel;
private readonly SemaphoreSlim _semaphore;
private bool _disposed = false;

public InventoryConsumer(int maxConcurrency = 5)
{
_semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);

var factory = new ConnectionFactory
{
HostName = RabbitMqConfig.HostName,
Port = RabbitMqConfig.Port,
UserName = RabbitMqConfig.UserName,
Password = RabbitMqConfig.Password,
VirtualHost = RabbitMqConfig.VirtualHost
};

_connection = factory.CreateConnectionAsync().Result;
_channel = _connection.CreateChannelAsync().Result;

// 设置QoS - 限制未确认消息数量
_channel.BasicQosAsync(
prefetchSize: 0, // 不限制消息大小
prefetchCount: 10, // 每次预取10条消息
global: false // 只应用于当前消费者
).Wait();
}

/// <summary>
/// 启动库存消费者
/// </summary>
public async Task Start()
{
Console.WriteLine("库存消费者启动中...");

var consumer = new AsyncEventingBasicConsumer(_channel);

// 注册消息处理事件
consumer.ReceivedAsync += OnMessageReceived;

// 开始消费消息
await _channel.BasicConsumeAsync(
queue: RabbitMqConfig.QueueInventory,
autoAck: false, // 手动确认
consumer: consumer
);

Console.WriteLine("库存消费者已启动,等待处理订单...");
Console.WriteLine("按任意键停止消费者...");
Console.ReadKey();

await Stop();
}

/// <summary>
/// 处理接收到的消息
/// </summary>
private async Task OnMessageReceived(object sender, BasicDeliverEventArgs eventArgs)
{
await _semaphore.WaitAsync(); // 控制并发

try
{
var messageBody = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
var order = JsonConvert.DeserializeObject<Order>(messageBody);

Console.WriteLine($"[库存] 接收到订单: {order.OrderId} - 产品: {order.ProductId} - 数量: {order.Quantity}");

// 模拟库存处理业务逻辑
var success = await ProcessInventory(order);

if (success)
{
// 处理成功,确认消息
await _channel.BasicAckAsync(
deliveryTag: eventArgs.DeliveryTag,
multiple: false
);

Console.WriteLine($"[库存] 订单 {order.OrderId} 库存处理完成");
}
else
{
// 处理失败,拒绝消息并重新排队
await _channel.BasicNackAsync(
deliveryTag: eventArgs.DeliveryTag,
multiple: false,
requeue: true // 重新排队
);

Console.WriteLine($"[库存] 订单 {order.OrderId} 库存处理失败,消息已重新排队");
}
}
catch (Exception ex)
{
Console.WriteLine($"[库存] 处理消息时发生异常: {ex.Message}");

// 发生异常时拒绝消息,不重新排队(发送到死信队列)
await _channel.BasicNackAsync(
deliveryTag: eventArgs.DeliveryTag,
multiple: false,
requeue: false
);
}
finally
{
_semaphore.Release();
}
}

/// <summary>
/// 模拟库存处理业务逻辑
/// </summary>
private async Task<bool> ProcessInventory(Order order)
{
try
{
// 模拟处理时间
await Task.Delay(Random.Shared.Next(500, 2000));

// 模拟库存检查
if (order.Quantity > 100)
{
Console.WriteLine($" [库存] 订单 {order.OrderId} 数量过大,库存不足");
return false;
}

// 模拟库存扣减
Console.WriteLine($" [库存] 为订单 {order.OrderId} 扣减库存 {order.Quantity} 件");

// 模拟数据库操作
await UpdateInventoryInDatabase(order);

return true;
}
catch (Exception ex)
{
Console.WriteLine($"[库存] 处理订单 {order.OrderId} 时发生错误: {ex.Message}");
return false;
}
}

/// <summary>
/// 模拟数据库库存更新
/// </summary>
private async Task UpdateInventoryInDatabase(Order order)
{
// 模拟数据库操作延迟
await Task.Delay(100);

// 这里应该是实际的数据库操作
Console.WriteLine($"[库存] 数据库已更新,产品 {order.ProductId} 库存减少 {order.Quantity}");
}

public async Task Stop()
{
Console.WriteLine(" 正在停止库存消费者...");

if (_channel?.IsOpen == true)
{
await _channel.CloseAsync();
}

if (_connection?.IsOpen == true)
{
await _connection.CloseAsync();
}

Console.WriteLine(" 库存消费者已停止");
}

public void Dispose()
{
if (!_disposed)
{
Stop().Wait();
_semaphore?.Dispose();
_channel?.Dispose();
_connection?.Dispose();
_disposed = true;
}
}
}

通知消费者实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
// NotificationConsumer.cs
public class NotificationConsumer : IDisposable
{
private readonly IConnection _connection;
private readonly IChannel _channel;
private readonly CancellationTokenSource _cancellationTokenSource;
private bool _disposed = false;

public NotificationConsumer()
{
_cancellationTokenSource = new CancellationTokenSource();

var factory = new ConnectionFactory
{
HostName = RabbitMqConfig.HostName,
Port = RabbitMqConfig.Port,
UserName = RabbitMqConfig.UserName,
Password = RabbitMqConfig.Password,
VirtualHost = RabbitMqConfig.VirtualHost
};

_connection = factory.CreateConnectionAsync().Result;
_channel = _connection.CreateChannelAsync().Result;

// 设置QoS
_channel.BasicQosAsync(0, 5, false).Wait();
}

/// <summary>
/// 启动通知消费者
/// </summary>
public async Task Start()
{
Console.WriteLine(" 通知消费者启动中...");

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += OnMessageReceived;

await _channel.BasicConsumeAsync(
queue: RabbitMqConfig.QueueNotification,
autoAck: false,
consumer: consumer
);

Console.WriteLine(" 通知消费者已启动,等待发送通知...");
Console.WriteLine("按任意键停止消费者...");
Console.ReadKey();

await Stop();
}

/// <summary>
/// 处理通知消息
/// </summary>
private async Task OnMessageReceived(object sender, BasicDeliverEventArgs eventArgs)
{
try
{
var messageBody = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
var order = JsonConvert.DeserializeObject<Order>(messageBody);

Console.WriteLine($"[通知] 接收到订单通知: {order.OrderId}");

// 处理通知逻辑
var success = await ProcessNotification(order);

if (success)
{
await _channel.BasicAckAsync(eventArgs.DeliveryTag, false);
Console.WriteLine($"[通知] 订单 {order.OrderId} 通知发送成功");
}
else
{
// 通知发送失败,重试3次后放入死信队列
var retryCount = GetRetryCount(eventArgs.BasicProperties);

if (retryCount < 3)
{
// 增加重试计数并重新排队
await _channel.BasicNackAsync(eventArgs.DeliveryTag, false, true);
Console.WriteLine($"[通知] 订单 {order.OrderId} 通知发送失败,重试第 {retryCount + 1} 次");
}
else
{
// 超过重试次数,发送到死信队列
await _channel.BasicNackAsync(eventArgs.DeliveryTag, false, false);
Console.WriteLine($"[通知] 订单 {order.OrderId} 通知发送失败,已发送到死信队列");
}
}
}
catch (Exception ex)
{
Console.WriteLine($"[通知] 处理消息异常: {ex.Message}");
await _channel.BasicNackAsync(eventArgs.DeliveryTag, false, false);
}
}

/// <summary>
/// 模拟通知处理业务逻辑
/// </summary>
private async Task<bool> ProcessNotification(Order order)
{
try
{
// 模拟通知处理时间
await Task.Delay(Random.Shared.Next(200, 1000));

// 模拟不同类型的通知
var notifications = new[]
{
$"邮件通知: 邮件通知: 您的订单 {order.OrderId} 已确认",
$" 短信通知: 订单 {order.OrderId} 正在处理中",
$" 推送通知: 订单 {order.OrderId} 状态更新"
};

foreach (var notification in notifications)
{
Console.WriteLine($"[通知] {notification}");
await Task.Delay(100); // 模拟发送延迟
}

// 模拟发送失败概率
if (Random.Shared.NextDouble() < 0.1) // 10% 失败率
{
Console.WriteLine($" [通知] 订单 {order.OrderId} 部分通知发送失败");
return false;
}

return true;
}
catch (Exception ex)
{
Console.WriteLine($"[通知] 发送通知异常: {ex.Message}");
return false;
}
}

/// <summary>
/// 获取消息重试次数
/// </summary>
private int GetRetryCount(IBasicProperties properties)
{
if (properties.Headers != null &&
properties.Headers.TryGetValue("x-retry-count", out var retryCountObj))
{
return Convert.ToInt32(retryCountObj);
}
return 0;
}

public async Task Stop()
{
Console.WriteLine(" 正在停止通知消费者...");

_cancellationTokenSource.Cancel();

if (_channel?.IsOpen == true)
{
await _channel.CloseAsync();
}

if (_connection?.IsOpen == true)
{
await _connection.CloseAsync();
}

Console.WriteLine(" 通知消费者已停止");
}

public void Dispose()
{
if (!_disposed)
{
Stop().Wait();
_cancellationTokenSource?.Dispose();
_channel?.Dispose();
_connection?.Dispose();
_disposed = true;
}
}
}

订单数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Models/Order.cs
public class Order
{
/// <summary>
/// 订单ID
/// </summary>
public string OrderId { get; set; }

/// <summary>
/// 产品ID
/// </summary>
public string ProductId { get; set; }

/// <summary>
/// 购买数量
/// </summary>
public int Quantity { get; set; }

/// <summary>
/// 订单金额
/// </summary>
public decimal Amount { get; set; }

/// <summary>
/// 客户名称
/// </summary>
public string CustomerName { get; set; }

/// <summary>
/// 创建时间
/// </summary>
public DateTime CreateTime { get; set; }

/// <summary>
/// 订单状态
/// </summary>
public string Status { get; set; } = "Created";

/// <summary>
/// 备注信息
/// </summary>
public string Notes { get; set; }

public override string ToString()
{
return $"Order[{OrderId}] - Product: {ProductId}, Qty: {Quantity}, Amount: {Amount:C}, Customer: {CustomerName}";
}
}

实际应用示例

主程序集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
public static async Task RabbitMQTestFunc()
{
// 初始化多进程实例(可选)
InitializeProcessInstances();

Console.WriteLine(" RabbitMQ 消息队列演示程序");
Console.WriteLine("=".PadRight(50, '='));

while (true)
{
Console.WriteLine("\n请选择操作模式:");
Console.WriteLine("1. - 发布订单消息");
Console.WriteLine("2. - 启动库存消费者");
Console.WriteLine("3. - 启动通知消费者");
Console.WriteLine("4. - 批量发布测试");
Console.WriteLine("5. - 查看队列状态");
Console.WriteLine("9. - 退出程序");
Console.WriteLine("-".PadRight(30, '-'));
Console.Write("请选择: ");

var choice = Console.ReadLine();

switch (choice)
{
case "1":
await PublishOrderMessages();
break;
case "2":
await StartInventoryConsumer();
break;
case "3":
await StartNotificationConsumer();
break;
case "4":
await BatchPublishTest();
break;
case "5":
await ShowQueueStatus();
break;
case "9":
Console.WriteLine(" 程序退出");
return;
default:
Console.WriteLine(" 无效选择,请重新输入!");
break;
}
}
}

/// <summary>
/// 发布订单消息
/// </summary>
private static async Task PublishOrderMessages()
{
Console.WriteLine("\n开始发布订单消息 开始发布订单消息...");

using var publisher = new OrderPublisher();

// 生成测试订单
var orders = GenerateTestOrders(5);

foreach (var order in orders)
{
var success = await publisher.PublishOrder(order);
if (success)
{
Console.WriteLine($" 订单 {order.OrderId} 发布成功");
}

await Task.Delay(500); // 间隔发送
}

Console.WriteLine(" 订单消息发布完成!");
}

/// <summary>
/// 启动库存消费者
/// </summary>
private static async Task StartInventoryConsumer()
{
Console.WriteLine("\n启动库存消费者 启动库存消费者...");
using var consumer = new InventoryConsumer(maxConcurrency: 5);
await consumer.Start();
}

/// <summary>
/// 启动通知消费者
/// </summary>
private static async Task StartNotificationConsumer()
{
Console.WriteLine("\n启动通知消费者 启动通知消费者...");
using var consumer = new NotificationConsumer();
await consumer.Start();
}

/// <summary>
/// 批量发布测试
/// </summary>
private static async Task BatchPublishTest()
{
Console.WriteLine("\n批量发布测试 批量发布测试...");

using var publisher = new OrderPublisher();
var orders = GenerateTestOrders(50); // 生成50个测试订单

var stopwatch = Stopwatch.StartNew();
var successCount = await publisher.PublishOrdersBatch(orders);
stopwatch.Stop();

Console.WriteLine($" 批量发布完成: {successCount}/{orders.Count} 条消息");
Console.WriteLine($"耗时: 耗时: {stopwatch.ElapsedMilliseconds} ms");
Console.WriteLine($" 平均速度: {successCount * 1000.0 / stopwatch.ElapsedMilliseconds:F2} 消息/秒");
}

/// <summary>
/// 显示队列状态(需要RabbitMQ Management API)
/// </summary>
private static async Task ShowQueueStatus()
{
Console.WriteLine("\n 队列状态信息:");
Console.WriteLine($"管理界面: http://localhost:15672");
Console.WriteLine($"用户名/密码: admin/admin123");
Console.WriteLine("请通过管理界面查看详细的队列状态信息");
}

/// <summary>
/// 生成测试订单数据
/// </summary>
private static List<Order> GenerateTestOrders(int count)
{
var orders = new List<Order>();
var random = new Random();

var products = new[] { "iPhone15", "MacBook", "iPad", "AirPods", "iWatch" };
var customers = new[] { "张三", "李四", "王五", "赵六", "钱七" };

for (int i = 1; i <= count; i++)
{
orders.Add(new Order
{
OrderId = $"ORD{DateTime.Now:yyyyMMdd}{random.Next(1000, 9999)}",
ProductId = products[random.Next(products.Length)],
Quantity = random.Next(1, 21),
Amount = random.Next(100, 5000),
CustomerName = customers[random.Next(customers.Length)],
CreateTime = DateTime.Now,
Status = "Created",
Notes = $"测试订单 #{i}"
});
}

return orders;
}

高级特性与优化技巧

连接池管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class RabbitMqConnectionPool : IDisposable
{
private readonly ConcurrentQueue<IConnection> _connections = new();
private readonly SemaphoreSlim _semaphore;
private readonly ConnectionFactory _factory;
private readonly int _maxConnections;
private int _currentConnections = 0;
private bool _disposed = false;

public RabbitMqConnectionPool(int maxConnections = 10)
{
_maxConnections = maxConnections;
_semaphore = new SemaphoreSlim(maxConnections, maxConnections);

_factory = new ConnectionFactory
{
HostName = RabbitMqConfig.HostName,
Port = RabbitMqConfig.Port,
UserName = RabbitMqConfig.UserName,
Password = RabbitMqConfig.Password,
VirtualHost = RabbitMqConfig.VirtualHost,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
}

public async Task<IConnection> GetConnectionAsync()
{
await _semaphore.WaitAsync();

if (_connections.TryDequeue(out var connection) && connection.IsOpen)
{
return connection;
}

// 创建新连接
connection = await _factory.CreateConnectionAsync($"Connection-{Interlocked.Increment(ref _currentConnections)}");
return connection;
}

public void ReturnConnection(IConnection connection)
{
if (connection.IsOpen && !_disposed)
{
_connections.Enqueue(connection);
}
else
{
connection?.Dispose();
}

_semaphore.Release();
}

public void Dispose()
{
if (!_disposed)
{
_disposed = true;

while (_connections.TryDequeue(out var connection))
{
connection.Dispose();
}

_semaphore.Dispose();
}
}
}

消息监控与指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MessageMetrics
{
private long _publishedCount = 0;
private long _consumedCount = 0;
private long _failedCount = 0;
private readonly ConcurrentDictionary<string, long> _queueCounts = new();

public void IncrementPublished() => Interlocked.Increment(ref _publishedCount);
public void IncrementConsumed() => Interlocked.Increment(ref _consumedCount);
public void IncrementFailed() => Interlocked.Increment(ref _failedCount);

public void IncrementQueueCount(string queueName)
{
_queueCounts.AddOrUpdate(queueName, 1, (key, oldValue) => oldValue + 1);
}

public void PrintMetrics()
{
Console.WriteLine("\n 消息处理统计:");
Console.WriteLine($"发布消息数: {_publishedCount}");
Console.WriteLine($"消费消息数: {_consumedCount}");
Console.WriteLine($"失败消息数: {_failedCount}");
Console.WriteLine($"成功率: {(_consumedCount * 100.0 / Math.Max(_publishedCount, 1)):F2}%");

Console.WriteLine("\n 队列统计:");
foreach (var kvp in _queueCounts)
{
Console.WriteLine($"{kvp.Key}: {kvp.Value} 条消息");
}
}
}

死信队列处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class DeadLetterConsumer : IDisposable
{
private readonly IConnection _connection;
private readonly IChannel _channel;

public DeadLetterConsumer()
{
var factory = new ConnectionFactory
{
HostName = RabbitMqConfig.HostName,
// ... 其他配置
};

_connection = factory.CreateConnectionAsync().Result;
_channel = _connection.CreateChannelAsync().Result;
}

public async Task Start()
{
Console.WriteLine(" 死信队列消费者启动...");

var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += OnDeadLetterReceived;

await _channel.BasicConsumeAsync(
queue: RabbitMqConfig.DlxQueueName,
autoAck: false,
consumer: consumer
);

Console.WriteLine(" 死信队列消费者已启动");
Console.ReadKey();
}

private async Task OnDeadLetterReceived(object sender, BasicDeliverEventArgs eventArgs)
{
try
{
var messageBody = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
var order = JsonConvert.DeserializeObject<Order>(messageBody);

Console.WriteLine($" 死信消息: {order.OrderId}");

// 记录死信消息
await LogDeadLetter(order, eventArgs);

// 可选:实现重新处理逻辑
var shouldRetry = await ShouldRetryMessage(order);
if (shouldRetry)
{
await RequeueMessage(order);
}

await _channel.BasicAckAsync(eventArgs.DeliveryTag, false);
}
catch (Exception ex)
{
Console.WriteLine($" 处理死信消息异常: {ex.Message}");
await _channel.BasicNackAsync(eventArgs.DeliveryTag, false, false);
}
}

private async Task LogDeadLetter(Order order, BasicDeliverEventArgs eventArgs)
{
// 记录死信消息到数据库或日志文件
var logEntry = new
{
OrderId = order.OrderId,
OriginalQueue = eventArgs.RoutingKey,
FailureTime = DateTime.Now,
Headers = eventArgs.BasicProperties.Headers,
Reason = "Message processing failed"
};

Console.WriteLine($" 死信日志: {JsonConvert.SerializeObject(logEntry, Formatting.Indented)}");
await Task.CompletedTask;
}

private async Task<bool> ShouldRetryMessage(Order order)
{
// 实现重试逻辑判断
// 例如:检查订单状态、时间间隔等
await Task.CompletedTask;
return false; // 示例:不重试
}

private async Task RequeueMessage(Order order)
{
// 重新发布消息到原始队列
using var publisher = new OrderPublisher();
await publisher.PublishOrder(order);
Console.WriteLine($" 死信消息已重新排队: {order.OrderId}");
}

public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}

性能优化与监控

性能调优建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 1. 连接复用
public class OptimizedConsumer
{
private readonly IConnection _connection;
private readonly List<IChannel> _channels;

public OptimizedConsumer(int channelCount = Environment.ProcessorCount)
{
_connection = factory.CreateConnectionAsync().Result;
_channels = new List<IChannel>();

// 创建多个通道以提高并发处理能力
for (int i = 0; i < channelCount; i++)
{
var channel = _connection.CreateChannelAsync().Result;
channel.BasicQosAsync(0, 10, false).Wait(); // 每个通道预取10条消息
_channels.Add(channel);
}
}
}

// 2. 批量确认
public async Task BatchAcknowledge(List<BasicDeliverEventArgs> messages)
{
if (messages.Count == 0) return;

// 批量确认最后一条消息,会自动确认之前的所有消息
var lastMessage = messages.OrderBy(m => m.DeliveryTag).Last();
await _channel.BasicAckAsync(lastMessage.DeliveryTag, multiple: true);
}

// 3. 消息预取优化
private void OptimizeQoS()
{
// 根据消息处理速度调整预取数量
var processingTimeMs = GetAverageProcessingTime();
var optimalPrefetch = (int)(1000 / processingTimeMs) * 2; // 2倍安全系数

_channel.BasicQosAsync(0, (ushort)Math.Min(optimalPrefetch, 100), false);
}

监控集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RabbitMqHealthCheck : IHealthCheck
{
private readonly IConnection _connection;

public RabbitMqHealthCheck(IConnection connection)
{
_connection = connection;
}

public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
if (_connection.IsOpen)
{
return Task.FromResult(HealthCheckResult.Healthy("RabbitMQ connection is healthy"));
}
else
{
return Task.FromResult(HealthCheckResult.Unhealthy("RabbitMQ connection is closed"));
}
}
catch (Exception ex)
{
return Task.FromResult(HealthCheckResult.Unhealthy($"RabbitMQ health check failed: {ex.Message}"));
}
}
}

小结

RabbitMQ 作为企业级消息队列解决方案,为 C#应用程序提供了完善的异步通信能力:

核心优势

  • 高性能 - 支持百万级消息吞吐量
  • 高可靠 - 消息持久化、确认机制、集群部署
  • 易扩展 - 灵活的路由规则和插件系统
  • 可监控 - 丰富的管理界面和监控指标
  • 跨平台 - 支持多种编程语言和操作系统

适用场景

场景 特点 RabbitMQ 优势
微服务通信 服务解耦、异步处理 可靠消息传递、路由灵活
流量削峰 高并发请求处理 消息缓冲、流量控制
数据同步 系统间数据一致性 顺序保证、事务支持
任务调度 后台任务处理 延迟队列、死信处理

使用 RabbitMQ 可以帮助我们构建高可用、高性能的分布式系统。在实际使用中需要根据业务需求选择合适的交换机类型和消息确认策略,并做好监控和错误处理。


生产环境建议:

  • 集群部署: 使用 RabbitMQ 集群确保高可用性
  • 监控告警: 集成 Prometheus、Grafana 等监控工具
  • 安全配置: 启用 SSL/TLS 加密和访问控制
  • 容量规划: 根据业务量合理配置队列和连接数

相关资源: