java之学习记录 7 - 2 - RabbitMQ安装及使用

怎么用RabbitMQ

1 RabbitMQ安装启动

erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang

socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads

1.1 安装

[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm

[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm

[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

1.2 启动后台管理插件

[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management

1.3 启动RabbitMQ

[root@localhost opt]# systemctl start rabbitmq-server.service
[root@localhost opt]# systemctl status rabbitmq-server.service
[root@localhost opt]# systemctl restart rabbitmq-server.service
[root@localhost opt]# systemctl stop rabbitmq-server.service

1.4 查看进程

[root@localhost opt]# ps -ef | grep rabbitmq

1.5 测试

1. 关闭防火墙: systemctl stop firewalld

2. 浏览器输入:http://ip:15672

3. 默认账号密码:guest,guest用户默认不允许远程连接

1. 创建账号

[root@localhost opt]# rabbitmqctl add_user mzj 123456

2. 设置用户角色

[root@localhost opt]# rabbitmqctl set_user_tags mzj administrator

3. 设置用户权限

[root@localhost opt]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"

4. 查看当前用户和角色

[root@localhost opt]# rabbitmqctl list_users

5. 查看当前用户和角色

[root@localhost opt]# rabbitmqctl change_password mzj 123123

管理界面介绍

  • overview:概览
  • connections:查看链接情况
  • channels:信道(通道)情况
  • Exchanges:交换机(路由)情况,默认4类7个
  • Queues:消息队列情况
  • Admin:管理员列表
  • 端口:
    • 5672:RabbitMQ提供给编程语言客户端链接的端口
    • 15672:RabbitMQ管理界面的端口
    • 25672:RabbitMQ集群的端口

2 RabbitMQ快速入门

2.1 依赖

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>

2.2.2 日志依赖log4j(可选项)

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file

2.2 创建连接

  • 先创建好虚拟主机
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/*
* 专门与RabbitMQ获得连接
* */
public class ConnectionUtil {
    public static Connection getConnection() throws Exception{
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 在工厂对象中设置MQ的连接信息(ip,port,vhost,username,password)
        factory.setHost("192.168.58.222");
        factory.setPort(5672);
        factory.setVirtualHost("/lagou");
        factory.setUsername("mzj");
        factory.setPassword("123456");
        // 通过工厂获得与MQ的连接
        Connection connection = factory.newConnection();
        return connection;
    }

    public static void main(String[] args) throws Exception{
        Connection connection = getConnection();
        System.out.println("connection = "  connection);
        connection.close();

    }
}

3 RabbitMQ模式

  • 5种消息模型,大体分为两类:

    • 1和2属于点对点
    • 3、4、5属于发布订阅模式(一对多)
  • 点对点模式:P2P(point to point)模式包含三个角色:
    • 消息队列(queue),发送者(sender),接收者(receiver)
    • 每个消息发送到一个特定的队列中,接收者从中获得消息
    • 队列中保留这些消息,直到他们被消费或超时
    • 特点:
      • 1. 每个消息只有一个消费者,一旦消费,消息就不在队列中了
      • 2. 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
      • 3. 接收者成功接收消息之后需向对象应答成功(确认)
    • 如果希望发送的每个消息都会被成功处理,那需要P2P
  • 发布订阅模式:publish(Pub)/subscribe(Sub)
    • pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者(subcriber)
    • 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
    • 特点:
      • 1. 每个消息可以有多个订阅者
      • 2. 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
      • 3. 为了消费消息,订阅者必须保持运行状态;类似于,看电视直播。
    • 如果希望发送的消息被多个消费者处理,可采用本模式

3.1 简单模式

下面引用官网的一段介绍:

Introduction

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as

a post office: when you put the mail that you want posting in a post box, you can be sure

that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy,

RabbitMQ is a post box, a post office and a postman.

译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要

寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在

这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。

RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!

类似邮局,处理信件的应该是收件人而不是邮局!

3.1.1 生产者P

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;

/*
* 消息生产者
* */
public class Sender {
    public static void main(String[] args) throws Exception {
        String msg = "小马:hello,RabbitMQ!";
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明消息队列(1,2,3,4,5)
        /*
        * 参数1:队列的名称
        * 参数2:队列中的数据是否持久化
        * 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
        * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存在保存数据)
        * 参数5:队列参数(没有参数为null)
        * */
        channel.queueDeclare("queue1",false,false,false,null);
        // 向指定的队列发送消息(1,2,3,4)
        /*
        * 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
        * 参数2:目标队列的名称
        * 参数3:设置消息的属性(没有属性则为null)
        * 参数4:消息内容(只接收字节数组)
        * */
        channel.basicPublish("","queue1",null,msg.getBytes());
        System.out.println("发送:" msg);
        // 释放资源
        channel.close();
        connection.close();
    }
}

启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认

3.1.2 消费者C

package simplest;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消息接收者
* */
public class Recer {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
            * 交付处理
            * 1、收件人信息
            * 2、信封(包裹上的快递标签)
            * 3、协议的配置
            * 4、消息
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("接收 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("queue1",true,consumer);
    }
}

启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0

3.1.3 消息确认机制ACK

  • 通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
  • RabbitMQ如何得知消息被消费者接收?
    • 如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
    • 因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
  • ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200就是告诉我们服务器执行成功
  • 整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
  • 不过这种回执ACK分为两种情况:
    • 自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
    • 手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
  • 两种情况如何选择,需要看消息的重要性:
    • 如果消息不太重要,丢失也没有影响,自动ACK会比较方便
    • 如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了
  • 修改手动消息确认
// false:手动消息确认
channel.basicConsume("queue1", false, consumer);
  • 结果如下:
package simplest;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消息接收者,加入ACK确认机制
* */
public class RecerByACK {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
            * 交付处理
            * 1、收件人信息
            * 2、信封(包裹上的快递标签)
            * 3、协议的配置
            * 4、消息
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("接收 = "  s);
                // 手动确认(收件人信息,是否同时确认多个消息)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列 false 手动消息确认
        channel.basicConsume("queue1",false,consumer);
    }
}

3.2 工作队列模式

  • 之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)
  • 一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
  • 多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被一个人吃)

3.2.1 生产者P

package work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;

/*
* 消息生产者
* */
public class Sender {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明消息队列(1,2,3,4,5)
        /*
        * 参数1:队列的名称
        * 参数2:队列中的数据是否持久化
        * 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
        * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存在保存数据)
        * 参数5:队列参数(没有参数为null)
        * */
        channel.queueDeclare("test_work_queue",false,false,false,null);
        for (int i = 1; i < 100; i  ){
            String msg = "羊肉串-->"   i;
            channel.basicPublish("","test_work_queue",null,msg.getBytes());
            System.out.println("新鲜出炉的:" msg);
        }

        // 释放资源
        channel.close();
        connection.close();
    }
}

3.2.2 消费者1

package work;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者2
* */
public class Recer1 {
    static  int i = 1;// 统计吃掉羊肉串的数量
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        final Channel channel = connection.createChannel();
        // queueDeclare此方法有双重作用,如果队列不存在,则创建;如果队列存在,则获取
        channel.queueDeclare("test_work_queue",false,false,false,null);
        channel.basicQos(1);

        // 从信道中获得消息
        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【顾客1】吃掉了 "  s  " ! 总共【"  i    "】");
                // 模拟网络延迟
                try {
                    Thread.sleep(200);
                } catch (Exception e){
                }
                // 手动确认(收件人信息,是否同时确认多个消息)
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        // 监听队列 false 手动消息确认
        channel.basicConsume("test_work_queue",false,consumer);
    }
}

3.2.3 消费者2(同3.2.2消费者1代码相同)

只更改一处

System.out.println("【顾客2】吃掉了 "  s  " ! 总共【"  i    "】");
  • 先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
  • 虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息
    • 例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可以的,应该遵循“能者多劳”
    • 效率高的多干点,效率低的少干点
    • 看下面官网是如何给出解决思路的:

公平的分配

您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。

这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它只是盲目地将每条第n个消息分派给第n个消费者。

为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的worker。

// 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel.queueDeclare("test_work_queue",false,false,false,null);
// 可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多
channel.basicQos(1);
  • 能者多劳必须要配合手动的ACK机制才生效

3.2.4 面试题:避免消息堆积?

1. workqueue,多个消费者监听同一个队列

2. 接收到消息后,通过线程池,异步消费

3.3 发布订阅模式

生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视频通知

  • 上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
  • P生产者发送信息给X路由,X将信息转发给绑定X的队列
  • X队列将信息通过信道发送给消费者,从而进行消费
  • 整个过程,必须先创建路由
    • 路由在生产者程序中创建
    • 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
    • 运行程序的顺序:
      • 1. MessageSender
      • 2. MessageReceiver1和MessageReceiver2
      • 3. MessageSender

3.3.1 生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;

/*
* 消息生产者
* */
public class Sender {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();

        // 声明路由
        // fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)
        channel.exchangeDeclare("test_exchange_fanout","fanout");
        //channel.queueDeclare("test_work_queue",false,false,false,null);
        String msg = "Hello,everyone";
        channel.basicPublish("test_exchange_fanout","",null,msg.getBytes());
        System.out.println("生产者:" msg);
        // 释放资源
        channel.close();
        connection.close();
    }
}

3.3.2 消费者1

package ps;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者1
* */
public class Recer {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
        // 绑定路由(关注)
        channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout","");
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【消费者1】 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("test_exchange_fanout_queue_1",true,consumer);
    }
}

3.3.3 消费者2

将3.3.2消费者1代码中的1修改为2即可,具体代码略

3.4 路由模式

  • 路由会根据类型进行定向分发消息给不同的队列,如图所示
  • 可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货

3.4.1 生产者

package direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;

/*
* 消息生产者
* */
public class Sender {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();

        // 声明路由
        // direct:根据路由键进行定向分发消息
        channel.exchangeDeclare("test_exchange_direct","direct");
        //channel.queueDeclare("test_work_queue",false,false,false,null);
        String msg = "用户注册,【userid=u101】";
        channel.basicPublish("test_exchange_direct","insert",null,msg.getBytes());
        System.out.println("用户系统:" msg);
        // 释放资源
        channel.close();
        connection.close();
    }
}

3.4.2 消费者1

package direct;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者1
* */
public class Recer1 {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null);
        // 绑定路由(如果路由键的类型是添加、删除、修改的话,绑定到这个队列上)
        channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert");
        channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update");
        channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete");
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【消费者1】 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("test_exchange_direct_queue_1",true,consumer);
    }
}

3.4.3 消费者2

package direct;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者2
* */
public class Recer2 {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null);
        // 绑定路由(如果路由键的类型是查询的话,绑定到这个队列2上)
        channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select");
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【消费者2】 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("test_exchange_direct_queue_2",true,consumer);
    }
}

1. 记住运行程序的顺序,先运行一次sender(创建路由器),

2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定

3. 再次运行sender,发出消息

3.5 通配符模式

  • 和路由模式90%是一样的。
  • 唯独的区别就是路由键支持模糊匹配
  • 匹配符号
    • *:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
    • #:匹配0个或更多个词
  • 看一下官网案例:
    • Q1绑定了路由键 *.orange.* Q2绑定了路由键 *.*.rabbit 和 lazy.#
    • 下面生产者的消息会被发送给哪个队列?
quick.orange.rabbit  # Q1 Q2
lazy.orange.elephant # Q1 Q2
quick.orange.fox     # Q1
lazy.brown.fox       # Q2
lazy.pink.rabbit     # Q2
quick.brown.fox      # 无
orange               # 无
quick.orange.male.rabbit # 无

3.5.1 生产者

package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import util.ConnectionUtil;

/*
* 消息生产者
* */
public class Sender {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();

        // 声明路由(路由名,路由类型,持久化)
        // topic:模糊匹配的定向分发
        channel.exchangeDeclare("test_exchange_topic","topic");
        String msg = "商品降价";
        channel.basicPublish("test_exchange_topic","product.price", null,msg.getBytes());
        System.out.println("商品系统:" msg);
        // 释放资源
        channel.close();
        connection.close();
    }
}

3.5.2 消费者1

package topic;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者1
* */
public class Recer1 {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明队列(第二个参数为true:支持持久化)
        channel.queueDeclare("test_exchange_topic_queue_1",false,false,false,null);
        // 绑定路由(绑定用户相关的消息)
        channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【消费者1】 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
    }
}

3.5.3 消费者2

package topic;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者2
* */
public class Recer2 {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare("test_exchange_topic_queue_2",false,false,false,null);
        // 绑定路由(绑定用户相关的消息)
        channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#");
        channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#");
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【消费者2】 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("test_exchange_topic_queue_2",true,consumer);
    }
}

4 持久化

  • 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?

    • 消费者的ACK确认机制,可以防止消费者丢失消息
    • 万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失
  • 想要将消息持久化,那么路由和队列都要持久化才可以

4.1 生产者

package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import util.ConnectionUtil;

/*
* 消息生产者
* */
public class Sender {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();

        // 声明路由(路由名,路由类型,持久化)
        // topic:模糊匹配的定向分发
        channel.exchangeDeclare("test_exchange_topic","topic",true);
        String msg = "商品降价";
        channel.basicPublish("test_exchange_topic","product.price", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
        System.out.println("商品系统:" msg);
        // 释放资源
        channel.close();
        connection.close();
    }
}

4.2 消费者

package topic;

import com.rabbitmq.client.*;
import util.ConnectionUtil;

import java.io.IOException;

/*
* 消费者
* */
public class Recer1 {
    public static void main(String[] args) throws Exception {
        // 获得连接
        Connection connection = ConnectionUtil.getConnection();
        // 在连接中创建通道(信道)
        Channel channel = connection.createChannel();
        // 声明队列(第二个参数为true:支持持久化)
        channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
        // 绑定路由(绑定用户相关的消息)
        channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");
        // 从信道中获得消息
        DefaultConsumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是从队列中获取的消息
                String s = new String(body);
                System.out.println("【消费者1】 = "  s);
            }
        };
        // 监听队列 true 自动消息确认
        channel.basicConsume("test_exchange_topic_queue_1",true,consumer);
    }
}

来源:https://www.icode9.com/content-1-869451.html

(0)

相关推荐

  • 还不知道异步队列?点进来看!

    你不知道的分布式异步队列 关于异步队列你解多少? 它被誉为大数据高并发的终极解决方案. 来先给你介绍几种: RabbitQM:是爱立信的产品,基于erlang语言(函数式编程大数据 scala语言) ...

  • (1条消息) RabbitMQ六种通信模式介绍

    文章目录 一.简介 二.代码实现 三.程序运行效果 四.模式总结 一.简介 本篇博客所讲的为RabbitMQ六种通信模式之一的发布与订阅模式,官网给出的图如下所示: 简单模式与工作模式2个案例中,只有 ...

  • C#队列学习笔记:RabbitMQ安装及使用

    一.环境搭建 1.1.由于RabbitMQ是使用Erlang语言开发的,因此要安装Erlang运行时环境,下载地址:Erlang官网下载  CSDN分享下载 1.2.去RabbitMQ官网下载Rabb ...

  • 3W字带你迅速上手MQ

    3W字带你迅速上手MQ

  • 万字长文书写RabbitMQ最全见解!以后再也不用到处去搜索了呀!

    典型应用场景 1.跨系统的异步通信 人民银行二代支付系统,使用重量级消息队列 IBM MQ,异步,解耦,削峰都有体现. 2.应用内的同步变成异步 秒杀:自己发送给自己 3.基于Pub/Sub模型实现的 ...

  • C#队列学习笔记:RabbitMQ延迟队列

    一.引言 日常生活中,很多的APP都有延迟队列的影子.比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等.假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效.假 ...

  • JAVA多线程学习笔记整理

    多线程: 三种创建方法 继承Thread类,以线程运行内容重写run方法,创建Thread对象并用start方法启动该线程. (匿名内部类) (Lambda表达式) 实现Runable接口,以线程运行 ...

  • 学习记录笔记的方法

    ​很多人都会觉得记笔记浪费时间,也没有养成系统归纳的习惯,但其实如果能总结出一套适合自己的方法,不仅不会耽误时间,反而会事半功倍. 五个必做笔记的理由 ① 做笔记本身可以让你直接知道,你究竟有没有收获 ...

  • Response ->(个人学习记录笔记)

    @ 目录 1. HTTP协议: 1.1 请求消息:客户端发送给服务器端的数据 1.2 响应消息:服务器端发送给客户端的数据 1.2.1 数据格式: 1.2.1.1 响应行 1.2.1.2 响应头: 1 ...

  • java开发学习路线~

    既然想做Java开发工程师,那第一步必不可少的就是Java语言 一.编程基础 1.java语言: java基本语法,面向对象,接口,容器,异常,泛型,注解,反射,I/O, jvm java高级: 并发 ...

  • 学习记录-1

    没有执行力,哪有竞争力! 2.只要心存梦想,坚持努力去奋斗,最终没有什么是不能实现: 3.遇事不能慌乱,要沉着泠静,才能做出正确的判断,才不会错失良机,给自己留下无法弥         补的遗憾: 4 ...

  • java学习——58.java.swing学习

    AWT组件不是跨平台的,从外观到控制都依赖本地操作系统,所以称为重型组件.它会造成在不同的操作系统中会有不同的外观显现,所以现在均推荐使用Swing组件. Swing组件扩展了AWT,AWT中有的组件 ...

  • git原理学习记录:从基本指令到背后原理,实现一个简单的git

    一开始我还担心 git 的原理会不会很难懂,但在阅读了官方文档后我发现其实并不难懂,似乎可以动手实现一个简单的 git,于是就有了下面这篇学习记录. 本文的叙述思路参照了官方文档Book的原理介绍部分 ...

  • Java Web学习笔记(一)

    数据库相关(关于数据库连接的方法应该定义为静态方法): 加载驱动: static { // 加载驱动 try { Class.forName("com.mysql.cj.jdbc.Drive ...