RabbitMQ 是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。
1.基本概念
生产者(Producer)
生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够连接到 RabbitMQ 服务器,并且能够发送消息到 RabbitMQ 服务器。
消费者(Consumer)
消费者是一个接收消息的程序。接收消息的程序可以是任何语言编写的,只要它能够连接到 RabbitMQ 服务器,并且能够从 RabbitMQ 服务器接收消息。
队列(Queue)
队列是 RabbitMQ 的内部对象,用于存储消息。多个生产者可以向一个队列发送消息,多个消费者可以尝试从一个队列接收消息。队列支持多种消息分发策略。
交换机(Exchange)
交换机是消息的分发中心。它接收来自生产者的消息,然后将这些消息分发给队列。交换机有多种类型,包括直连交换机、主题交换机、扇形交换机、头交换机。
绑定(Binding)
绑定是交换机和队列之间的关联关系。绑定可以使用路由键进行绑定,也可以使用通配符进行绑定。
路由键(Routing Key)
路由键是生产者发送消息时附带的一个属性。路由键的作用是决定消息被分发到哪个队列。
通配符(Wildcard)
通配符是一种模式匹配的方式。RabbitMQ 支持两种通配符:`*`和`#`。
绑定键(Binding Key)
绑定键是交换机和队列之间的关联关系。绑定键可以使用路由键进行绑定,也可以使用通配符进行绑定。
持久化(Durable)
持久化是指 RabbitMQ 服务器重启后,消息是否还存在。持久化可以应用到交换机、队列、绑定、消息等。
确认机制(Acknowledge)
确认机制是指消费者接收到消息后,向 RabbitMQ 服务器发送一个确认消息。RabbitMQ 服务器收到确认消息后,会删除这条消息。
自动确认
消费者接收到消息后,RabbitMQ 服务器会自动删除这条消息。
手动确认
消费者接收到消息后,需要向 RabbitMQ 服务器发送一个确认消息。RabbitMQ 服务器收到确认消息后,会删除这条消息。
拒绝机制(Reject)
拒绝机制是指消费者接收到消息后,向 RabbitMQ 服务器发送一个拒绝消息。RabbitMQ 服务器收到拒绝消息后,会将这条消息重新发送给其他消费者。
死信队列(Dead Letter Queue)
死信队列是指消息被拒绝、过期或者达到最大重试次数后,会被发送到死信队列。
消息过期(Message TTL)
消息过期是指消息在指定时间内没有被消费者消费,会被删除。
消息优先级(Message Priority)
消息优先级是指消息在队列中的优先级。消息优先级高的消息会被优先消费。
消息分发
消息分发是指消息在队列中的分发策略。消息分发策略包括轮询分发、公平分发、负载均衡分发。
2.环境搭建
Docker 安装 RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=always --hostname my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -e TZ=Asia/Shanghai rabbitmq:management
-d:后台运行
–restart:重启策略
–name:容器名称
-p:端口映射
–hostname:主机名
-e:环境变量
RABBITMQ_DEFAULT_USER:默认用户名
RABBITMQ_DEFAULT_PASS:默认密码
TZ:时区
rabbitmq:management:镜像名称
Docker Compose 安装 RabbitMQ
version: "3.1"
services:
rabbitmq:
restart: always
image: rabbitmq:management
container_name: rabbitmq
hostname: my-rabbit
ports:
- 5672:5672
- 15672:15672 # RabbitMQ 管理界面端口
environment:
TZ: Asia/Shanghai
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: admin
restart:重启策略
image:镜像名称
container_name:容器名称
hostname:主机名
ports:端口映射
environment:环境变量
TZ:时区
RABBITMQ_DEFAULT_USER:默认用户名
RABBITMQ_DEFAULT_PASS:默认密码
rabbitmq:management:镜像名称
3.使用
客户端 SDK 代码在 GitHub:https://github.com/Tangtang1997/IKunLibrary
新建 TestRequest 类,实现 IRabbitMqRequest 接口,定义消息体
public class TestRequest : IRabbitMqRequest
{
///
/// 重试次数
///
public int RetryCount { get; set; }
#region 自定义字段
///
/// id
///
public string Id { get; set; } = default!;
///
/// 名称
///
public string Name { get; set; } = default!;
///
/// 年龄
///
public int Age { get; set; }
#endregion
}
新建 TestRequestHandler 类,实现 IRabbitMqRequestHandler 接口,处理消息
public class TestRequestHanlder : IRequestProcessorHandler
{
private readonly ILogger _logger;
public TestRequestHanlder(ILogger logger)
{
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task StopAsync(int milliseconds, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public async Task HandleAsync(TestRequest request, CancellationToken cancellationToken = default)
{
_logger.LogInformation($"开始处理消息: {request.Id}");
//模拟处理消息耗时操作
await Task.Delay(1000, cancellationToken);
_logger.LogInformation($"消息处理完成: {request.Id}");
}
}
使用 IHostedService 来托管服务
public class SampleHostedService : IHostedService
{
private readonly IConsumerProcessorManager _consumerProcessorManager;
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly ILogger _logger;
public SampleHostedService(
IConsumerProcessorManager consumerProcessorManager,
IHostApplicationLifetime applicationLifetime,
ILogger logger)
{
_consumerProcessorManager = consumerProcessorManager;
_applicationLifetime = applicationLifetime;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
_applicationLifetime.ApplicationStarted.Register(() =>
{
_logger.LogInformation("SampleHostedService is starting.");
_consumerProcessorManager.StartAsync(cancellationToken);
});
_applicationLifetime.ApplicationStopping.Register(() =>
{
_logger.LogInformation("SampleHostedService is stopping.");
_consumerProcessorManager.StopAsync(3000, cancellationToken);
});
await Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await Task.CompletedTask;
}
}
注册并启用服务
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddHostedService();
var configuration = services.BuildServiceProvider().GetRequiredService();
var hostName = configuration["RabbitMq:Host"] ?? throw new Exception("HostName is not configured");
var port = int.Parse(configuration["RabbitMq:Port"] ?? throw new Exception("Port is not configured"));
var userName = configuration["RabbitMq:Username"] ?? throw new Exception("Username is not configured");
var password = configuration["RabbitMq:Password"] ?? throw new Exception("Password is not configured");
var queueName = configuration["RabbitMq:QueueName"] ?? throw new Exception("QueueName is not configured");
services.AddRabbitMq(options =>
{
options.UseSsl = false;
options.HostName = hostName;
options.Port = port;
options.UserName = userName;
options.Password = password;
options.Durable = true;
options.NetworkRecoveryInterval = 10000;
options.ExchangeType = ExchangeType.Direct;
options.QueueName = queueName;
options.Exchange = $"{queueName}_SERVICE_EXCHANGE";
options.RoutingKey = $"{queueName}_ROUTING_KEY";
options.DeadLetterExchange = $"{queueName}_SERVICE_EXCHANGE_DEAD";
options.DeadLetterQueueName = $"{queueName}_DEAD";
options.DeadLetterRoutingKey = $"{queueName}_ROUTING_KEY";
});
})
.Build();
await host.RunAsync();