了不起的消息队列(二):啊哈!RabbitMQ

背景

在本系列的前一篇博文中,笔者对消息队列的概述、特点等进行讲解,然后对消息队列使用场景进行分析,最后对市面上比较常见的消息队列产品进行技术对比。

经过上一篇博客介绍,相信大家对消息队列已经有了一个大致了解。RabbitMQ 是 MQ 产品的典型代表,是一款基于 AMQP 协议可复用的企业消息系统。业务上,可以实现服务提供者和消费者之间的数据解耦,提供高可用性的消息传输机制,在实际生产中应用相当广泛。本文意在介绍 RabbitMQ 的基本原理,包括 RabbitMQ 基本框架、概念、通信过程等,介绍一下 RabbitMQ 安装教程,最后介绍一下 RabbitMQ 在项目中实际应用场景。

RabbitMQ 介绍

RabbitMQ 是采用 Erlang 语言实现的 AMQP[1] 协议的消息中间件,最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 发展到今天,被越来越多的人认可,这和它在可靠性、可用性、扩展性、功能丰富等方面的卓越表现是分不开的。RabbitMQ 实现了 AQMP 协议,AQMP 协议定义了消息路由规则和方式。生产端通过路由规则发送消息到不同 queue,消费端根据 queue 名称消费消息。此外 RabbitMQ 是向消费端推送消息,订阅关系和消费状态保存在服务端。

相关概念

通常我们谈到消息队列时会有三个概念:生产者、队列、消费者,RabbitMQ 在这个基本概念之上,多做了一层抽象,在生产者和队列之间,加入了交换器(Exchange)。这样生产者和队列就没有直接联系,转而变成发生产者把消息给交换器,交换器根据调度策略再把消息再给队列。因此在 RabbitMQ 的消息传递模型中,他的核心思想是生产者永远不会将任何消息直接发送到队列上,甚至不知道消息是否被传递到任何队列。生产者向 Exchanges 发送消息。 Exchanges 负责生产者消息的接收,将消息推送到队列。Exchanges 通过 exchange type 指定的类型明确要如何处理消息,比如附加到特定队列或者所有队列,或者将消息丢弃。

RabbitMQ 相关概念

  • Message:消息,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、 priority(相对于其他消息的优先权)、 delivery-mode(指出该消息可能需要持久性存储)等。
  • Publisher:消息生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange 有 4 种类型: direct(默认)、 fanout、topic 和 headers ,不同类型的 Exchange 转发消息的策略有所区别。
  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
  • Connection:网络连接,比如一个 TCP 连接。
  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的 TCP 连接内的虚拟连接, AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer:消息消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing_key)将会确定消息投递到那个队列(queue)。交换机不存储消息,如果没有 Queue Binding 到 Exchange 的话,它会直接丢弃掉 Publisher 发送过来的消息;在启用 ack 模式后,交换机找不到队列会返回错误。Exchange 有 4 种类型: direct(默认)、fanout、topic 和 headers ,不同类型的 Exchange 转发消息的策略有所区别。

1、Direct Exchange

Direct Exchange:直接交换器,Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的交换机模式,根据 ROUTING_KEY 全文匹配去寻找队列。其工作方式类似于单播,Exchange 会将消息发送完全匹配 ROUTING_KEY 的 Queue。

Direct 模式,可以使用 RabbitMQ 自带的 Exchange:default Exchange 。所以不需要将 Exchange 进行任何绑定(binding)操作 。消息传递时,ROUTING_KEY 必须完全匹配,才会被队列接收,否则该消息会被抛弃。

Direct Exchange

2、Fanout Exchange

Fanout Exchange:扇形交换器,所有发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定 (Binding) 的所有 Queue 上。

Fanout 模式,Fanout Exchange 不需要处理 ROUTING_KEY。只需要简单的将队列绑定到 Exchange 上,这样发送到 Exchange 的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

Fanout Exchange

3、Topic Exchange

Topic Exchange:主题交换器,工作方式类似于组播,Exchange 会将消息转发和 ROUTING_KEY 匹配模式相同的所有队列。

Topic 模式,Exchange 将 ROUTING_KEY 和某 Topic 进行模糊匹配。此时队列需要绑定一个 Topic,可以使用通配符进行模糊匹配,符号 “#” 匹配一个或多个词,符号 “” 匹配不多不少一个词。因此 “log.#” 能够匹配到“log.info.oa”,但是“log.” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。

Topic Exchange

4、Headers Exchange

Headers Exchange:首部交换器和扇形交换器都不需要路由键 ROUTING_KEY,首部交换器和主题交换机有点相似,但是不同于主题交换机的路由是基于路由键,头交换机的路由值基于消息的 header 数据,主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值。

Headers 模式,Headers 是一个键值对,可以定义成 Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式 all 和 any。这两种方式是在接收端必须要用键值 “x-mactch” 来定义。all 代表定义的多个键值对都要满足,而 any 则代码只要满足一个就可以了。fanout,direct,topic exchange 的 ROUTING_KEY 都需要要字符串形式的,而 headers exchange 则没有这个要求,因为键值对的值可以是任何类型。

Headers Exchange

RabbitMQ 安装以及环境配置

本文统一使用软件包管理器的方式安装 RabbitMQ,减少环境变量的配置,更加方便快捷。RabbitMQ 官网也有详细的安装教程,感兴趣的同学,可以参考下。Downloading and Installing RabbitMQ

Linux 安装 RabbitMQ

CentOS7 中使用 yum 安装 RabbitMQ 的方法,RabbitMQ 是采用 Erlang 语言实现,因此安装 RabbitMQ 前,需要先安装 Erlang。直接用 yum install erlang 安装的版本是 R16B-03.18.el7,不满足要求,为此,RabbitMQ 贴心提供了一个 erlang.repo,将以下内容添加到 /etc/yum.repos.d/rabbitmq-erlang.repo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 1. 将 erlang 新版本源添加到 /etc/yum.repos.d/rabbitmq-erlang.repo
# In /etc/yum.repos.d/rabbitmq_erlang.repo
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=1
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
# PackageCloud's repository key and RabbitMQ package signing key
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

# 2. 安装 erlang
$ yum install erlang -y

# 3. 将 RabbitMQ 新版本源添加到 /etc/yum.repos.d/rabbitmq-erlang.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.8.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1

# 4. 安装 RabbitMQ
$ yum install rabbitmq-server -y

Mac 安装 RabbitMQ

Mac 中使用 brew 安装 RabbitMQ 的方法

1
2
# 1. 使用 RabbitMQ 安装
$ brew install rabbitmq

Windows 安装 RabbitMQ

Windows 中使用 choco 安装 RabbitMQ 的方法

1
2
# 1. 使用 RabbitMQ 安装
$ choco install rabbitmq

开启 rabbitmq_management 以便通过浏览器访问控制台

1
2
3
4
5
6
7
8
# 1. 开启 rabbitmq_management 以便通过浏览器访问控制台
$ rabbitmq-plugins enable rabbitmq_management

# 2. 将 RabbitMQ 加入开机自启动
$ systemctl enable rabbitmq-server.service

# 3. 立即启动 RabbitMQ
$ systemctl start rabbitmq-server.service

管理控制台的地址默认为 http://server-name:15672 (将其中 server-name 替换为你自己的 ip 地址)。RabbitMQ 默认有个 guest 账号,密码也为 guest,但是如果不是从 RabbitMQ 所在机器上试图用这个账号登陆管理控制台的话,会报错误:“User can only log in via localhost”。RabbitMQ 3.0 开始禁止使用 guest/guest 权限通过除 localhost 外的访问。因此,我们需要添加一个超级管理员。

1
2
3
4
5
# 1. 添加一个账户吧,用户名 admin 密码 admin
$ rabbitmqctl add_user admin admin

# 2. 该用户赋予超级管理员的角色
$ rabbitmqctl set_user_tags admin administrator

在 RabbitMQ 中,用户角色可分为五类:

  • 超级管理员(administrator):可登陆管理控制台,可查看所有的信息,并且可以对用户、策略(policy) 进行操作。
  • 监控者(monitoring):可登陆管理控制台,同时可以查看 RabbitMQ 节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。
  • 策略制定者(policymaker):可登陆管理控制台,同时可以对 policy 进行管理,但无法查看节点的相关信息。
  • 普通管理者(management):仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
  • 其他:无法登陆管理控制台,通常就是普通的生产者和消费者。

RabbitMQ 的六种消息队列教程

RabbitMQ 的六种消息队列

1、 导入 RabbitMQ 的客户端依赖

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>

2、 定义创建连接工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ConnectionUtils {

public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务器地址
factory.setHost("localhost");
// 设置服务器端口
factory.setPort(5672);
// 设置服务器账号
factory.setUsername("guest");
// 设置服务器密码
factory.setPassword("guest");
// 通过连接工厂获取连接
return factory.newConnection();
}
}

“Hello World!”

创建一个生产者项目用来向消息队列发送数据,创建一个消费者项目用来从消息队列里接收数据,消费者需要注册到指定到 MQ 队列中,如下图所示:

RabbitMQ 简单模式

1、Publisher :消息生产者 Publisher 向交换器(AMQP default)发送消息,交换器类型为 Direct Exchange,消息传递时,ROUTING_KEY 必须完全匹配,才会被队列接收,否则该消息会被抛弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Send {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
try (Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel()) {
// 声明创建队列(队列名称,非持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 发送消息(exchange,routingKey,其他属性,消息正文),交换器名称为默认(AMQP default),交换器类型为direct
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}

2、Consumer :消息消费者 Consumer 从消息队列 hello 中取得消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
try (Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel()) {
// 声明创建队列(队列名称,非持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 开启消费者监听(队列名称,自动确认消息,投递消息回调,取消消息回调),此时 RabbitMQ 将消息推送至消费者(若想用拉的方式,则可以使用channel.basicGet()方法)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
}

Work queues

Work queues 工作队列也称为任务队列,主要思想是避免立即执行资源密集型的任务。将需要执行的任务放到消息队列中,等待资源空闲时消费者从消息队列中取出消息并逐个执行。使用任务队列的优点之一是能够轻松并行化工作,如果我们正在积压工作,我们可以增加更多的消费者,这样就可以轻松扩展。工作队列适用于很多场景,一般的使用方式也都是采用任务队列,如下图所示:

RabbitMQ Work queues 工作队列模式

1、Publisher :消息生产者 Publisher 向交换器(AMQP default)发送消息,交换器类型为 Direct Exchange。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Send {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
try (Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel()) {
// 声明创建队列(队列名称,持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 遍历发送 100 条消息
for (int i = 0; i < 100; i++) {
// 消息内容
String message = String.format("Hello World! %s", i);
// 发送消息(exchange,routingKey,其他属性,消息正文),交换器名称为默认(AMQP default),交换器类型为direct
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}

2、Consumer:消息消费者 Consumer 从消息队列 hello 中取得消息,通过 Thread.sleep() 函数来伪造资源密集型的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明创建队列(队列名称,持久化,非独占,不自动删除队列,空参数)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
doWork();
} finally {
System.out.println(" [x] Done");
// 手动消息确认(若忘记手动消息确认,当您的客户端退出时,消息将被重新发送(可能看起来像是随机重新发送),但是RabbitMQ将消耗越来越多的内存,因为它将无法释放任何未确认的消息。)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开启消费者监听(队列名称,自动确认消息,投递消息回调,取消消息回调),此时 RabbitMQ 将消息推送至消费者(若想用拉的方式,则可以使用channel.basicGet()方法)
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}

@SneakyThrows
private static void doWork() {
// Thread.sleep()函数来伪造资源密集型的任务
Thread.sleep(1000);
}
}

3、Consumer:当任务积压时,我们只需要启动多个消费者,这样就可以轻松扩展,完成消费。

注意:默认情况下,RabbitMQ 将每个消息依次发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

Publish/Subscribe

Publish/Subscribe 发布订阅模式,将消息广播发送给所有消费者。这里以日志系统为例,假设生产者程序将消息发送给两个消费者,其中一个消费者负责将日志输出到控制台,另外一个消费者负责将日志写入到磁盘。Publish/Subscribe 发布订阅模式中交换器类型为 Fanout Exchange。扇形交换器,所有发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定 (Binding) 的所有 Queue 上,如下图所示:

RabbitMQ Publish/Subscribe 发布订阅模式

1、Publisher :消息生产者 Publisher 向交换器 logs 发送消息,交换器类型为 Fanout Exchange。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
try (Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel()) {
// 声明创建交换器(交换器名称,交换器类型,非持久化),交换器类型为扇形交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 消息内容
String message = "Hello World!";
// 发送消息(exchange,routingKey,其他属性,消息正文),当ExchangeType为fanout时,将忽略routingKey参数
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}

2、Consumer:消息消费者 Consumer 从 channel 中获取一个随机的非持久化自动删除队列(客户端退出就自动删除),然 绑定消息队列和 exchange。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReceiveLogs {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明创建交换器(交换器名称,交换器类型,非持久化),交换器类型为扇形交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();
// 绑定消息队列和exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 开启消费者监听(队列名称,自动确认消息,投递消息回调,取消消息回调),此时 RabbitMQ 将消息推送至消费者(若想用拉的方式,则可以使用channel.basicGet()方法)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}

3、Consumer:当启动多个消费者,每个消费者会创建队列并绑定至交换器 logs 上,消息生产者向交换器 logs 发送消息,交换器 logs 将消息转发到与该 Exchange 绑定 (Binding) 的所有 Queue 上。

注意:消费者必须先绑定到 exchange 上,然后生产者再发送消息,否则 exchange 无法将消息路由到任何队列。

Routing

Routing 路由,进行有选择的接收消息,可以订阅某个消息队列的子集。例如,我们将只能将严重错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。Routing 路由模式中交换器类型为 Fanout Exchange。直接交换器,Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的交换机模式,根据 ROUTING_KEY 全文匹配去寻找队列。其工作方式类似于单播,Exchange 会将消息发送完全匹配 ROUTING_KEY 的 Queue,如下图所示:

RabbitMQ Routing 路由模式

1、Publisher :消息生产者 Publisher 向交换器 direct_logs 发送消息,交换器类型为 Direct Exchange。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class EmitLogDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
try (Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel()) {
// 声明创建交换器(交换器名称,交换器类型),交换器类型为直接交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);

// 向指定exchange发送消息,路由key为 info
channel.basicPublish(EXCHANGE_NAME, "info", null, "info message".getBytes(StandardCharsets.UTF_8));

// 向指定exchange发送消息,路由key为 warning
channel.basicPublish(EXCHANGE_NAME, "warning", null, "warning message".getBytes(StandardCharsets.UTF_8));

// 向指定exchange发送消息,路由key为 error
channel.basicPublish(EXCHANGE_NAME, "error", null, "error message".getBytes(StandardCharsets.UTF_8));
}
}
}

2、Consumer:消息消费者 Consumer 从 channel 中获取一个随机的非持久化自动删除队列(客户端退出就自动删除),绑定消息队列、exchange、路由 key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明创建交换器(交换器名称,交换器类型),交换器类型为直接交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();
// 绑定消息队列、exchange、路由key为error
channel.queueBind(queueName, EXCHANGE_NAME, "error");
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 开启消费者监听(队列名称,自动确认消息,投递消息回调,取消消息回调),此时 RabbitMQ 将消息推送至消费者(若想用拉的方式,则可以使用channel.basicGet()方法)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}

3、Consumer:消费者可以为一个队列绑定多个 routingKey。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明创建交换器(交换器名称,交换器类型),交换器类型为直接交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();
// 绑定消息队列、exchange、路由key为info
channel.queueBind(queueName, EXCHANGE_NAME, "info");
// 绑定消息队列、exchange、路由key为error
channel.queueBind(queueName, EXCHANGE_NAME, "error");
// 绑定消息队列、exchange、路由key为warning
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 开启消费者监听(队列名称,自动确认消息,投递消息回调,取消消息回调),此时 RabbitMQ 将消息推送至消费者(若想用拉的方式,则可以使用channel.basicGet()方法)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}

Topics

Topics 主题,基于主题交换的策略接收消息,基于 Topics 的方式可以让消息队列的使用更加灵活,为消息的发送和订阅提供更加细粒度的控制。例如,在我们的日志记录系统中,我们可能不仅要根据严重性订阅日志,还要根据发出日志的源订阅日志。Topics 主题模式中交换器类型为 Topic Exchange。主题交换器,工作方式类似于组播,Exchange 会将消息转发和 ROUTING_KEY 匹配模式相同的所有队列,如下图所示:

RabbitMQ Topics 主题模式

1、Publisher :消息生产者 Publisher 向交换器 topic_logs 发送消息,交换器类型为 Topic Exchange。首先需要指定 exchange 的类型为 topic,在生产者发送消息时设置 routingKey 为一个符号表达式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
// 获取连接并创建通道
try (Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel()) {
// 声明创建交换器(交换器名称,交换器类型),交换器类型为主题交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 向指定exchange发送消息, routingKey 为 s.info.l, 订阅的消费者需要指定主题routingKey为 *.info.* 或者 #.info.#
channel.basicPublish(EXCHANGE_NAME, "s.info.l", null, "info message".getBytes(StandardCharsets.UTF_8));

// 向指定exchange发送消息, routingKey 为 lazy.test.one, 订阅的消费者需要指定主题routingKey为 lazy.#
channel.basicPublish(EXCHANGE_NAME, "lazy.test.one", null, "lazy message".getBytes(StandardCharsets.UTF_8));
}
}
}

2、Consumer:消息消费者 Consumer 从 channel 中获取一个随机的非持久化自动删除队列(客户端退出就自动删除),通过符号表达式绑定消息队列、exchange、路由 key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ReceiveLogsTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明创建交换器(交换器名称,交换器类型),交换器类型为主题交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 获取队列,得到一个随机名称(非持久化的自动删除队列)
String queueName = channel.queueDeclare().getQueue();
// 绑定消息队列和exchange
channel.queueBind(queueName, EXCHANGE_NAME, "*.info.*");
// 消费者接收到队列投递的回调(consumerTag 服务端生成的消费者标识,delivery投递)
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息内容并输出
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 开启消费者监听(队列名称,自动确认消息,投递消息回调,取消消息回调),此时 RabbitMQ 将消息推送至消费者(若想用拉的方式,则可以使用channel.basicGet()方法)
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}

RPC

RPC 远程过程调用,RabbitMQ 也支持这种同步调用的特性,调用之后等待调用结果返回。该模式使用率较少,在实际项目中应用场景较小。如下图所示:

RabbitMQ RPC 远程过程调用模式

客户端通过 RabbitMQ 的 RPC 调用服务端,等待服务端返回结果,示例程序如下:

1、为了说明如何使用 RPC 服务,我们将创建一个简单的客户端类。它将公开一个名为 call 的方法,该方法 发送 RPC 请求并阻塞,直到收到答案为止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class RPCClient implements AutoCloseable {

private Connection connection;
private Channel channel;
private String RPC_QUEUE_NAME = "rpc_queue";

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connection = connectionFactory.newConnection();
channel = connection.createChannel();
}

public static void main(String[] args) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}

public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();

String replayQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replayQueueName)
.build();
channel.basicPublish("", RPC_QUEUE_NAME, basicProperties, message.getBytes(StandardCharsets.UTF_8));

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

String cTag = channel.basicConsume(replayQueueName, true, (consumeTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), StandardCharsets.UTF_8));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(cTag);

return result;
}

@Override
public void close() throws Exception {
connection.close();
}
}

2、由于我们没有值得分配的耗时任务,因此我们将创建一个虚拟 RPC 服务,该服务返回斐波那契数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n) {
if (n == 0) {
return n;
}
if (n == 1) {
return n;
}
return fib(n - 1) + fib(n - 2);
}

public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");

try (Connection connection = connectionFactory.newConnection()) {
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);

channel.basicQos(1);

System.out.println(" [x] Awaiting PRC request");

Object monitor = new Object();
DeliverCallback deliverCallback = (consumerTag, message) -> {
AMQP.BasicProperties replyProperties = new AMQP.BasicProperties
.Builder()
.correlationId(message.getProperties().getCorrelationId())
.build();

String response = "";

try {
String theMessage = new String(message.getBody(), StandardCharsets.UTF_8);
int n = Integer.parseInt(theMessage);
System.out.println(" [.] fib(" + n + ")");
response += fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
} finally {
channel.basicPublish("", message.getProperties().getReplyTo(), replyProperties, response.getBytes(StandardCharsets.UTF_8));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
synchronized (monitor) {
monitor.notify();
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumeTag -> {
}));
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (TimeoutException | IOException e) {
e.printStackTrace();
}
}
}

Spring Boot 集成 RabbitMQ

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

简单使用

1、配置 Pom 包,主要是添加 spring-boot-starter-amqp 的支持

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置 RabbitMQ 的安装地址、端口以及账户信息

1
2
3
4
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、队列配置

1
2
3
4
5
6
7
8
@Configuration
public class RabbitConfig {

@Bean
public Queue Queue() {
return new Queue("hello");
}
}

4、发送者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class HelloSender {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("hello", context);
}
}

5、接收者

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}

6、测试

1
2
3
4
5
6
7
8
9
10
11
@SpringBootTest
public class RabbitMqHelloTest {

@Autowired
private HelloSender helloSender;

@Test
public void hello() throws Exception {
helloSender.send();
}
}

高级使用

对象的支持

Spring Boot 以及完美的支持对象的发送和接收,不需要格外的配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 发送者
public void send(User user) {
System.out.println("Sender object: " + user.toString());
this.rabbitTemplate.convertAndSend("object", user);
// Sender object: User{name='neo', pass='123456'}
}

...

// 接收者
@RabbitHandler(queues = "object")
public void process(User user) {
System.out.println("Receiver object : " + user);
// Receiver object : User{name='neo', pass='123456'}
}

消息持久化

在使用 RabbitMQ 时,我们可以通过消息持久化来解决服务器因异常崩溃而造成的消息丢失。在使用 RabbitMQ 时,我们可以通过消息持久化来解决服务器因异常崩溃而造成的消息丢失。其中,RabblitMQ 的持久化分为三个部分:交换器(Exchange)的持久化、队列(Queue)的持久化、消息(Message)的持久化。

1、设置队列、交换器持久化,防止在 RabbitMQ 出现异常情况(重启,宕机)时,Exchange、Queue 丢失,影响后续的消息写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class DirectRabbitConfig {

@Bean
public Queue queue() {
// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
return new Queue("hello", true, false, false, null);
}

@Bean
public DirectExchange directExchange() {
// (交换器名称,设置 durable=true 持久化交换器,非独占,不自动删除队列,空参数)
return new DirectExchange("directExchange", true, false, null);
}

@Bean
public Binding bindingExchangeMessage(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("hello");
}
}

或者通过消费者 @RabbitListener 注解的方式进行持久化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
// 如果不存在,自动创建队列和交换器并且绑定
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "hello", durable = "true"),
exchange = @Exchange(value = "directExchange", type = "direct", durable = "true"),
key = "hello"))
public class HelloReceiver {

@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}

注意:Exchange 交换器的持久化,在声明时指定 durable => true,若 durable=false 非持久化,在 RabbitMQ 出现异常情况(重启,宕机)时,该 Exchange 会丢失,会影响后续的消息写入该 Exchange;Queue 队列的持久化,在声明时指定 durable => true,若 durable=false 非持久化,在 RabbitMQ 出现异常情况(重启,宕机)时,队列丢失,队列丢失导致队列与 Exchange 绑定关系丢失,会影响后续的消息路由给服务器中的队列。

3、发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class HelloSender {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String context = "hello " + new Date();
System.out.println("Sender : " + context);
// 通过阅读源代码可以发现 new MessageProperties() 持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的
Message message = MessageBuilder.withBody(context.getBytes()).build();
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
this.rabbitTemplate.convertAndSend("hello", message);
}
}

注意:Spring AMQP 是对原生的 RabbitMQ 客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。Message 消息的持久化,在投递时指定 delivery_mode => 2(1 是非持久化),RabbitTemplate 它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。

4、注意事项

(1)理论上可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能。因为写入磁盘的速度比写入内存的速度慢得不止一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

(2)将交换器、队列、消息都设置了持久化之后仍然不能百分之百保证数据不丢失,因为当持久化的消息正确存入 RabbitMQ 之后,还需要一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。如果在这段时间内 RabbitMQ 服务节点发生了宕机、重启等异常情况,消息还没来得及落盘,那么这些消息将会丢失。

(3)单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

ACK 确认机制

默认情况下消息消费者是自动 ack (确认)消息的,自动确认会在消息发送给消费者后立即确认,这样存在丢失消息的可能。

1、配置 RabbitMQ 的安装地址、端口、账户信息以及 ACK 确认模式

1
2
3
4
5
6
7
spring.rabbitmq.host=118.25.39.41
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
## 确认模式设置为手动签收,1、NONE:没有ack的意思,对应RabbitMQ的自动确认模式;2、MANUAL:手动模式,对应RabbitMQ的显式确认模式;AUTO:自动模式,对应RabbitMQ的显式确认模式;
## MANUAL与AUTO的关系有点类似于在Spring中手动提交事务与自动提交事务的区别:一个是手动发送ack;一个是在方法执行完,没有异常的情况下自动发送ack
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL

注意:AcknowledgeMode.MANUAL 模式需要人为地获取到 channel 之后调用方法向 server 发送 ack(或消费失败时的 nack)信息;AcknowledgeMode.AUTO 模式下,由 spring-rabbit 依据消息处理逻辑是否抛出异常自动发送 ack(无异常)或 nack(异常)到 server 端。

2、接收者,消息消费者手动确认消息以及消息拒绝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

@SneakyThrows
@RabbitHandler
public void process(String hello, Message message, Channel channel) {
String ackMessage = "hello";
if (ackMessage.equals(hello)) {
// 确认消息接收:第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型;为true时会将小于等于此次tag的所有消息都确认掉,如果为false则只确认当前tag的信息,可根据实际情况进行选择。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
// 消息拒绝:第一个deliveryTag参数为每条信息带有的tag值,第二个multiple参数为布尔类型,第三个requeue参数为拒绝后是否重新回到队列;
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// 消息拒绝:basicReject() 和 basicNack()的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
}

Topic Exchange

topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key 自由的绑定不同的队列。

1、首先对 topic 规则配置,使 queueMessages 同时匹配两个队列,queueMessage 只匹配 “topic.message” 队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class TopicRabbitConfig {

final static String message = "topic.message";
final static String messages = "topic.messages";

@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}

@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}

@Bean
public TopicExchange exchange() {
return new TopicExchange("exchange");
}

@Bean
public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
public Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}

2、发送 send1 会匹配到 topic.# 和 topic.message 两个 Receiver 都可以收到消息,发送 send2 只有 topic.# 可以匹配所有只有 Receiver2 监听到消息。

1
2
3
4
5
6
7
8
9
10
11
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
}

public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
}

Fanout Exchange

Fanout 就是我们熟悉的广播模式或者订阅模式,给 Fanout 交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

1、首先对 fanout 规则配置,这里使用了 A、B、C 三个队列绑定到 Fanout 交换机上面,发送端的 routing_key 写任何字符都会被忽略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Configuration
public class FanoutRabbitConfig {

@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}

@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}

@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}

@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

@Bean
public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}

@Bean
public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}

@Bean
public Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}

2、绑定到 fanout 交换机上面的队列都收到了消息

1
2
3
4
5
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}

运行结果:

1
2
3
4
5
Sender : hi, fanout msg 
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg

基于 RabbitMQ 实现消息延迟队列方案

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。在很多的业务场景中,延时队列可以实现很多功能,此类业务中,一般上是非实时的,需要延迟处理的,需要进行重试补偿的。

  • 订单超时关闭:在支付场景中,一般上订单在创建后 30 分钟或 1 小时内未支付的,会自动取消订单。
  • 短信或者邮件通知:在一些注册或者下单业务时,需要在 1 分钟或者特定时间后进行短信或者邮件发送相关资料的。本身此类业务于主业务是无关联性的,一般上的做法是进行异步发送。
  • 重试场景:比如消息通知,在第一次通知出现异常时,会在隔几分钟之后进行再次重试发送。

RabbitMQ 实现延时队列一般而言有两种形式:

  • 第一种方式:利用两个特性,Time To Live(TTL)、Dead Letter Exchanges(DLX),通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能
  • 第二种方式:利用 RabbitMQ 中的插件 x-delay-message 实现延迟功能

第一种方式:利用两个特性,Time To Live(TTL)、Dead Letter Exchanges(DLX)

AMQP 协议和 RabbitMQ 队列本身没有直接支持延迟队列功能,但是我们可以通过 RabbitMQ 的两个特性 TTL(Time-To-Live,存活时间)和 DLX(Dead-Letter-Exchange,死信队列交换机)来曲线实现延迟队列:

存活时间(Time-To-Live 简称 TTL)

RabbitMQ 可以通过设置队列过期时间实现延时消费或者通过设置消息过期时间实现延时消费,如果超时(两者同时设置以最先到期的时间为准),则消息变为 dead letter(死信)。

RabbitMQ 针对队列中的消息过期时间有两种方法可以设置,A:通过队列属性设置,队列中所有消息都有相同的过期时间;B:对消息进行单独设置,每条消息 TTL 可以不同。如果同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息在队列的生存时间一旦超过设置的 TTL 值,就成为 dead letter。

死信交换(Dead Letter Exchanges 简称 DLX)

设置了 TTL 的消息或队列最终会成为 Dead Letter,当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定此 DLX 的队列就是死信队列。

RabbitMQ 的 Queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可选)两个参数,如果队列内出现了 dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现 dead letter 之后将 dead letter 重新发送到指定 exchange;
x-dead-letter-routing-key:出现 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送。

队列出现 dead letter 的情况有:

  • 消息或者队列的 TTL 过期;

  • 队列达到最大长度;

  • 消息被消费端拒绝(basic.reject or basic.nack)并且 requeue=false。

Spring Boot 集成 RabbitMQ 实现延时队列实战

1、队列以及交换器配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration
public class RabbitConfig {

@Bean
public DirectExchange directExchange() {
// 死信队列跟交换机类型没有关系,不一定为directExchange,不影响该类型交换机的特性
return new DirectExchange("dead.letter.direct");
}

@Bean
public Queue repeatTradeQueue() {
// 用于延时消费的队列
return new Queue("repeat.trade.queue", true, false, false);
}

@Bean
public Binding repeatTradeBinding(Queue repeatTradeQueue, DirectExchange directExchange) {
// 绑定交换机并指定routing key
return BindingBuilder.bind(repeatTradeQueue).to(directExchange).with("repeat.trade.queue");
}

@Bean
public Queue deadLetterQueue() {
// 配置死信队列
Map<String, Object> args = new HashMap<>(3);
// 方式一:设置队列过期时间实现延时消费,设置队列中消息存活时间为3秒
args.put("x-message-ttl", 3000);
// 出现 dead letter 之后将 dead letter 重新发送到指定 exchange
args.put("x-dead-letter-exchange", "dead.letter.direct");
// 出现 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送
args.put("x-dead-letter-routing-key", "repeat.trade.queue");
return new Queue("dead.letter.queue", true, false, false, args);
}
}

2、发送者,这里发送者需要指定前面配置了过期时间的队列 dead.letter.queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class DeadLetterSender {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String context = "hello " + new Date();
System.out.println("DeadLetterSender sendTime:" + LocalDateTime.now().toString() + " message:" + context);

// 方式二:设置消息过期时间实现延时消费,设置消息存活时间为10秒,同时使用队列过期时间以及消息过期时间,则消息的过期时间以两者之间TTL较小的那个数值为准。
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
// 设置过期时间10*1000毫秒
messageProperties.setExpiration("10000");
return message;
};

// 向dead.letter.queue发送消息,10*1000毫秒后过期,形成死信
rabbitTemplate.convertAndSend("dead.letter.queue", (Object) context, messagePostProcessor);
}
}

3、接收者,消费者监听指定用于延时消费的队列 repeat.trade.queue

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "repeat.trade.queue")
public class RepeatTradeReceiver {

@RabbitHandler
public void process(String msg) {
System.out.println("RepeatTradeReceiver receiptTime:" + LocalDateTime.now().toString() + " message:" + msg);
}
}

4、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
public class RabbitMqHelloTest {

@Autowired
private DeadLetterSender deadLetterSender;

@SneakyThrows
@Test
public void hello() {
deadLetterSender.send();
// 线程休眠5s,等待死信队列中的消息过期变成死信,重新发送到 repeatTradeQueue 队列中
TimeUnit.SECONDS.sleep(5);
}
}

运行结果:

1
2
DeadLetterSender sendTime:2020-01-19T17:15:42.633 message:hello Sun Jan 19 17:15:42 CST 2020
RepeatTradeReceiver receiptTime:2020-01-19T17:15:45.672 message:hello Sun Jan 19 17:15:42 CST 2020

利用两个特性,Time To Live、Dead Letter Exchanges 两个特征实现延时队列

第二种方式:利用 RabbitMQ 中的插件 x-delay-message

延迟插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依赖 Erlang/OPT 18.0 及以上运行环境。

实现机制:安装插件后会生成新的 Exchange 类型 x-delayed-message,该类型消息支持延迟投递机制, 接收到消息后并未立即将消息投递至目标队列中,而是存储在 mnesia(一个分布式数据系统) 表中,检测消息延迟时间,如达到可投递时间时并将其通过 x-delayed-type 类型标记的交换机类型投递至目标队列。

安装延迟插件

1、下载插件

可以通过 RabbitMQ 官网的官方插件 Community Plugins 下载相对应的 rabbitmq_delayed_message_exchange 插件,并将插件包放在 RabbitMQ 安装目录 plugins 目录下。

1
[root@VM_24_98_centos plugins]# wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

2、开启 rabbitmq_delayed_message_exchange 插件

1
[root@VM_24_98_centos plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3、查询安装的所有插件,检查 rabbitmq_delayed_message_exchange 插件是否是开启状态

1
[root@VM_24_98_centos plugins]# rabbitmq-plugins list

4、重启 RabbitMQ,使插件生效

1
[root@VM_24_98_centos ~]# systemctl restart rabbitmq-server.service

此时,通过浏览器访问控制台在交换器栏目下新增交换器多了 “x-delayed-message” 类型。

Spring Boot 集成 RabbitMQ 实现延时队列实战

1、队列以及交换器配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class RabbitConfig {

@Bean
public CustomExchange directExchange() {
// 自定义的交换机类型
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange("dead.letter.direct", "x-delayed-message", true, false, args);
}


@Bean
public Queue deadLetterQueue() {
// 配置死信队列
return new Queue("dead.letter.queue", true);
}

@Bean
public Binding repeatTradeBinding(Queue repeatTradeQueue, CustomExchange directExchange) {
// 绑定交换机并指定routing key
return BindingBuilder.bind(repeatTradeQueue).to(directExchange).with("repeat.trade.queue").noargs();
}
}

2、发送者,这里发送者需要指定前面配置了过期时间的队列 dead.letter.queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class DeadLetterSender {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String context = "hello " + new Date();
System.out.println("DeadLetterSender sendTime:" + LocalDateTime.now().toString() + " message:" + context);

// 向dead.letter.queue发送消息,10*1000毫秒后过期,形成死信
rabbitTemplate.convertAndSend("dead.letter.direct", "dead.letter.queue", context, message -> {
// 设置消息过期时间实现延时消费,设置消息存活时间为10秒
message.getMessageProperties().setDelay(10000);
return message;
}
);
}
}

3、接收者,消费者监听指定用于死信队列 dead.letter.queue

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "dead.letter.queue")
public class DeadLetterReceiver {

@RabbitHandler
public void process(String msg) {
System.out.println("DeadLetterReceiver receiptTime:" + LocalDateTime.now().toString() + " message:" + msg);
}
}

4、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest
public class RabbitMqHelloTest {

@Autowired
private DeadLetterSender deadLetterSender;

@SneakyThrows
@Test
public void hello() {
deadLetterSender.send();
// 等待接收程序执行之后,再退出测试
TimeUnit.SECONDS.sleep(15);
}
}

运行结果:

1
2
DeadLetterSender sendTime:2020-01-20T11:08:24.412 message:hello Mon Jan 20 11:08:24 CST 2020
DeadLetterReceiver receiptTime:2020-01-20T11:08:34.440 message:hello Mon Jan 20 11:08:24 CST 2020

参考博文

[1]. RabbitMQ 官方文档地址
[2]. Spring Boot(八):RabbitMQ 详解
[3]. Spring Boot(十四)RabbitMQ延迟队列


注脚

[1]. AMQP:AMQP(advanced message queuing protocol)在 2003 年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP 是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。在 AMQP 中,消息路由(message routing)和 JMS 存在一些差别,在 AMQP 中增加了 Exchange 和 binding 的角色。producer 将消息发送给 Exchange,binding 决定 Exchange 的消息应该发送到那个 queue,而 consumer 直接从 queue 中消费消息。


了不起的消息队列系列


谢谢你长得那么好看,还打赏我!😘
0%