java之学习记录 7 - 2 - RabbitMQ安装及使用
怎么用RabbitMQ
- 想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK
- 查看匹配的版本:https://www.rabbitmq.com/which-erlang.html
1 RabbitMQ安装启动
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads
1.1 安装
[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
1.2 启动后台管理插件
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
1.3 启动RabbitMQ
[root@localhost opt]# systemctl start rabbitmq-server.service
[root@localhost opt]# systemctl status rabbitmq-server.service
[root@localhost opt]# systemctl restart rabbitmq-server.service
[root@localhost opt]# systemctl stop rabbitmq-server.service
1.4 查看进程
[root@localhost opt]# ps -ef | grep rabbitmq
1.5 测试
1. 关闭防火墙: systemctl stop firewalld
2. 浏览器输入:http://ip:15672
3. 默认账号密码:guest,guest用户默认不允许远程连接
1. 创建账号
[root@localhost opt]# rabbitmqctl add_user mzj 123456
2. 设置用户角色
[root@localhost opt]# rabbitmqctl set_user_tags mzj administrator
3. 设置用户权限
[root@localhost opt]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
4. 查看当前用户和角色
[root@localhost opt]# rabbitmqctl list_users
5. 查看当前用户和角色
[root@localhost opt]# rabbitmqctl change_password mzj 123123
管理界面介绍
- overview:概览
- connections:查看链接情况
- channels:信道(通道)情况
- Exchanges:交换机(路由)情况,默认4类7个
- Queues:消息队列情况
- Admin:管理员列表
- 端口:
- 5672:RabbitMQ提供给编程语言客户端链接的端口
- 15672:RabbitMQ管理界面的端口
- 25672:RabbitMQ集群的端口
2 RabbitMQ快速入门
2.1 依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
2.2.2 日志依赖log4j(可选项)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
2.2 创建连接
- 先创建好虚拟主机
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/*
* 专门与RabbitMQ获得连接
* */
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 在工厂对象中设置MQ的连接信息(ip,port,vhost,username,password)
factory.setHost("192.168.58.222");
factory.setPort(5672);
factory.setVirtualHost("/lagou");
factory.setUsername("mzj");
factory.setPassword("123456");
// 通过工厂获得与MQ的连接
Connection connection = factory.newConnection();
return connection;
}
public static void main(String[] args) throws Exception{
Connection connection = getConnection();
System.out.println("connection = " connection);
connection.close();
}
}
3 RabbitMQ模式
- RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种
- 在线手册:https://www.rabbitmq.com/getstarted.html
- 5种消息模型,大体分为两类:
- 1和2属于点对点
- 3、4、5属于发布订阅模式(一对多)
- 点对点模式:P2P(point to point)模式包含三个角色:
- 消息队列(queue),发送者(sender),接收者(receiver)
- 每个消息发送到一个特定的队列中,接收者从中获得消息
- 队列中保留这些消息,直到他们被消费或超时
- 特点:
- 1. 每个消息只有一个消费者,一旦消费,消息就不在队列中了
- 2. 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
- 3. 接收者成功接收消息之后需向对象应答成功(确认)
- 如果希望发送的每个消息都会被成功处理,那需要P2P
- 发布订阅模式:publish(Pub)/subscribe(Sub)
- pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者(subcriber)
- 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
- 特点:
- 1. 每个消息可以有多个订阅者
- 2. 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
- 3. 为了消费消息,订阅者必须保持运行状态;类似于,看电视直播。
- 如果希望发送的消息被多个消费者处理,可采用本模式
3.1 简单模式
下面引用官网的一段介绍:
Introduction
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as
a post office: when you put the mail that you want posting in a post box, you can be sure
that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy,
RabbitMQ is a post box, a post office and a postman.
译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要
寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在
这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
类似邮局,处理信件的应该是收件人而不是邮局!
3.1.1 生产者P
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
/*
* 消息生产者
* */
public class Sender {
public static void main(String[] args) throws Exception {
String msg = "小马:hello,RabbitMQ!";
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明消息队列(1,2,3,4,5)
/*
* 参数1:队列的名称
* 参数2:队列中的数据是否持久化
* 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
* 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存在保存数据)
* 参数5:队列参数(没有参数为null)
* */
channel.queueDeclare("queue1",false,false,false,null);
// 向指定的队列发送消息(1,2,3,4)
/*
* 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
* 参数2:目标队列的名称
* 参数3:设置消息的属性(没有属性则为null)
* 参数4:消息内容(只接收字节数组)
* */
channel.basicPublish("","queue1",null,msg.getBytes());
System.out.println("发送:" msg);
// 释放资源
channel.close();
connection.close();
}
}
启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认
3.1.2 消费者C
package simplest;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消息接收者
* */
public class Recer {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
* 交付处理
* 1、收件人信息
* 2、信封(包裹上的快递标签)
* 3、协议的配置
* 4、消息
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("接收 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("queue1",true,consumer);
}
}
启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0
3.1.3 消息确认机制ACK
- 通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
- RabbitMQ如何得知消息被消费者接收?
- 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
- 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
- ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200就是告诉我们服务器执行成功
- 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
- 不过这种回执ACK分为两种情况:
- 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
- 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
- 两种情况如何选择,需要看消息的重要性:
- 如果消息不太重要,丢失也没有影响,自动ACK会比较方便
- 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了
- 修改手动消息确认
// false:手动消息确认
channel.basicConsume("queue1", false, consumer);
- 结果如下:
package simplest;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消息接收者,加入ACK确认机制
* */
public class RecerByACK {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
/*
* 交付处理
* 1、收件人信息
* 2、信封(包裹上的快递标签)
* 3、协议的配置
* 4、消息
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("接收 = " s);
// 手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列 false 手动消息确认
channel.basicConsume("queue1",false,consumer);
}
}
3.2 工作队列模式
- 之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)
- 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
- 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被一个人吃)
3.2.1 生产者P
package work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
/*
* 消息生产者
* */
public class Sender {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明消息队列(1,2,3,4,5)
/*
* 参数1:队列的名称
* 参数2:队列中的数据是否持久化
* 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
* 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存在保存数据)
* 参数5:队列参数(没有参数为null)
* */
channel.queueDeclare("test_work_queue",false,false,false,null);
for (int i = 1; i < 100; i ){
String msg = "羊肉串-->" i;
channel.basicPublish("","test_work_queue",null,msg.getBytes());
System.out.println("新鲜出炉的:" msg);
}
// 释放资源
channel.close();
connection.close();
}
}
3.2.2 消费者1
package work;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者2
* */
public class Recer1 {
static int i = 1;// 统计吃掉羊肉串的数量
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
final Channel channel = connection.createChannel();
// queueDeclare此方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
channel.queueDeclare("test_work_queue",false,false,false,null);
channel.basicQos(1);
// 从信道中获得消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【顾客1】吃掉了 " s " ! 总共【" i "】");
// 模拟网络延迟
try {
Thread.sleep(200);
} catch (Exception e){
}
// 手动确认(收件人信息,是否同时确认多个消息)
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列 false 手动消息确认
channel.basicConsume("test_work_queue",false,consumer);
}
}
3.2.3 消费者2(同3.2.2消费者1代码相同)
只更改一处
System.out.println("【顾客2】吃掉了 " s " ! 总共【" i "】");
- 先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
- 虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息
- 例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可以的,应该遵循“能者多劳”
- 效率高的多干点,效率低的少干点
- 看下面官网是如何给出解决思路的:
公平的分配
您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。
这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它只是盲目地将每条第n个消息分派给第n个消费者。
为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的worker。
// 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel.queueDeclare("test_work_queue",false,false,false,null);
// 可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多
channel.basicQos(1);
- 能者多劳必须要配合手动的ACK机制才生效
3.2.4 面试题:避免消息堆积?
1. workqueue,多个消费者监听同一个队列
2. 接收到消息后,通过线程池,异步消费
3.3 发布订阅模式
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视频通知
- 上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
- P生产者发送信息给X路由,X将信息转发给绑定X的队列
- X队列将信息通过信道发送给消费者,从而进行消费
- 整个过程,必须先创建路由
- 路由在生产者程序中创建
- 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
- 运行程序的顺序:
- 1. MessageSender
- 2. MessageReceiver1和MessageReceiver2
- 3. MessageSender
3.3.1 生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
/*
* 消息生产者
* */
public class Sender {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明路由
// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)
channel.exchangeDeclare("test_exchange_fanout","fanout");
//channel.queueDeclare("test_work_queue",false,false,false,null);
String msg = "Hello,everyone";
channel.basicPublish("test_exchange_fanout","",null,msg.getBytes());
System.out.println("生产者:" msg);
// 释放资源
channel.close();
connection.close();
}
}
3.3.2 消费者1
package ps;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者1
* */
public class Recer {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
// 绑定路由(关注)
channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout","");
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【消费者1】 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("test_exchange_fanout_queue_1",true,consumer);
}
}
3.3.3 消费者2
将3.3.2消费者1代码中的1修改为2即可,具体代码略
3.4 路由模式
- 路由会根据类型进行定向分发消息给不同的队列,如图所示
- 可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货
3.4.1 生产者
package direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
/*
* 消息生产者
* */
public class Sender {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明路由
// direct:根据路由键进行定向分发消息
channel.exchangeDeclare("test_exchange_direct","direct");
//channel.queueDeclare("test_work_queue",false,false,false,null);
String msg = "用户注册,【userid=u101】";
channel.basicPublish("test_exchange_direct","insert",null,msg.getBytes());
System.out.println("用户系统:" msg);
// 释放资源
channel.close();
connection.close();
}
}
3.4.2 消费者1
package direct;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者1
* */
public class Recer1 {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null);
// 绑定路由(如果路由键的类型是添加、删除、修改的话,绑定到这个队列上)
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update");
channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete");
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【消费者1】 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("test_exchange_direct_queue_1",true,consumer);
}
}
3.4.3 消费者2
package direct;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者2
* */
public class Recer2 {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null);
// 绑定路由(如果路由键的类型是查询的话,绑定到这个队列2上)
channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select");
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【消费者2】 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("test_exchange_direct_queue_2",true,consumer);
}
}
1. 记住运行程序的顺序,先运行一次sender(创建路由器),
2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
3. 再次运行sender,发出消息
3.5 通配符模式
- 和路由模式90%是一样的。
- 唯独的区别就是路由键支持模糊匹配
- 匹配符号
- *:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
- #:匹配0个或更多个词
- 看一下官网案例:
- Q1绑定了路由键 *.orange.* Q2绑定了路由键 *.*.rabbit 和 lazy.#
- 下面生产者的消息会被发送给哪个队列?
quick.orange.rabbit # Q1 Q2
lazy.orange.elephant # Q1 Q2
quick.orange.fox # Q1
lazy.brown.fox # Q2
lazy.pink.rabbit # Q2
quick.brown.fox # 无
orange # 无
quick.orange.male.rabbit # 无
3.5.1 生产者
package topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import util.ConnectionUtil;
/*
* 消息生产者
* */
public class Sender {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型,持久化)
// topic:模糊匹配的定向分发
channel.exchangeDeclare("test_exchange_topic","topic");
String msg = "商品降价";
channel.basicPublish("test_exchange_topic","product.price", null,msg.getBytes());
System.out.println("商品系统:" msg);
// 释放资源
channel.close();
connection.close();
}
}
3.5.2 消费者1
package topic;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者1
* */
public class Recer1 {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明队列(第二个参数为true:支持持久化)
channel.queueDeclare("test_exchange_topic_queue_1",false,false,false,null);
// 绑定路由(绑定用户相关的消息)
channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【消费者1】 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
}
}
3.5.3 消费者2
package topic;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者2
* */
public class Recer2 {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("test_exchange_topic_queue_2",false,false,false,null);
// 绑定路由(绑定用户相关的消息)
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#");
channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#");
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【消费者2】 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("test_exchange_topic_queue_2",true,consumer);
}
}
4 持久化
- 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?
- 消费者的ACK确认机制,可以防止消费者丢失消息
- 万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失
- 想要将消息持久化,那么路由和队列都要持久化才可以
4.1 生产者
package topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import util.ConnectionUtil;
/*
* 消息生产者
* */
public class Sender {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型,持久化)
// topic:模糊匹配的定向分发
channel.exchangeDeclare("test_exchange_topic","topic",true);
String msg = "商品降价";
channel.basicPublish("test_exchange_topic","product.price", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
System.out.println("商品系统:" msg);
// 释放资源
channel.close();
connection.close();
}
}
4.2 消费者
package topic;
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/*
* 消费者
* */
public class Recer1 {
public static void main(String[] args) throws Exception {
// 获得连接
Connection connection = ConnectionUtil.getConnection();
// 在连接中创建通道(信道)
Channel channel = connection.createChannel();
// 声明队列(第二个参数为true:支持持久化)
channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
// 绑定路由(绑定用户相关的消息)
channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
// 从信道中获得消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是从队列中获取的消息
String s = new String(body);
System.out.println("【消费者1】 = " s);
}
};
// 监听队列 true 自动消息确认
channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
}
}