SpringBoot整合RabbitMQ 手动应答 简单demo

版本说明

  • JDK 1.8
  • RabbitMQ 3.7.15 Erlang 22.0
  • SpringBoot 2.3.3.RELEASE

// Update 2021年1月19日16:50:16 CentOS搭建RabbitMQ 完整可操作
https://www.cnblogs.com/fengyumeng/p/11133924.html

1. 在RabbitMQ的Web管理界面,创建test队列

参数的含义
durability:是否持久化(重启或宕机后消息依然保存)

  • durable 持久
  • transient 暂时

新建maven项目。

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.3.3.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>com.demo</groupId>    <artifactId>rabbitmq-demo</artifactId>    <version>1.0.0</version>    <properties>        <lombok.version>1.18.12</lombok.version>    </properties>    <dependencies>        <!--web 模块-->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency><!-- AMQP -->        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>        <!--  lombok-->        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <optional>true</optional>            <scope>provided</scope>            <version>${lombok.version}</version>        </dependency>    </dependencies>    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

3. application.yaml

server:  port: 20002spring:  rabbitmq:  # 这里我改了本地的hosts,实际地址是192.168.0.121    host: vm.com    port: 5672    virtual-host: /    username: admin    password: admin    # 开启消息确认模式    # 消息发送到交换机确认机制,是否确认回调    # publisher-confirms: true    # 是否返回回调    publisher-returns: true    template:      #开启mandatory: true, basic.return方法将消息返还给生产者      mandatory: true    listener:      simple:      # 手动应答        acknowledge-mode: manual        # 最少消费者数量        concurrency: 1         # 最多消费者数量        max-concurrency: 10        # 支持重试        retry:          enabled: true 

端口

  • 5672:RabbitMQ的通信端口

  • 15672:Web管理界面端口

4. RabbitmqDemo.java

@SpringBootApplication@EnableRabbitpublic class RabbitmqDemoApplication {        public static void main(String[] args) {        SpringApplication.run(RabbitmqDemoApplication.class, args);    }}

5. RabbitConfig.java

@Configuration@Slf4jpublic class RabbitConfig {    private RabbitTemplate rabbitTemplate;    @Bean    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {        rabbitTemplate = new RabbitTemplate(connectionFactory);        return rabbitTemplate;    }}

配置RabbitMQ的消息模板。

6. 消息生产者 produce.java

@Componentpublic class Producer {        // @Qualifier("rabbitTemplate")    @Autowired    private RabbitTemplate rabbitTemplate;    public void send() {        for (int i = 0; i < 5; i++) {            System.out.println("生产者发送消息,序号为: " + i);            rabbitTemplate.convertAndSend("test", String.valueOf(i));        }    }}

初始化消息发送模板RabbitTemplate,@Qualifier注解用于限定具体的实现类,这里可以不指定。

7. 消息消费者 consumer.java

消费者1和消费者2均监听test队列。

不同的是,消费者1收到消息后返回确认应答basicAck。

而消费者2收到消息后返回拒绝应答basicRegect,消息被消费者拒绝后重新回到test队列中,等待下次发送给消费者。

@Component@Slf4jpublic class Consumer {    /**     * 消费者1 模拟正常处理消息的情况,消息处理完毕发送确认应答     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = "test")    public void process1(Message message, Channel channel) throws IOException {        log.info("消费者1 接收消息: " + new String(message.getBody()));        log.info("消费者1 确认应答消息:" + new String(message.getBody()));        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);    }    /**     * 消费者2 模拟处理消息出错的情况,消费者2向rabbitmq发送拒绝应答。     * 处理失败的消息会被重新放入ready中,再次发送给消费者,直至收到确认应答     * @param message     * @param channel     * @throws IOException     */    @RabbitListener(queues = "test")    public void process2(Message message, Channel channel) throws IOException {        log.info("消费者2 接收消息:" + new String(message.getBody()));        log.info("消费者2 拒绝应答消息:" + new String(message.getBody()));        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);    }}

8. 测试RabbitMqController.java

@RestController@RequestMapping("")public class RabbitMqController {    @Autowired    private Producer producer;    @GetMapping("/send")    public String send() {        producer.send();        return "发送完成";    }}

9. 测试

使用postman或浏览器使用Get方法请求http://localhost:20001/send,生产者会向RabbitMQ的test队列发送5条消息:

生产者发送消息,序号为: 0生产者发送消息,序号为: 1生产者发送消息,序号为: 2生产者发送消息,序号为: 3生产者发送消息,序号为: 4

可以看出序号为2的消息3次被消费者2接收,消费者2也3次发送拒绝应答,直到第4次才被消费者1接收,并返回确认应答。

END

// 2021年1月12日 更新Demo
Demo from Gitee

(0)

相关推荐

  • JMS--ActiveMQ的简单使用

    推荐阅读: 1.SSM整合及聚合工程的搭建 2.AdminLTE介绍和zTree的简单使用 3.MyBatis分页组件--PageHelper 4.Shiro 5.SpringAop--系统日志简例 ...

  • centos7 下部署springboot项目

    centos7 下部署springboot项目

  • springboot如何实现热部署

    第一步: 1)pom中加依赖 <dependencies> <dependency> <groupId>org.springframework.boot</g ...

  • jackson学习之九:springboot整合(配置文件)

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 系列文章汇总 jackson学习之一:基本信息 jac ...

  • 手把手教你SpringBoot整合MybatisPlus 代码生成器

    一.在pom.xml中添加所需依赖 <!-- MyBatis-Plus代码生成器--><dependency> <groupId>com.baomidou</ ...

  • quartz与springboot整合无法暂停job问题

    很多项目中都会用到定时任务的场景.起初项目中只是简单的使用了spring提供的@Scheduled注解.随着定时任务越来越多,我们需要对定时任务进行可视化管理,于是就单独建立了一个工程,用quartz ...

  • SpringBoot整合Shiro权限框架实战

    什么是ACL和RBAC ACL Access Control list:访问控制列表 优点:简单易用,开发便捷 缺点:用户和权限直接挂钩,导致在授予时的复杂性,比较分散,不便于管理 例子:常见的文件系 ...

  • SpringBoot整合阿里短信服务

    导读 由于最近手头上需要做个Message Gateway,涉及到:邮件(点我直达).短信.公众号(点我直达)等推送功能,网上学习下,整理下来以备以后使用. 步骤 点我直达 登录短信服务控制台 点我直 ...

  • Springboot整合百度开源分布式ID生成器UIDGenerator

    环境:sprinboot2.3.12.RELEASE uid-generator1.0.0 简介 UidGenerator是Java实现的, 基于Snowflake算法的唯一ID生成器.UidGene ...

  • SpringBoot整合Durid

    上一篇我们整合了MySQL和JDBCTemplate,并在结尾完成了双数据源的配置和插入动作.当我们将数据插入成功后,查看控制台就会发现,我们已经使用了默认的数据库连接池HikariDataSourc ...

  • SpringBoot整合Quartz作为调度中心完整实用例子

    因为想要做一个类似于调度中心的东西,定时执行一些Job(通常是一些自定义程序或者可执行的jar包),搭了一个例子,总结了前辈们的相关经验和自己的一些理解,如有雷同或不当之处,望各位大佬见谅和帮忙指正. ...

  • springboot整合mybatis

    springboot整合mybatis