万字长文书写RabbitMQ最全见解!以后再也不用到处去搜索了呀!

典型应用场景

1、跨系统的异步通信 人民银行二代支付系统,使用重量级消息队列 IBM MQ,异步,解耦,削峰都有体现。

2、应用内的同步变成异步 秒杀:自己发送给自己

3、基于Pub/Sub模型实现的事件驱动 放款失败通知、提货通知、购买碎屏保 系统间同步数据 摒弃ELT(比如全量 同步商户数据); 摒弃API(比如定时增量获取用户、获取产品,变成增量广播)。

4、利用RabbitMQ实现事务的最终一致性

基本介绍

AMQP协议

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应 用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户 端/中间件同产品、不同的开发语言等条件的限制。AMQP的实现有:RabbitMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、 ØMQ、Zyre等。

RabbitMQ的特性

RabbitMQ使用Erlang语言编写,使用Mnesia数据库存储消息。

(1)可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

(2)灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功 能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在 一起,也通过插件机制实现自己的 Exchange 。

(3)消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

(4)高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下 队列仍然可用。

(5)多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 AMQP、STOMP、MQTT 等等。

(6)多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby、PHP、C#、 JavaScript 等等。

(7)管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群 中的节点。

(8)插件机制(Plugin System) RabbitMQ提供了许多插件,以实现从多方面扩展,当然也可以编写自己的插件

工作模型

概念 解释
Broker 即RabbitMQ的实体服务器。提供一种传输服务,维护一条从生产者到消费者的传输线路, 保证消息数据能按照指定的方式传输。
Exchange 消息交换机。指定消息按照什么规则路由到哪个队列Queue
Queue 消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
Binding 绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
Routing Key 绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
Vhost 虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有 一组Exchange、Queue和Binding。
Producer 消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
Consumer 消息消费者。消息的接收者,一般是独立的程序。
Connection Producer 和 Consumer 与Broker之间的TCP长连接。
Channel 消息通道,也称信道。在客户端的每个连接里可以建立多个Channel,每个Channel代表一 个会话任务。在RabbitMQ Java Client API中,channel上定义了大量的编程接口。

三种主要的交换机

Direct Exchange 直连交换机

定义:直连类型的交换机与一个队列绑定时,需要指定一个明确的binding key。

路由规则:发送消息到直连类型的交换机时,只有routing key跟binding key完全匹配时,绑定的队列才能收到消息。

例如:

  1. // 只有队列1能收到消息
  2. channel.basicPublish('MY_DIRECT_EXCHANGE', 'key1', null, msg.getBytes())

Topic Exchange 主题交换机

定义:主题类型的交换机与一个队列绑定时,可以指定按模式匹配的routing key。 通配符有两个,*代表匹配一个单词。#代表匹配零个或者多个单词。单词与单词之间用 . 隔开。

路由规则:发送消息到主题类型的交换机时,routing key符合binding key的模式时,绑定的队列才能收到消息。

例如:

  1. // 只有队列1能收到消息
  2. channel.basicPublish('MY_TOPIC_EXCHANGE', 'sh.abc', null, msg.getBytes());
  3. // 队列2和队列3能收到消息
  4. channel.basicPublish('MY_TOPIC_EXCHANGE', 'bj.book', null, msg.getBytes());
  5. // 只有队列4能收到消息
  6. channel.basicPublish('MY_TOPIC_EXCHANGE', 'abc.def.food', null, msg.getBytes());

Fanout Exchange 广播交换机

定义:广播类型的交换机与一个队列绑定时,不需要指定binding key。

路由规则:当消息发送到广播类型的交换机时,不需要指定routing key,所有与之绑定的队列都能收到消息。

例如:

  1. // 3个队列都会收到消息
  2. channel.basicPublish('MY_FANOUT_EXCHANGE', '', null, msg.getBytes());

Java API 编程

创建Maven工程,pom.xml引入依赖

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>4.1.0</version>
  5. </dependency>

生产者

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class MyProducer {
  5. private final static String QUEUE_NAME = 'ORIGIN_QUEUE';
  6. public static void main(String[] args) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. // 连接IP
  9. factory.setHost('127.0.0.1');
  10. // 连接端口
  11. factory.setPort(5672);
  12. // 虚拟机
  13. factory.setVirtualHost('/');
  14. // 用户
  15. factory.setUsername('guest');
  16. factory.setPassword('guest');
  17. // 建立连接
  18. Connection conn = factory.newConnection();
  19. // 创建消息通道
  20. Channel channel = conn.createChannel();
  21. String msg = 'Hello world, Rabbit MQ';
  22. // 声明队列
  23. // String queue, boolean durable, boolean exclusive, boolean autoDelete,
  24. Map<String, Object> arguments
  25. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  26. // 发送消息(发送到默认交换机AMQP Default,Direct)
  27. // 如果有一个队列名称跟Routing Key相等,那么消息会路由到这个队列
  28. // String exchange, String routingKey, BasicProperties props, byte[] body
  29. channel.basicPublish('', QUEUE_NAME, null, msg.getBytes());
  30. channel.close();
  31. conn.close();
  32. }
  33. }

消费者

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. public class MyConsumer {
  4. private final static String QUEUE_NAME = 'ORIGIN_QUEUE';
  5. public static void main(String[] args) throws Exception {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. // 连接IP
  8. factory.setHost('127.0.0.1');
  9. // 默认监听端口
  10. factory.setPort(5672);
  11. // 虚拟机
  12. factory.setVirtualHost('/');
  13. // 设置访问的用户
  14. factory.setUsername('guest');
  15. factory.setPassword('guest');
  16. // 建立连接
  17. Connection conn = factory.newConnection();
  18. // 创建消息通道
  19. Channel channel = conn.createChannel();
  20. // 声明队列
  21. // String queue, boolean durable, boolean exclusive, boolean autoDelete,
  22. Map<String, Object> arguments
  23. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  24. System.out.println(' Waiting for message....');
  25. // 创建消费者
  26. Consumer consumer = new DefaultConsumer(channel) {
  27. @Override
  28. public void handleDelivery(String consumerTag, Envelope envelope,
  29. AMQP.BasicProperties properties, byte[] body) throws IOException {
  30. String msg = new String(body, 'UTF-8');
  31. System.out.println('Received message : '' + msg + ''');
  32. }
  33. };
  34. // 开始获取消息
  35. // String queue, boolean autoAck, Consumer callback
  36. channel.basicConsume(QUEUE_NAME, true, consumer);
  37. }
  38. }

参数说明

声明交换机的参数

String type:交换机的类型,direct, topic, fanout中的一种。

boolean durable:是否持久化,代表交换机在服务器重启后是否还存在。

声明队列的参数

boolean durable:是否持久化,代表队列在服务器重启后是否还存在。

boolean exclusive:是否排他性队列。排他性队列只能在声明它的Connection中使用,连接断开时自动删除。

boolean autoDelete:是否自动删除。如果为true,至少有一个消费者连接到这个队列,之后所有与这个队列连接 的消费者都断开时,队列会自动删除。

Map arguments:队列的其他属性,例如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority。

消息属性BasicProperties

消息的全部属性有14个,以下列举了一些主要的参数:

参数 释义
Map<String,Object> headers 消息的其他自定义参数
Integer deliveryMode 2持久化,其他:瞬态
Integer priority 消息的优先级
String correlationId 关联ID,方便RPC相应与请求关联
String replyTo 回调队列
String expiration TTL,消息过期时间,单位毫秒

1、TTL(Time To Live)

a、消息的过期时间

有两种设置方式:

通过队列属性设置消息过期时间:

  1. Map<String, Object> argss = new HashMap<String, Object>();
  2. argss.put('x-message-ttl',6000);
  3. channel.queueDeclare('TEST_TTL_QUEUE', false, false, false, argss);

设置单条消息的过期时间:

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2. .deliveryMode(2) // 持久化消息
  3. .contentEncoding('UTF-8')
  4. .expiration('10000') // TTL
  5. .build();
  6. channel.basicPublish('', 'TEST_TTL_QUEUE', properties, msg.getBytes());

b、队列的过期时间

  1. Map<String, Object> argss = new HashMap<String, Object>();
  2. argss.put('x-message-ttl',6000);
  3. channel.queueDeclare('TEST_TTL_QUEUE', false, false, false, argss);

2、死信队列

有三种情况消息会进入DLX(Dead Letter Exchange)死信交换机。

1、(NACK || Reject ) && requeue == false

2、消息过期

3、队列达到最大长度(先入队的消息会被发送到DLX)

可以设置一个死信队列(Dead Letter Queue)与DLX绑定,即可以存储Dead Letter,消费者可以监听这个队列取走消息。

  1. Map<String,Object> arguments = new HashMap<String,Object>();
  2. arguments.put('x-dead-letter-exchange','DLX_EXCHANGE');
  3. // 指定了这个队列的死信交换机
  4. channel.queueDeclare('TEST_DLX_QUEUE', false, false, false, arguments);
  5. // 声明死信交换机
  6. channel.exchangeDeclare('DLX_EXCHANGE','topic', false, false, false, null);
  7. // 声明死信队列
  8. channel.queueDeclare('DLX_QUEUE', false, false, false, null);
  9. // 绑定
  10. channel.queueBind('DLX_QUEUE','DLX_EXCHANGE','#');

3、优先级队列

设置一个队列的最大优先级:

  1. Map<String, Object> argss = new HashMap<String, Object>();
  2. argss.put('x-max-priority',10); // 队列最大优先级
  3. channel.queueDeclare('ORIGIN_QUEUE', false, false, false, argss);

发送消息时指定消息当前的优先级:

  1. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  2. .priority(5) // 消息优先级
  3. .build();
  4. channel.basicPublish('', 'ORIGIN_QUEUE', properties, msg.getBytes());

优先级高的消息可以优先被消费,但是:只有消息堆积(消息的发送速度大于消费者的消费速度)的情况下优先级才有意义。

4、延迟队列

RabbitMQ本身不支持延迟队列。可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定, 到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。 另一种方式是使用rabbitmq-delayed-message-exchange插件。 当然,将需要发送的信息保存在数据库,使用任务调度系统扫描然后发送也是可以实现的。

5、RPC

RabbitMQ实现RPC的原理:服务端处理消息后,把响应消息发送到一个响应队列,客户端再从响应队列取到结果。

其中的问题:Client收到消息后,怎么知道应答消息是回复哪一条消息的?所以必须有一个唯一ID来关联,就是correlationId。

6、服务端流控(Flow Control)

RabbitMQ 会在启动时检测机器的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警 告并阻塞所有连接(Connections)。可以通过修改 rabbitmq.config 文件来调整内存阈值,默认值是 0.4,如下 所示: [{rabbit, [{vm_memory_high_watermark, 0.4}]}].

默认情况,如果剩余磁盘空间在 1GB 以下,RabbitMQ 主动阻塞所有的生产者。这个阈值也是可调的。

注意队列长度只在消息堆积的情况下有意义,而且会删除先入队的消息,不能实现服务端限流。

7、消费端限流

在AutoACK为false的情况下,如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前,不进行消费新的消息。

  1. channel.basicQos(2); // 如果超过2条消息没有发送ACK,当前消费者不再接受队列消息
  2. channel.basicConsume(QUEUE_NAME, false, consumer);

UI管理界面的使用

管理插件提供了更简单的管理方式。

启用管理插件

Windows启用管理插件

  1. cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin
  2. rabbitmq-plugins.bat enable rabbitmq_management

Linux启用管理插件

  1. cd /usr/lib/rabbitmq/bin
  2. ./rabbitmq-plugins enable rabbitmq_management

管理界面访问端口

默认端口是15672,默认用户guest,密码guest。guest用户默认只能在本机访问。

Linux 创建RabbitMQ用户

例如创建用户admin,密码admin,授权访问所有的Vhost

  1. firewall-cmd --permanent --add-port=15672/tcp
  2. firewall-cmd --reload
  3. rabbitmqctl add_user admin admin
  4. rabbitmqctl set_user_tags admin administrator
  5. rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'

Spring配置方式集成RabbitMQ

步骤:

1、创建Maven工程,pom.xml引入依赖

2、src/main/resouces目录,创建rabbitMQ.xml

3、配置applicationContext.xml

4、src/main/resouces目录,log4j.properties

5、编写生产者

6、编写4个消费者

7、编写单元测试类

RabbitMQ可靠性投递

1. RabbitMQ 可靠性投递与高可用架构

在 RabbitMQ 里面提供了很多保证消息可靠投递的机制,这个也是 RabbitMQ 的一 个特性。 我们在讲可靠性投递的时候,必须要明确一个问题,因为效率与可靠性是无法兼得 的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。所以如果是一 些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。 比如发送通知或者记录日志的这种场景,如果用户没有收到通知,不会造成业务影 响,只要再次发送就可以了。

在我们使用 RabbitMQ 收发消息的时候,有几个主要环节:

1 代表消息从生产者发送到 Broker

生产者把消息发到 Broker 之后,怎么知道自己的消息有没有被 Broker 成功接收?

2 代表消息从 Exchange 路由到 Queue Exchange 是一个绑定列表,如果消息没有办法路由到正确的队列,会发生什么 事情?应该怎么处理?

3 代表消息在 Queue 中存储队列是一个独立运行的服务,有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,那么消息要一直存储在队列里面。如果队列出了问题,消息肯定会丢失。怎么保证消息在队列稳定地存储呢?

4 代表消费者订阅 Queue 并消费消息 队列的特性是什么?FIFO。队列里面的消息是一条一条的投递的,也就是说,只有上一条消息被消费者接收以后,才能把这一条消息从数据库删掉,继续投递下一条消息。那么问题来了,Broker 怎么知道消费者已经接收了消息呢?

消息发送到 RabbitMQ 服务器

第一个环节是生产者发送消息到 Broker。可能因为网络或者 Broker 的问题导致消息发送失败,生产者不能确定 Broker 有没有正确的接收。

RabbitMQ 里面提供了两种机制服务端确认机制,也就是在生产者发送消息给 RabbitMQ 的服务端的时候,服务端会通过某种方式返回一个应答,只要生产者收到了 这个应答,就知道消息发送成功了。

第一种是 Transaction(事务)模式,第二种 Confirm(确认)模式

Transaction(事务)模式 

我们通过一个 channel.txSelect()的方法把信道设置成事务模式,然后就可以发布消 息给 RabbitMQ 了,如果 channel.txCommit();的方法调用成功,就说明事务提交成功, 则消息一定到达了 RabbitMQ 中。 如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时 候我们便可以将其捕获,进而通过执行 channel.txRollback()方法来实现事务回滚。 

在事务模式里面,只有收到了服务端的 Commit-OK 的指令,才能提交成功。所以 可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消 息没有发送完毕,不能发送下一条消息,它会榨干 RabbitMQ 服务器的性能。所以不建 议大家在生产环境使用。

Spring Boot 中的设置

rabbitTemplate.setChannelTransacted(true);

那么有没有其他可以保证消息被 Broker 接收,但是又不大量消耗性能的方式呢?这 个就是第二种模式,叫做确认(Confirm)模式。

Confirm(确认)模式

Confirm(确认)模式 确认模式有三种,一种是普通确认模式。

在生产者这边通过调用 channel.confirmSelect()方法将信道设置为 Confirm 模式, 然后发送消息。一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认 (Basic.Ack)给生产者,也就是调用 channel.waitForConfirms()返回 true,这样生产者就知道消息被服务端接收了。

这种发送 1 条确认 1 条的方式消息还不是太高,所以我们还有一种批量确认的方式。 批量确认 ,就是在开启 Confirm 模式后 , 先发送一批消息 。 只要channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息都被服务端接收了。

批量确认的方式比单条确认的方式效率要高,但是也有两个问题,第一个就是批量 的数量的确定。对于不同的业务,到底发送多少条消息确认一次?数量太少,效率提升 不上去。数量多的话,又会带来另一个问题,比如我们发 1000 条消息才确认一次,如果 前面 999 条消息都被服务端接收了,如果第 1000 条消息被拒绝了,那么前面所有的消息都要重发。

有没有一种方式,可以一边发送一边确认的呢?这个就是异步确认模式。

异步确认模式需要添加一个 ConfirmListener,并且用一个 SortedSet 来维护没有被确认的消息。 Confirm 模式是在 Channel 上开启的,因为 RabbitTemplate 对 Channel 进行了封装,叫做 ConfimrCallback。

  1. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  2. @Override
  3. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  4. if (!ack) {
  5. System.out.println('发送消息失败:' + cause);
  6. throw new RuntimeException('发送异常:' + cause);
  7. }
  8. }
  9. });

消息从交换机路由到队列

第二个环节就是消息从交换机路由到队列。在什么情况下,消息会无法路由到正确 的队列?可能因为路由键错误,或者队列不存在。 我们有两种方式处理无法路由的消息,一种就是让服务端重发给生产者,一种是让 交换机路由到另一个备份的交换机。 消息回发的方式:使用 mandatory 参数和 ReturnListener(在 Spring AMQP 中是 ReturnCallback)。

  1. rabbitTemplate.setMandatory(true);
  2. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback(){
  3. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey){
  4. System.out.println('回发的消息:');
  5. System.out.println('replyCode: '+replyCode);
  6. System.out.println('replyText: '+replyText);
  7. System.out.println('exchange: '+exchange);
  8. System.out.println('routingKey: '+routingKey);
  9. }
  10. });

消息路由到备份交换机的方式:在创建交换机的时候,从属性中指定备份交换机。

  1. Map<String,Object> arguments = new HashMap<String,Object>();
  2. arguments.put('alternate-exchange','ALTERNATE_EXCHANGE'); // 指定交换机的备份交换机
  3. channel.exchangeDeclare('TEST_EXCHANGE','topic', false, false, false, arguments);

(注意区别,队列可以指定死信交换机;交换机可以指定备份交换机)

消息在队列存储

第三个环节是消息在队列存储,如果没有消费者的话,队列一直存在在数据库中。 如果 RabbitMQ 的服务或者硬件发生故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都 保存到磁盘。

解决方案:

1、队列持久化 RabbitConfig.java

  1. @Bean('GpQueue')
  2. public Queue GpQueue() {
  3. // queueName, durable, exclusive, autoDelete, Properties
  4. return new Queue('GP_TEST_QUEUE', true, false, false, new HashMap<>());
  5. }

2、交换机持久化

  1. @Bean('GpExchange')
  2. public DirectExchange exchange() {
  3. // exchangeName, durable, exclusive, autoDelete, Properties
  4. return new DirectExchange('GP_TEST_EXCHANGE', true, false, new HashMap<>());
  5. }

3、消息持久化

  1. MessageProperties messageProperties = new MessageProperties();
  2. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  3. Message message = new Message('持久化消息'.getBytes(), messageProperties);
  4. rabbitTemplate.send('GP_TEST_EXCHANGE', 'gupao.test',

4、集群

如果只有一个 RabbitMQ 的节点,即使交换机、队列、消息做了持久化,如果服务 崩溃或者硬件发生故障,RabbitMQ 的服务一样是不可用的,所以为了提高 MQ 服务的 可用性,保障消息的传输,我们需要有多个 RabbitMQ 的节点.

消息投递到消费者

如果消费者收到消息后没来得及处理即发生异常,或者处理过程中发生异常,会导致4失败。服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递 这条消息给其他消费者。 RabbitMQ 提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送 ACK 给服务端。 没有收到 ACK 的消息,消费者断开连接后,RabbitMQ 会把这条消息发送给其他消 费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。 消费者在订阅队列时,可以指定 autoAck参数,当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从队列中移去消息。 如何设置手动 ACK? SimpleRabbitListenerContainer 或者SimpleRabbitListenerContainerFactory

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

application.properties

  1. spring.rabbitmq.listener.direct.acknowledge-mode=manual
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual

注意这三个值的区别:

NONE:自动 ACK

MANUAL: 手动 ACK

AUTO:如果方法未抛出异常,则发送 ack。

当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝, 且不重新入队。当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会 发送 ACK。其他的异常,则消息会被拒绝,且 requeue = true 会重新入队。 在 Spring Boot 中,消费者又怎么调用 ACK,或者说怎么获得 Channel 参数呢? 引入 com.rabbitmq.client.Channel。

  1. public class SecondConsumer {
  2. @RabbitHandler
  3. public void process(String msgContent,Channel channel, Message message) throws IOException {
  4. System.out.println('Second Queue received msg : ' + msgContent );
  5. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  6. }
  7. }

如果消息无法处理或者消费失败,也有两种拒绝的方式,Basic.Reject()拒绝单条, Basic.Nack()批量拒绝。如果 requeue 参数设置为 true,可以把这条消息重新存入队列, 以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环 重复消费的情况。可以投递到新的队列中,或者只打印异常日志)。

思考:服务端收到了 ACK 或者 NACK,生产者会知道吗?即使消费者没有接收到消 息,或者消费时出现异常,生产者也是完全不知情的。

例如,我们寄出去一个快递,是怎么知道收件人有没有收到的?因为有物流跟踪和 签收反馈,所以寄件人可以知道。 在没有用上电话的年代,我们寄出去一封信,是怎么知道收信人有没有收到信件? 只有收到回信,才知道寄出的信被收到了。

所以,这个是生产者最终确定消费者有没有消费成功的两种方式:

1) 消费者收到消息,处理完毕后,调用生产者的 API(思考:是否破坏解耦?)

2) 消费者收到消息,处理完毕后,发送一条响应消息给生产者

消费者回调

1) 调用生产者 API 例如:提单系统给其他系统发送了碎屏保消息后,其他系统必须在处理完消息 后调用提单系统提供的 API,来修改提单系统中数据的状态。只要 API 没有被调用, 数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。

2) 发送响应消息给生产者 例如:商业银行与人民银行二代支付通信,无论是人行收到了商业银行的消息, 还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)

补偿机制

如果生产者的 API 就是没有被调用,也没有收到消费者的响应消息,怎么办? 不要着急,可能是消费者处理时间太长或者网络超时。 生产者与消费者之间应该约定一个超时时间,比如 5 分钟,对于超出这个时间没有 得到响应的消息,可以设置一个定时重发的机制,但要发送间隔和控制次数,比如每隔 2 分钟发送一次,最多重发 3 次,否则会造成消息堆积。 重发可以通过消息落库+定时任务来实现。 重发,是否发送一模一样的消息?

参考: ATM 机上运行的系统叫 C 端(ATMC),前置系统叫 P 端(ATMC),它接收 ATMC 的消息,再转发给卡系统或者核心系统。

1)如果客户存款,没有收到核心系统的应答消息幂等性,不知道有没有记账成功,最多发送 5次存款确认报文,因为已经吞钞了,所以要保证成功;

2)如果客户取款,ATMC 未得到应答时,最多发送 5 次存款冲正报文。因为没有吐钞,所以要保证失败。

消息幂等性

如果消费者每一次接收生产者的消息都成功了,只是在响应或者调用 API 的时候出 了问题,会不会出现消息的重复处理?例如:存款 100 元,ATM 重发了 5 次,核心系统 一共处理了 6 次,余额增加了 600 元。

所以,为了避免相同消息的重复处理,必须要采取一定的措施。RabbitMQ 服务端 是没有这种控制的(同一批的消息有个递增的 DeliveryTag),它不知道你是不是就要把 一条消息发送两次,只能在消费端控制。

如何避免消息的重复消费? 消息出现重复可能会有两个原因:

1、生产者的问题,环节①重复发送消息,比如在开启了 Confirm 模式但未收到 确认,消费者重复投递。

2、环节④出了问题,由于消费者未发送 ACK 或者其他原因,消息重复投递。

3、生产者代码或者网络问题。 对于重复发送的消息,可以对每一条消息生成一个唯一的业务 ID,通过日志或者消 息落库来做重复控制。

参考:银行的重账控制环节。

最终一致

如果确实是消费者宕机了,或者代码出现了 BUG 导致无法正常消费,在我们尝试多次重发以后,消息最终也没有得到处理,怎么办? 例如存款的场景,客户的钱已经被吞了,但是余额没有增加,这个时候银行出现了长款,应该怎么处理?如果客户没有主动通知银行,这个问题是怎么发现的?银行最终 怎么把这个账务做平? 在我们的金融系统中,都会有双方对账或者多方对账的操作,通常是在一天的业务 结束之后,第二天营业之前。我们会约定一个标准,比如 ATM 跟核心系统对账,肯定是 以核心系统为准。ATMC 获取到核心的对账文件,然后解析,登记成数据,然后跟自己 记录的流水比较,找出核心有 ATM 没有,或者 ATM 有核心没有,或者两边都有但是金 额不一致的数据。 对账之后,我们再手工平账。比如取款记了账但是没吐钞的,做一笔冲正。存款吞了钞没记账的,要么把钱退给客户,要么补一笔账。

消息的顺序性

消息的顺序性指的是消费者消费消息的顺序跟生产者生产消息的顺序是一致的。 例如:商户信息同步到其他系统,有三个业务操作:

1、新增门店

2、绑定产品

3、 激活门店,这种情况下消息消费顺序不能颠倒(门店不存在时无法绑定产品和激活)。

又比如:1、发表微博;2、发表评论;3、删除微博。顺序不能颠倒。 在 RabbitMQ 中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是 不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用的队列)。

除非负载的场景,不要用多个消费者消费消息。

集群与高可用

为什么要做集群?

集群主要用于实现高可用与负载均衡。

高可用:如果集群中的某些 MQ 服务器不可用,客户端还可以连接到其他 MQ 服务器。

负载均衡:在高并发的场景下,单台 MQ 服务器能处理的消息有限,可以分发给多台MQ服务器。

RabbitMQ 有两种集群模式:普通集群模式和镜像队列模式。

RabbitMQ 如何支持集群

应用做集群,需要面对数据同步和通信的问题。因为 Erlang 天生具备分布式的特性,所以 RabbitMQ 天然支持集群,不需要通过引入ZK或者数据库来实现数据同步。

RabbitMQ 通过/var/lib/rabbitmq/.erlang.cookie 来验证身份,需要在所有节点上 保持一致。

RabbitMQ 的节点类型

集群有两种节点类型,一种是磁盘节点(Disc Node),一种是内存节点(RAM Node)。

磁盘节点:将元数据(包括队列名字属性、交换机的类型名字属性、绑定、vhost) 放在磁盘中。

内存节点:将元数据放在内存中。

PS:内存节点会将磁盘节点的地址存放在磁盘(不然重启后就没有办法同步数据了)。 如果是持久化的消息,会同时存放在内存和磁盘。

集群中至少需要一个磁盘节点用来持久化元数据,否则全部内存节点崩溃时,就无 从同步元数据。未指定类型的情况下,默认为磁盘节点。

我们一般把应用连接到内存节点(读写快),磁盘节点用来备份。

集群通过 25672 端口两两通信,需要开放防火墙的端口。 需要注意的是,RabbitMQ 集群无法搭建在广域网上,除非使用 federation 或者 shovel 等插件(没这个必要,在同一个机房做集群)。

集群的配置步骤:

1、配置 hosts

2、同步 erlang.cookie

3、加入集群(join cluster)

普通集群

普通集群模式下,不同的节点之间只会相互同步元数据。

疑问:为什么不直接把队列的内容(消息)在所有节点上复制一份?

主要是出于存储和同步数据的网络开销的考虑,如果所有节点都存储相同的数据, 就无法达到线性地增加性能和存储容量的目的(堆机器)。

假如生产者连接的是节点 3,要将消息通过交换机 A 路由到队列 1,最终消息还是会 转发到节点 1 上存储,因为队列 1 的内容只在节点 1 上。

同理,如果消费者连接是节点 2,要从队列 1 上拉取消息,消息会从节点 1 转发到 节点 2。其它节点起到一个路由的作用,类似于指针。

普通集群模式不能保证队列的高可用性,因为队列内容不会复制。如果节点失效将 导致相关队列不可用,因此我们需要第二种集群模式

镜像集群

第二种集群模式叫做镜像队列。 镜像队列模式下,消息内容会在镜像节点间同步,可用性更高。不过也有一定的副 作用,系统性能会降低,节点过多的情况下同步的代价比较大。

操作方式 命令或步骤
rabbitmqctl (Windows) rabbitmqctl set_policy ha-all '^ha.' '{''ha-mode'':''all''}'
HTTP API PUT /api/policies/%2f/ha-all {'pattern':'^ha.', 'definition':{'ha-mode':'all'}}
Web UI

1、avigate to Admin > Policies > Add / update a policy

2、Name 输入:mirror_image

3、Pattern 输入:^(代表匹配所有)

4、Definition 点击 HA mode,右边输入:all

5、Add policy

高可用

集群搭建成功后,如果有多个内存节点,那么生产者和消费者应该连接到哪个内存 节点?如果在我们的代码中根据一定的策略来选择要使用的服务器,那每个地方都要修 、改,客户端的代码就会出现很多的重复,修改起来也比较麻烦。

所以需要一个负载均衡的组件(例如 HAProxy,LVS,Nignx),由负载的组件来做 路由。这个时候,只需要连接到负载组件的 IP 地址就可以了。

负载分为四层负载和七层负载。

四层负载:工作在 OSI 模型的第四层,即传输层(TCP 位于第四层),它是根据 IP 端口进行转发(LVS 支持四层负载)。RabbitMQ 是 TCP 的 5672 端口。

七层负载:工作在第七层,应用层(HTTP 位于第七层)。可以根据请求资源类型分 配到后端服务器(Nginx 支持七层负载;HAProxy 支持四层和七层负载)。

但是,如果这个负载的组件也挂了呢?客户端就无法连接到任意一台 MQ 的服务器 了。所以负载软件本身也需要做一个集群。新的问题又来了,如果有两台负载的软件, 客户端应该连哪个?

负载之上再负载?陷入死循环了。这个时候我们就要换个思路了。 我们应该需要这样一个组件:

1、 它本身有路由(负载)功能,可以监控集群中节点的状态(比如监控 HAProxy),如果某个节点出现异常或者发生故障,就把它剔除掉。

2、 为了提高可用性,它也可以部署多个服务,但是只有一个自动选举出 来的 MASTER 服务器(叫做主路由器),通过广播心跳消息实现。

3、 MASTER 服务器对外提供一个虚拟 IP,提供各种网络功能。也就是 谁抢占到 VIP,就由谁对外提供网络服务。应用端只需要连接到这一 个 IP 就行了

这个协议叫做 VRRP 协议(虚拟路由冗余协议 Virtual Router Redundancy Protocol),这个组件就是 Keepalived,它具有 Load Balance 和 High Availability 的功能。

下面我们看下用 HAProxy 和 Keepalived 如何实现 RabbitMQ 的高可用 (MySQL、Mycat、Redis 类似)。

基于 Docker 安装 HAproxy 负载+Keepalived 高可用

规划:

内存节点 1:192.168.8.40

内存节点 2:192.168.8.45

磁盘节点:192.168.8.150

VIP:192.168.8.220

1、我们规划了两个内存节点,一个磁盘节点。所有的节点之间通过镜像队列的 方式同步数据。内存节点用来给应用访问,磁盘节点用来持久化数据。

2、为了实现对两个内存节点的负载,我们安装了两个 HAProxy,监听两个 5672 和 15672 的端口。

3、安 装 两 个 Keepalived , 一 主 一 备 。 两 个 Keepalived 抢 占 一 个 VIP192.168.8.220。谁抢占到这个 VIP,应用就连接到谁,来执行对 MQ 的负载

这种情况下,我们的 Keepalived 挂了一个节点,没有影响,因为 BACKUP 会变 成 MASTER,抢占 VIP。HAProxy 挂了一个节点,没有影响,我们的 VIP 会自动路 由的可用的 HAProxy 服务。RabbitMQ 挂了一个节点,没有影响, 因为 HAProxy 会自动负载到可用的节点.

 

实践经验总结

资源管理

到底在消费者创建还是在生产者创建? 如果A项目和B 项目有相互发送和接收消息,应该创建几个vhost,几个Exchange?

交换机和队列,实际上是作为资源,由运维管理员创建的。 为什么仍然需要在代码中定义?重复创建不报错吗?

 

配置文件与命名规范

1、元数据的命名集中放在 properties 文件中,不要用硬编码。如果有多个系统, 可以配置多个 xxx_mq.properties。

2、命名体现元数据的类型

虚拟机命名: XXX_VHOST

交换机命名:XXX_EXCHANGE

队列命名:_QUEUE

3、命名体现数据来源和去向

例如:销售系统发往产品系统的交换机:SALE_TO_PRODUCT_EXCHANGE。做到 见名知义,不用去查文档(当然注释是必不可少的)。

调用封装

在项目中可以对 Template 做进一步封装,简化消息的发送。 例如:如果交换机、路由键是固定的,封装之后就只需要一个参数:消息内容。 另外,如果想要平滑地迁移不同的 MQ(如果有这种需求的话),也可以再做一层 简单的封装。

  1. YKTSendMsg(){
  2. JmsTemplate.send(destination,msg);
  3. }
  4. 这时,如果要把 ActiveMQ 替换为 RabbitMQ,只需要修改:
  5. GpSendMsg(){
  6. RabbitTemplate.send(exchange,routingKey,msg);
  7. }

信息落库+定时任务

将需要发送的消息保存在数据库中,可以实现消息的可追溯和重复控制,需要配合 定时任务来实现。

1) 将需要发送的消息登记在消息表中。

2) 定时任务一分钟或半分钟扫描一次,将未发送的消息发送到 MQ 服务器,并且 修改状态为已发送。

3) 如果需要重发消息,将指定消息的状态修改为未发送即可。 副作用:降低效率,浪费存储空间。

生产环境运维监控

虽然 RabbitMQ 提供了一个简单的管理界面,但是如果对于系统性能、高可用和其 他参数有一些定制化的监控需求的话,我们就需要通过其他方式来实现监控了。

主要关注:磁盘、内存、连接数

日志追踪

RabbitMQ 可以通过 Firehose 功能来记录消息流入流出的情况,用于调试,排错。 它是通过创建一个 TOPIC 类型的交换机(amq.rabbitmq.trace),把生产者发送给 Broker 的消息或者 Broker 发送给消费者的消息发到这个默认的交换机上面来实现的。

另外 RabbitMQ 也提供了一个 Firehose 的 GUI 版本,就是 Tracing 插件。 启用 Tracing 插件后管理界面右侧选项卡会多一个 Tracing,可以添加相应的策略。 RabbitMQ还提供了其他的插件来增强功能。

如何减少连接数

在发送大批量消息的情况下,创建和释放连接依然有不小的开销。我们可以跟接收 方约定批量消息的格式,比如支持 JSON 数组的格式,通过合并消息内容,可以减少生 产者/消费者与 Broker 的连接。

比如:活动过后,要全范围下线产品,通过 Excel 导入模板,通常有几万到几十万条 解绑数据,合并发送的效率更高。 建议单条消息不要超过 4M(4096KB),一次发送的消息数需要合理地控制。

  1. msgContent = {
  2. 'type':'add', 'num':3, 'detail': [
  3. { 'merchName':'黄金手机店', 'address':'黄金路 999 号' ] },
  4. { 'merchName':'银星手机店', 'address':'银星路 168 号' ] },
  5. { 'merchName':'青铜手机店', 'address':'青铜路 73 号' ] }
  6. ]
  7. }

RabbitMQ高频面试题总汇:

1、 消息队列的作用与使用场景?

异步:批量数据异步处理。例:批量上传文件,比如代发代扣文件。

削峰:高负载任务负载均衡。例:电商秒杀抢购。

解耦:串行任务并行化。例:退货流程解耦。

广播:基于Pub/Sub实现一对多通信。

2、 Channel 和 vhost 的作用是什么?

Channel:减少 TCP 资源的消耗。也是最重要的编程接口。

Vhost:提高硬件资源利用率,实现资源隔离。

3、 RabbitMQ 的消息有哪些路由方式?适合在什么业务场景使用?

Direct、Topic、Fanout

4、交换机与队列、队列与消费者的绑定关系是什么样的?

多个消费者监听一个队列时(比如一个服务部署多个实例),消息会重复消费吗?

多对多;

轮询(平均分发)

5、 无法被路由的消息,去了哪里?

直接丢弃。可用备份交换机(alternate-exchange)接收。

如果没有任何设置,无法路由的消息会被直接丢弃。

无法路由的情况:Routing key不正确。

解决方案:

1、使用 mandatory=true 配合 ReturnListener,实现消息回发。

2、声明交换机时,指定备份交换机。

6、 消息在什么时候会变成 Dead Letter(死信)?

1、消息被拒绝并且没有设置重新入队:(NACK || Reject ) && requeue == false

2、消息过期(消息或者队列的TTL设置)

3、消息堆积,并且队列达到最大长度,先入队的消息会变成DL。 可以在声明队列时,指定一个Dead Letter Exchange,来实现Dead Letter的转发

7、 如果一个项目要从多个服务器接收消息,怎么做? 如果一个项目要发送消息到多个服务器,怎么做?

定义多个 ConnectionFactory,注入到消费者监听类/Temaplate

8、 RabbitMQ 如何实现延迟队列?

利用TTL(队列的消息存活时间或消息存活时间),加上死信交换机。 当然还有一种方式就是先保存消息到数据库,用调度器扫描发送(时间不够精准)。

9、哪些情况会导致消息丢失?怎么解决? 哪些情况会导致消息重复?怎么解决?

从消息发送的整个流程来分析。

10、 一个队列最多可以存放多少条消息?

由硬件决定。

11、 可以用队列的 x-max-length 最大消息数来实现限流吗?例如秒杀场景。

不能,因为会删除先入队的消息,不公平。

12、 如何提高消息的消费速率?

创建多个消费者。

13、 AmqpTemplate 和 RabbitTemplate 的区别?

Spring AMQP 是 Spring 整合 AMQP 的一个抽象。Spring-Rabbit 是一个实 现。

14、 如何保证消息的可靠性投递?

1、确保投递到服务端Broker

2、保证正确地路由

3、消息的持久化存储

4、消费者应答ACK

5、消费者回调

6、补偿机制

15、 Spring AMQP 中消息怎么封装?用什么转换?

Message,MessageConvertor

16、 如何保证消息的顺序性?

一个队列只有一个消费者

17、 RabbitMQ 的集群节点类型?

磁盘节点和内存节点

18、 如何保证 RabbitMQ 的高可用?

HAProxy(LVS)+Keepalived

19、 大量消息堆积怎么办?

1) 重启(不是开玩笑的)

2) 多创建几个消费者同时消

3) 直接清空队列,重发消

20、 RabbitMQ的集群模式和集群节点类型?

集群模式有两种:

普通模式:默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于 其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消 息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进 行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消 息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总 在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。 如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的 现象。

镜像模式:把需要的队列做成镜像队列,存在与多个节点属于RabbitMQ的HA方案。该模式解决了普通模式中的 问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。 该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的 网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。

节点分为两种:

1)内存(RAM):保存状态到内存(但持久化的队列和消息还是会保存到磁盘)。

2)磁盘节点:保存状态到内存和磁盘。 一个集群中至少需要需要一个磁盘节点。

21、多个消费者监听一个队列时,消息如何分发?

1、Round-Robin(轮询) 默认的策略,消费者轮流、平均地收到消息。

2、Fair dispatch (公平分发)

如果要实现根据消费者的处理能力来分发消息,给空闲的消费者发送更多消息,可以用basicQos(int prefetch_count)来设置。prefetch_count的含义:当消费者有多少条消息没有响应ACK时,不再给这个消费者发送 消息。

22、如何在服务端和消费端做限流?

网关/接入层:其他限流方式。

服务端(Broker):配置文件中内存和磁盘的控制;队列长度无法实现限流。

消费端:prefetch_count。

23、如何保证消息的顺序性?

比如新增门店、绑定产品、激活门店这种对消息顺序要求严格的场景。 一个队列只有一个消费者的情况下才能保证顺序。 否则只能通过全局ID来实现。

1、每条消息有一个msgId,关联的消息拥有同一个parentMsgId。

2、可以在消费端实现前一条消息未消费,不处理下一条消息;也可以在生产端实现前一条消息未处理完毕,不发 布下一条消息。

24、消息幂等性

首先,Broker本身没有消息重复过滤的机制。

1、生产者方面,可以对每条消息生成一个msgId,以此控制消息重复投递。

  1. // 消息属性
  2. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
  3. .messageId(String.valueOf(UUID.randomUUID()))
  4. .build();
  5. // 发送消息
  6. channel.basicPublish('', QUEUE_NAME, properties, msg.getBytes())

2、消费者方面,消息体(比如json报文)中必须携带一个业务ID,比如银行的交易流水号,消费者可以根据业务 ID去重,避免重复消费。

25、多个消费者监听一个队列时,消息如何分发?

1、Round-Robin(轮询) 默认的策略,消费者轮流、平均地收到消息。

2、Fair dispatch (公平分发)

如果要实现根据消费者的处理能力来分发消息,给空闲的消费者发送更多消息,可以用basicQos(int prefetch_count)来设置。prefetch_count的含义:当消费者有多少条消息没有响应ACK时,不再给这个消费者发送 消息。

(0)

相关推荐