RabbitMQ

介绍

引入消息队列的本质是解决分布式系统 / 高并发场景下的各类核心痛点,实现系统的解耦、削峰、异步化,最终提升系统的稳定性、可扩展性和性能

系统解耦:降低模块间的依赖,提升系统灵活性。这是消息队列最基础也最核心的意义之一。

  • 传统架构:模块 A 直接调用模块 B 的接口,若 B 修改接口、暂停服务、宕机,都会直接影响 A 的正常运行,模块间耦合度高,牵一发而动全身。
  • 引入消息队列后:模块 A 只需将需要传递的数据(消息)发送到消息队列,无需关心谁来接收、何时接收;模块 B 从消息队列中获取消息进行处理,也无需关心消息来自哪里。

削峰填谷:抵御流量突发冲击,保护核心系统

在秒杀、促销、国庆购票等场景下,会出现短时间内的流量洪峰,若直接将流量打入核心业务系统(如订单系统、支付系统),很容易导致系统过载、数据库宕机,引发服务雪崩。

  • 引入消息队列后:
    1. 突发的高流量请求会先被缓冲到消息队列中,形成 “峰值” 缓冲,避免直接冲击核心业务系统;
    2. 核心系统按照自身的处理能力,从消息队列中匀速拉取消息进行处理,将 “尖峰流量” 削平,转为 “平稳流量”,即 “填谷”;
    3. 若消息队列堆积超过阈值,还可触发限流、降级等策略,进一步保护核心系统。

异步通信:提升请求响应速度,提高系统吞吐量

传统同步调用中,请求方需要等待被调用方处理完成后才能返回结果,若被调用方处理耗时较长(如发送短信、邮件、生成报表),会导致请求响应缓慢,占用大量资源。

  • 引入消息队列后:
    1. 生产方发送消息到队列后,无需等待消费方处理,可立即返回结果给用户,实现 “异步化”;
    2. 耗时的业务逻辑由消费方在后台异步处理,不占用前端请求的响应时间;
    3. 系统可同时处理更多的请求,提升整体吞吐量。

MQ 技术选型

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的 Broker。

维度 RabbitMQ ActiveMQ RocketMQ Kafka
公司 / 社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP, XMPP, SMTP, STOMP OpenWire,STOMP, REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

综合能力 RabbitMQ 和 RocketMQ 强,除了大厂并发量非常大的情况下用 Kafka 或者 RocketMQ,绝大公司用 RabbitMQ 就够了。而且RabbitMQ 的协议支持更符合微服务的概念(各个业务独立开发互不影响,RocketMQ只能通过 Java 调用)

安装

基本上基于 Docker 安装

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=yourusername \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management

--network hmall中的hmall 是一个 Docker 自定义网络,它的核心作用是实现 Docker 容器之间的 “网络互通”。

Docker 默认有bridge(桥接)、host(主机)等网络模式,但默认bridge网络下,容器之间只能通过 IP 地址通信(不够灵活)。而自定义网络(比如这里的hmall)是用户手动创建的 Docker 网络,具有以下特点:

  1. 同一自定义网络内的容器,可以直接通过容器名称互相访问(无需记 IP);
  2. 实现容器间的网络隔离(不同自定义网络的容器默认无法通信);
  3. 更适合多服务的微服务架构(比如项目中的 “订单服务”“用户服务”“RabbitMQ” 都加入hmall网络,就能直接通过容器名调用)。

macOS 还可以用 homebrew 安装

1
brew install rabbitmq

架构

RabbitMQ 的整体架构及核心概念:

  • publisher:消息发送者(负责生产并发送消息)
  • consumer:消息的消费者(负责从队列中获取并处理消息)
  • queue:队列(用于存储消息的容器,消息最终会被投递到队列中)
  • exchange:交换机(负责接收 publisher 发送的消息,并根据路由规则将消息路由到对应的 queue)
  • virtual-host:虚拟主机,起到数据隔离的作用

架构流程:

  1. 消息发送者(Publisher)将消息发送给交换机(exchange);
  2. 交换机根据预设的路由规则,将消息分发到对应的队列(queue);
  3. 消息消费者(consumer)从队列中获取消息并进行处理。

image-20260104112516458

消息协议

AMQP

  • 全称:Advanced Message Queuing Protocol(高级消息队列协议)
  • 定义:是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP

  • 定义:是基于 AMQP 协议定义的一套 API 规范,提供了模板来发送和接收消息。
  • 组成:包含两部分 ——
    1. spring-amqp:基础抽象层;
    2. spring-rabbit:底层的默认实现(对应 RabbitMQ)。

基本使用

SpringAMQP 提供了RabbitTemplate工具类,方便我们发送消息。发送消息代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}

SpringAMQP 提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来 SpringAMQP 就会把消息传递给当前方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
public class SpringRabbitListener {

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
log.info("spring 消费者接收到消息:【" + msg + "】");
if (true) {
throw new MessageConversionException("故意的");
}
log.info("消息处理完成");
}
}

消费者消息推送限制

默认情况下,RabbitMQ 会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch值为 1,确保同一时刻最多投递给消费者 1 条消息:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

prefetch(全称 basic.qos 中的 prefetch_count)是 RabbitMQ 协议(AMQP)中定义的功能,作用是限制消费者在未确认消息时能预取的消息数量。当 prefetch_count=1 时,RabbitMQ 会确保:消费者必须处理完当前消息并返回确认(ACK)后,才会再给它投递下一条消息。默认值是 0,表示 “不限制预取数量”。

Work 模型的使用

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置 prefetch 来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

交换机

fanout——广播

会把接收到的消息全部转发给与其绑定的队列

在web控制台新增类型为fanout的交换机,再绑定对应的queue

image-20260104122727113

image-20260104122855661

Direct——定向

Direct Exchange 会将接收到的消息根据规则路由到指定的 Queue,因此称为定向路由。

  • 每一个 Queue 都与 Exchange 设置一个 BindingKey
  • 发布者发送消息时,指定消息的 RoutingKey
  • Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列

image-20260104123325488

在 web控制台建立 Direct 交换机并绑定queue和对应的 Routing Key![image-20260104123751370](/Users/xixiu/Library/Application Support/typora-user-images/image-20260104123751370.png)

Topic——主题

TopicExchange 与 DirectExchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以.分割。

Queue 与 Exchange 指定 BindingKey 时可以使用通配符

  • #:代指 0 个或多个单词
  • *:代指一个单词

image-20260104124326824

Header 不推荐,通过消息的headers的规则匹配,麻烦、性能不好

在Java中申明交换机和队列的关系

古法(不推荐)

fanout

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class FanoutConfiguration {

@Bean
public FanoutExchange fanoutExchange(){
// ExchangeBuilder.fanoutExchange("").build();
return new FanoutExchange("hmall.fanout2");
}

@Bean
public Queue fanoutQueue3(){
// QueueBuilder.durable("fff").build();
return new Queue("fanout.queue3");
}

@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
}

Direct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public Queue directQueue1(){
return new Queue("direct.queue1");
}

@Bean
public Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

@Bean
public Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange){
return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");
}

...

声明队列和交换机

SpringAMQP 还提供了基于@RabbitListener注解来声明队列和交换机的方式:

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息:【"+msg+"】 ");
}

消息转换器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDk的objectOutputStream完成序列化。
存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

用 JSON 序列化替代 JDK 序列化

需要在 publisher 和 consumer 两端都完成以下操作:

1. 引入 jackson 依赖(SpringMVC 自带)

1
2
3
4
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

2. 配置 MessageConverter

1
2
3
4
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}

消息可靠性

生产者重连

当网络波动导致客户端连接 MQ 失败时,可通过以下配置开启重连机制(需在application.yml中配置):

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

注意事项:

当网络不稳定时,重试机制能提高消息发送成功率,但 Spring AMQP 的重试是阻塞式重试—— 多次重试等待过程中,当前线程会被阻塞,可能影响业务性能。

  • 若对业务性能有要求,建议禁用重试机制;
  • 若必须使用,需合理配置等待时长和重试次数,也可考虑用异步线程执行消息发送代码。

生产者确认

RabbitMQ 提供了Publisher ConfirmPublisher Return两种确认机制。开启确认机制后,MQ 成功收到消息会返回确认信息给生产者,返回结果分为以下情况:

  • 消息投递到了 MQ,但路由失败:此时会通过 PublisherReturn 返回路由异常原因,然后返回 ACK 告知 “投递成功”;
  • 临时消息投递到了 MQ,并且入队成功:返回 ACK 告知 “投递成功”。
  • 持久化消息投递到了MQ,并且入队完成持久化,返回ACK告知 “投递成功”
  • 其他情况返回NACK,告知投递失败

image-20260104174745446

SpringAMQP 实现生产者确认

1. 在 publisher 微服务的application.yml中添加配置:
1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制

配置说明:

publisher-confirm-type有三种模式可选:

  • none:关闭 confirm 机制;
  • simple:同步阻塞等待 MQ 的回执消息;
  • correlated:MQ 异步回调方式返回回执消息。

2.在消费者端配置接受消息的回调函数通过实现ApplicationContextAwaresetApplicationContext方法,配置RabbitTemplate的Callback方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置return回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.debug("收到消息的return callback, exchange:{}, key:{}, msg:{}, code:{}, text:{}",
returned.getExchange(), returned.getRoutingKey(), returned.getMessage(),
returned.getReplyCode(), returned.getReplyText());
}
});
}
}

该配置类实现了ApplicationContextAware接口,用于获取 Spring 容器中的RabbitTemplate,并配置Publisher Return 的回调逻辑

  • 当消息投递到 MQ 但路由失败时,RabbitMQ 会触发returnedMessage方法;
  • 回调中可以获取到失败相关的信息(交换机、路由键、消息内容、错误码、错误描述),便于日志记录或后续的失败重试处理。

3.在发消息时配置收到确认消息的回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void testConfirmCallback() {
// 1.创建CorrelationData(用于关联消息与回调)
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
// 2.添加ConfirmCallback回调
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息回调失败", ex);
}

@Override
public void onSuccess(CorrelationData.Confirm result) {
log.debug("收到confirm callback回执");
if(result.isAck()){
// 消息发送成功(MQ已确认收到)
log.debug("消息发送成功,收到ack");
}else{
// 消息发送失败(MQ未收到)
log.error("消息发送失败,收到nack,原因:{}", result.getReason());
}
}
});
rabbitTemplate.convertAndSend("xx.direct","red","hello",cd);
}
SpringAMQP 中生产者消息确认的返回值情况
  • 消息投递到 MQ 但路由失败:触发 return 返回路由异常原因,最终返回ACK
  • 临时消息投递到 MQ 且入队成功:返回ACK
  • 持久消息投递到 MQ 且入队完成持久化:返回ACK
  • 其他情况(如 MQ 未收到消息):返回NACK,告知投递失败。
生产者确认消息的处理建议
  1. 生产者确认会增加网络和系统资源开销,尽量避免使用
  2. 若必须使用:大多数情况无需开启 Publisher-Return 机制(路由失败通常是业务配置问题,需提前规避);
    • 一般是网络波动导致请求失败,如果是别的导致失败,则可以开启生产者确认机制。此时,发送到 MQ 的消息会返回回执,此时可以根据 ACK 还是 NACK 判断消息是否成功,以及是否重试
  3. 对 NACK 消息:可有限次数重试,若仍失败则记录异常消息(便于后续排查)

MQ的可靠性

在队列创建时指定durable=false情况下,或者消息未开启持久化时,RabbitMQ中接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ岩机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

数据持久化

RabbitMQ实现数据持久化包括3个方面:

  • 交换机持久化(默认持久化)
  • 队列持久化(默认持久化)
    • 消息持久化 (默认不持久化,Spring 的rabbitTemplate默认是持久化)

手动用构建者模式创建 Message 可以构建并发送不持久化的消息,只有到内存不足时,RabbitMQ 会将非持久化消息 “换出” 到磁盘临时存储,但这不是 “持久化”,只是内存溢出的兜底策略。

Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列。

惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
  • 在3.12版本后,所有队列都是LazyQueue模式,无法更改。

消费者确认

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(ConsumerAcknowledgement)。当消费者

处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack.

    • 当业务出现异常时,根据异常判断返回不同结果:

      • 如果是业务异常,会自动返回nack,会一直重发消息

      • 如果是消息处理或校验异常(MessageConversionException),自动返回reject,拒绝消息,消息直接被删除

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto #确认机制

失败重试机制

当消费者处理消息异常时,消息会无限重新入队并再次投递,导致 MQ 压力飙升。可通过 Spring 的本地重试机制替代 MQ 的无限 requeue:

配置(在 consumer 的application.yml中添加):

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初始失败等待时长(1秒)
multiplier: 1 # 等待时长倍数,下次等待时长 = multiplier * 上次时长
max-attempts: 3 # 最大重试次数
stateless: true # true=无状态;若业务包含事务,改为false

失败消息处理策略

开启重试模式后,若重试次数耗尽消息仍失败,需通过MessageRecoverer接口处理,它有 3 种实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后直接 reject,丢弃消息(默认方式);
  • ImmediateRequeueMessageRecoverer:重试耗尽后返回 nack,消息重新入队;
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定交换机)。

image-20260104193233879

配置RepublishMessageRecoverer的 Bean 代码

1
2
3
4
5
6
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
// 创建RepublishMessageRecoverer,用于将重试耗尽的失败消息投递到指定交换机
// 自定义错误交换机、路由键
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error.routingKey");
}

消费者如何保证消息一定被消费?

  • 开启消费者确认机制为auto,由spring确认消息处理成功后返回ack,异常时返回nack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

业务幂等性

消息可以会由于某种原因(网络波动等)多次消费同一个消息,为了区分每个消息是否是幂等的,需要采取一些方案。

唯一消息 ID 方案(基于消息 ID 去重)

方案逻辑:
  1. 每条消息生成唯一 ID,随消息一起投递;
  2. 消费者处理业务成功后,将消息 ID 存入数据库;
  3. 再次收到消息时,查询数据库判断 ID 是否存在,存在则判定为重复消息,放弃处理。
对应的消息转换器配置:
1
2
3
4
5
6
7
8
@Bean
public MessageConverter messageConverter(){
// 1. 定义Jackson消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2. 开启自动创建消息ID(用于识别不同消息,支持重复消息判断)
jjmc.setCreateMessageIds(true);
return jjmc;
}

业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们的业务为例:我们要在支付后修改订单状态为已支付,应该在
修改订单状态前先查询订单状态,判断状态是否是未支付。只有未支付订单才需要修改,其它状态不做处理。

比如利用乐观锁机制,当状态满足于某个条件才修改数据库。

保证支付与交易服务订单状态一致性的方案

1. 核心同步逻辑

支付成功后,支付服务通过 MQ 消息通知交易服务同步订单状态。

2. 保障消息可靠性

  • 消息层:开启生产者确认、消费者确认、失败重试,同时开启 MQ 持久化(交换机、队列、消息均持久化),避免消息丢失;
  • 业务层:交易服务更新订单状态时做幂等判断(如基于订单 ID 或消息 ID),防止重复消费导致状态异常。

3. 兜底方案(MQ 通知失败时)

在交易服务设置定时任务,定期查询订单的支付状态(如调用支付服务接口),兜底同步状态,确保最终一致性。

延迟消息

生产者发送的消息在指定时间后消费者才收到。对应的任务是延时任务

通过定时任务也可以做,但是定时任务时间间隔短,会对数据库造成很大压力;间隔长,时效性差,无法精准 总体来说浪费资源。

死信交换机

当队列中的消息满足以下任一条件时,会成为死信(dead letter)

  • 消费者用basic.reject/basic.nack声明消费失败,且requeue参数为false
  • 消息是过期消息(达到队列 / 消息自身的 TTL),超时未被消费;
  • 队列消息堆积满,最早的消息被溢出。

若队列通过dead-letter-exchange属性指定了某个交换机,该队列的死信会被投递到这个交换机中,这个交换机即为死信交换机(DLX)

image-20260104214109047

额 ,通过设置消息指定时间未被消费而放进死信交换机作为一种延时任务的实现是人类能想出来的正常方法吗。死信交换机的目的是让消息没被正确消费后让开发者来兜底处理,并不是让你搞个弯子做延时任务用的。

这里提一下死信交换机和 RepublishMessageRecoverer(失败消息重投指定交换机)的区别

维度 死信交换机(DLX) RepublishMessageRecoverer
触发场景 消息满足「死信条件」时触发:1. 消息被消费者reject/nack且未重新入队;2. 消息过期(TTL);3. 队列达到最大长度被溢出。 仅在消费者本地重试耗尽后消息仍失败时触发(是 Spring AMQP 的本地重试机制的后续处理)。
设计目标 处理队列层面的 “异常消息”(如无法消费、过期、队列满),是 RabbitMQ 原生的消息兜底机制。 处理消费者业务逻辑执行失败的消息(重试后仍失败),是 Spring AMQP 封装的业务异常消息转储方案。
依赖关系 依赖队列的死信配置(需给队列绑定死信交换机),是 RabbitMQ 的队列级特性。 依赖 Spring AMQP 的MessageRecoverer接口,是 Spring 框架对 RabbitMQ 的扩展能力。
消息来源 来自绑定了死信配置的 “原队列”,消息是原队列中无法正常处理的消息。 来自消费者本地重试耗尽的失败消息,消息是消费者业务处理失败的消息。
使用场景 适用于「队列层面的异常」:如消息被拒绝、过期、队列满等。 适用于「消费者业务层面的异常」:如业务逻辑抛出异常、重试后仍无法处理。

延迟消息插件

安装

查看RabbitMQ 插件目录对应的数据卷

1
docker volume inspect mq-plugins

启用延时消息交换机插件

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • docker exec -it mq:进入名为mq的 RabbitMQ 容器;

  • rabbitmq-plugins enable:启用 RabbitMQ 插件;

  • rabbitmq_delayed_message_exchange:是 RabbitMQ 的延时消息交换机插件,启用后可创建x-delayed-message类型的交换机,实现延时消息功能。

定义延迟交换机的方式

1. 通过 Bean 注入

1
2
3
4
5
6
7
8
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 交换机名称
.delayed() // 开启延迟功能(依赖插件)
.durable(true) // 持久化
.build();
}

2.通过注解

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"), // 延迟队列
exchange = @Exchange(name = "delay.direct", delayed = "true"), // 绑定延迟交换机
key = "delay" // 路由键
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送延迟消息的代码示例(通过x-delay消息头设置过期时间)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
void testPublisherDelayMessage() {
// 1. 创建消息内容
String message = "hello, delayed message";
// 2. 发送消息,通过消息后置处理器添加x-delay头(设置延迟5秒)
rabbitTemplate.convertAndSend(
"delay.direct", // 延迟交换机名称
"delay", // 路由键
message, // 消息内容
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟属性:setDelay的参数单位是毫秒(这里设置5000=5秒)
message.getMessageProperties().setDelay(5000);
return message;
}
}
);
}

基于MQ 的延时任务只适用于延时时间比较短的场景

下面举一个具体的业务场景

超时取消订单

image-20260104222910088

直接发 30 分钟延时消息的痛点:

  1. 并发高时 MQ 压力大:大量订单的 30 分钟延时消息同时堆积在 MQ 中,占用 MQ 存储和资源;
  2. 资源浪费:多数订单 1 分钟内就支付了,但消息仍需在 MQ 中等待 30 分钟,无效占用资源。

优化方案:分段延时(阶梯式检测)

将 “30 分钟一次检测” 拆分为多段短延时检测,比如按 10s → 10s → 15s → 30s → 1min → 2min → ... → 累计30分钟 的阶梯间隔发送延时消息,每次检测订单状态:

  • 若订单已支付:直接忽略,无需后续检测;
  • 若订单未支付:继续发送下一段延时消息,直到累计时间到 30 分钟,再执行 “取消订单” 操作。

优先级队列

优先级越大的消息越优先被消费

image-20260105151652055

发消息

1
2
3
4
5
6
7
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message -> {
message.getMessageProperties().setPriority(1);
return message;
});
}

收消息

1
2
3
4
5
@RabbitListener(queues = {QUEUE_PRIORITY})
public void processMessagePriority(String dataString, Message message, Channel channel) throws IOException {
log.info("[priority]" + dataString);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

集群略 应该不考



新ICP备2025018290号-1
本站总访问量