消息驱动(Stream)
文章目录
- 1、概述
- ①、为什么要用Stream
- ②、什么是Stream
- ③、设计思想
- Ⅰ、传统MQ
- Ⅱ、Cloud Stream
- ④、流程
- 2、案例说明
- ①、消息驱动之生产者
- Ⅰ、建Module
- Ⅱ、POM
- Ⅲ、YML
- Ⅳ、主启动
- Ⅴ、业务类
- Ⅵ、启动测试
- ②、消息驱动之消费者
- Ⅰ、建Module
- Ⅱ、POM
- Ⅲ、YML
- Ⅳ、主启动
- Ⅴ、业务类
- Ⅵ、测试
- 3、分组消费
- ①、问题提出
- ②、分组解决
- Ⅰ、原理
- Ⅱ、配置进同一个组
- Ⅲ、再次测试
- 4、持久化
1、概述
①、为什么要用Stream
对于消息中间件
MQ
,大家所熟知的就有四个,ActiveMQ
,RabbitMQ
,阿里的RocketMQ
,和大数据平台的Kafka
。而现在的Web
系统,我们可以把它看作以下三部分
如果我们Java程序员用到的消息中间件是
RabbitMQ
,但是大数据平台使用的是Kafka
,那这个系统将会至少存在两个问题
- 切换
- 开发
- 维护
所以我们就开始思考,有没有一种技术让,让我们不再关注
MQ
的细节,,我们只需要用一种适配绑定的方式,自动的帮我们在各种MQ
之间切换,正所谓偷懒才是第一生产力,无论底层多少的MQ
,我们统一使用Stream来操作各种各样的MQ
一句话就是:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
②、什么是Stream
官方定义
Spring Cloud Stream
是一个用于构建与共享消息系统相连接的高可伸缩性事件驱动的微服务。
应用程序通过
inputs
发送者或者outputs
消费者通过配置binding
(绑定来与Stream
中binder
对象交互,),而Stream
的binder
对象负责与消息中间件交互,所以我们只需要搞清楚如何与Spring Cloud Stream
交互就可以方便使用消息驱动的方式。
目前Spring Cloud Stream
仅支持RabbitMQ
和Kafka
,如果要使用如RocketMQ
,建议使用nacos
③、设计思想
Ⅰ、传统MQ
生产者消费者通过
Message
媒介传递消息内容消息必须走特定的消息通道
MessageChannel
消息通道
MessageChannel
的子接口SubscribableChannel
,由MessageHandler
消息处理器所订阅获取消息
Ⅱ、Cloud Stream
中间件的差异性给我们实际项目开发造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候
Spring Cloud Stream
给我们提供了一种解耦合的方式。
在没有绑定器这个概念的情况下,我们的
SpringBoot
应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel
通道,使得应用程序不需要再考虑各种不同的消息中间件实现。Stream
对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq
切换为kafka
),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程
Stream中的消息通信方式遵循了发布、订阅模式
④、流程
- Binder:很方便的连接中间件,屏蔽差异,是应用与消息中间件之间的封装,目前实现了
Kafka
和RalBinder
可以很方便的连接中间件,可以动态的改变消息类型(对应RabbitMQ
的exchange
),这些都可以通过配置文件来实现 - Channel:通道,是队列
Queue
的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel
对队列进行配置 - Source和Sink:简单的可理解为参照对象是
Spring Cloud Stream
自身,从Stream
发布消息就是输出,接受消息就是输入
四个注解
注解 | 解释 |
---|---|
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel 和exchange 绑定在一起 |
2、案例说明
①、消息驱动之生产者
Ⅰ、建Module
Ⅱ、POM
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>SpringCloudDemo</artifactId> <groupId>com.phz.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>StreamRabbitMQProvider8801</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--stream rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies></project>
Ⅲ、YML
server: port: 8801spring: application: name: stream-provider-service cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关环境配置 spring: rabbitmq: host: 39.105.43.3 port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain” binder: defaultRabbit #设置要绑定的消息服务的具体设置eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S) lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s instance-id: send-8801.com #在信息列表时显示主机名称 prefer-ip-address: true #访问的路径变为IP地址
Ⅳ、主启动
/** * @author PengHuAnZhi * @createTime 2021/2/18 16:30 * @projectName SpringCloudDemo * @className StreamMQMain8801.java * @description TODO */@SpringBootApplicationpublic class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); }}
Ⅴ、业务类
定义IMessageProvider消息发送接口
/** * @author PengHuAnZhi * @createTime 2021/2/18 16:31 * @projectName SpringCloudDemo * @className IMessageProvider.java * @description TODO */public interface IMessageProvider { String send();}
实现IMessageProvider接口
/** * @author PengHuAnZhi * @createTime 2021/2/18 16:32 * @projectName SpringCloudDemo * @className IMessageProviderImpl.java * @description TODO *///这里不再加Service注解,因为它是用来和Stream打交道的Service。@EnableBinding(Source.class)//定义消息的推送管道public class IMessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output;//消息发送管道,名称必须为output,否则无法启动 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("serial : " serial); return null; }}
定义
Controller
/** * @author PengHuAnZhi * @createTime 2021/2/18 16:40 * @projectName SpringCloudDemo * @className SendMessageController.java * @description TODO */@RestControllerpublic class SendMessageController { @Resource private IMessageProvider iMessageProvider; @RequestMapping(value = "/sendMessage") public String sendMessage(){ return iMessageProvider.send(); }}
Ⅵ、启动测试
RabbitMQ
管理页面发现新的Topic
多点几下
②、消息驱动之消费者
Ⅰ、建Module
建两个相同的消费者
8802
和8803
Ⅱ、POM
两个Module的POM文件相同
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>SpringCloudDemo</artifactId> <groupId>com.phz.springcloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>StreamRabbitMQConsumer8802</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--监控--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--eureka client--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--stream rabbit --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies></project>
Ⅲ、YML
注意
8802
和8803
配置端口号
server: port: 8803spring: application: name: cloud-stream-consumer cloud: stream: binders: #在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: #设置rabbitmq的相关环境配置 spring: rabbitmq: host: 39.105.43.3 port: 5672 username: guest password: guest bindings: #服务的整合处理 input: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的Exchange名称定义 content-type: application/json #设置消息类型,本次为json,本文要设置为“text/plain” binder: defaultRabbit #设置要绑定的消息服务的具体设置eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30S) lease-expiration-duration-in-seconds: 5 #如果超过5S间隔就注销节点 默认是90s instance-id: receive-8803.com #在信息列表时显示主机名称 prefer-ip-address: true #访问的路径变为IP地址
Ⅳ、主启动
两个消费者一样
/** * @author PengHuAnZhi * @createTime 2021/2/18 17:16 * @projectName SpringCloudDemo * @className StreamMQMain8803.java * @description TODO */@SpringBootApplicationpublic class StreamMQMain8803 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8803.class, args); }}
Ⅴ、业务类
两个消费者一样
/** * @author PengHuAnZhi * @createTime 2021/2/18 17:18 * @projectName SpringCloudDemo * @className ReceiveMessageController.java * @description TODO */@Component@EnableBinding(Sink.class)public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String>message){ System.out.println("消费者二号 : " message.getPayload() "\tServerPort : " serverPort); }}
Ⅵ、测试
连续发送几个消息
观察控制台
3、分组消费
①、问题提出
在如下场景中,订单系统我们做集群部署,都会从
RabbitMQ
中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误
我们得避免这种情况,这时我们就可以使用
Stream
中的消息分组来解决,注意在Stream
中处于同一个group
中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次,不同组是会全面消费的(重复消费),
观察RabbitMQ管理页面,能看到两个分组
#假设8802和8803是这样对应的Queue studyExchange.anonymous.1WY9VhKRRnSvJqlSS6yfHg #8802Queue studyExchange.anonymous.QFXjSsgNQAabd2U0QZeHzw #8803
由于组流水号不同,被视为两个组,那么根据刚刚的结论,这两个相同的服务处于不同的组,那就会产生重复消费
②、分组解决
Ⅰ、原理
微服务应用放置于同一个
group
中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
Ⅱ、配置进同一个组
添加一行配置
group: PengHuAnZhi
Ⅲ、再次测试
实际上
RabbitMQ
里面没有group
的概念,实际是exchange binding
了多个queues
,通过routing key
路由道不同的queue
,不同消费者监听不同的queue
4、持久化
其实我们加上了group属性,就自动支持了持久化,如下演示
现在我们停止
8802
和8803
,然后去掉8802
的group
配置,然后我们发送五条消息
重新启动
8802
(去掉了group
),观察控制台,发现并没有错过的消息接收
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hqByq0JI-1613643711475)(https://gitee.com/peng_huang_zhi/personal-drawing-bed/raw/master/image-20210218182114362.png)]
重新启动
8803
(没有去掉group
),观察控制台,发现错过的消息及时接收到了,这就是消息的持久化