1. 基本架构

  • publisher:生产者
  • consumer:消费者
  • exchange 个:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的 exchange、queue、消息的隔离

2. 消息模型

  • WorkQueue: 让多个消费者绑定到一个队列,共同消费队列中的消息。

  • Topic: 在绑定队列和交换机的时候指定路由键

    1
    2
    3
    4
    @Bean
    public Binding bindingTopicQueue1(Queue topicQueue1, TopicExchange topicExchange){
        return BindingBuilder.bind(topicQueue1).to(topicExchange).with("d.*");
    }

    在发送消息时设置路由键,根据路由键路由到相应的队列

    1
    rabbitTemplate.convertAndSend("danaizio.topic","d",JSON.toJSONString(userDO));
    • d.#,匹配 d、d.1,d.1.1
    • d.*,匹配 key.1

3. 如何保证消息不丢失

3.1 丢失原因

  • 发送时丢失:
    • 生产者发送的消息未送达 exchange
    • 消息到达 exchange 后未到达 queue
  • MQ 宕机,queue 将消息丢失
  • consumer 接收到消息后未消费就宕机

针对这些问题,RabbitMQ 分别给出了解决方案:

  • 生产者确认机制
  • mq 持久化
  • 消费者确认机制
  • 失败重试机制

3.2 生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID,以区分不同消息,避免ack冲突。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

首先修改配置文件

1
2
3
4
5
6
7
8
9
10
11
spring:
  rabbitmq:
    host: 192.168.198.128 # 主机名
    port: 5672 # 端口
    virtual-host: learn # 虚拟主机
    username: guest # 用户名
    password: guest # 密码
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

定义ReturnCallback

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
@Slf4j
public class RabbitMQConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                        returnedMessage.getReplyCode(),
                        returnedMessage.getReplyText(), 
                        returnedMessage.getExchange(), 
                        returnedMessage.getRoutingKey(), 
                        returnedMessage.getMessage());
            }
        });
    }
}

当消息成功到达交换机确没被任何一个队列接收,那么就会触发这个回调函数

定义ConfirmCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public String send2() {
    UserDO userDO = UserDO.builder()
            .id(1)
            .age(17)
            .username("张三")
            .build();
    String uuid = UUID.randomUUID().toString();
    log.info("消息唯一id为:{}",uuid);
    CorrelationData correlationData = new CorrelationData(uuid);
    correlationData.getFuture().addCallback(
            result -> {
                assert result != null;
                if (result.isAck()) {
                    log.info("消息发送成功, ID:{}", correlationData.getId());
                } else {
                    log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                }
            },
            ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    rabbitTemplate.convertAndSend("danaizio.direct","d1",JSON.toJSONString(userDO),correlationData);
    return "发送成功";
}

当消息成功到达交换机时,ack为true

3.2 消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。

  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

1
2
3
4
5
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: none # 关闭ack

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

3.3 消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力,我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

1
2
3
4
5
6
7
8
9
10
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启consumer服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

3.4 失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//设置死信交换机和死信队列
@Configuration
public class DlXConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

    //为队列绑定死信交换机和死信队列
    @Bean
    public Queue directQueue1(){
        return QueueBuilder
                .durable("direct.queue1")
                .deadLetterExchange("error.direct")
                .deadLetterRoutingKey("error")
                .ttl(2000)
                .build();
    }

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

4. 怎么避免消息重复投递和重复消费

当交换机和队列收到消息后,都会返回相应的ack,假如这个ack由于某些原因未能到达发送方,那么就会触发重试机制(仅限队列到消费者,交换机到队列的重试需要程序员自己实现)而导致相同的消息重复消费,因此需要保证消费的幂等性。
可以为消息设置唯一id,根据这个id在消费方进行幂等操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public String send2() {
        UserDO userDO = UserDO.builder()
                .id(1)
                .age(17)
                .username("张三")
                .build();
        String uuid = UUID.randomUUID().toString();
        log.info("消息唯一id为:{}",uuid);
        CorrelationData correlationData = new CorrelationData(uuid);
        correlationData.getFuture().addCallback(
                result -> {
                    assert result != null;
                    if (result.isAck()) {
                        log.info("消息发送成功, ID:{}", correlationData.getId());
                    } else {
                        log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
                    }
                },
                ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
        );
        rabbitTemplate.convertAndSend("danaizio.direct","d1",JSON.toJSONString(userDO),correlationData);
        return "发送成功";
    }

所以,重复投递不能解决,但是能防止重复消费。

5. 无法路由的消息去哪里

  • 如果 spring.rabbitmq.template.mandatory 为true,且消息为路由到任何队列,那么消息会会被返回给生产者,并触发ReturnCallback,如果为false,则直接丢弃消息
  • 如果有默认交换机,会投递到默认交换机
  • 如果已经到达队列但无法到达消费者且该队列绑定了死信队列,会将消息投递到死信队列。

6. channel

RabbitMQ在网络层有connection和channel两个概念,一个connection代表一个tcp连接,这个connection里面又有很多channel(虚拟连接),客户端于MQ之间的通信都是基于channel,避免多次建立tcp连接,避免资源浪费。

7. 消息持久化

首先,交换机和队列都可以设置持久化,这样mq重启之后,他们的元数据可以保留,
对于消息来说,持久化代表消息到达队列之后,会异步存入磁盘,所以如果mq宕机,还是会导致小部分数据丢失,因此mq后来提出了惰性队列

惰性队列

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储