rocketmq消峰之流量控制详解
rocketMq消费端消费分数以上三个步骤:
第一: 消费端从rocketMq服务端pull消息,到本地。
第二: 消费端消费pull到的消息。
第三: 消费消费结束后,回复Ack到rocketMq,偏移消费位置。
代码:
/**
* 测试mq 并发 接受
*/
@Component
@RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
@SneakyThrows
@Override
public void onMessage(LikeWritingParams params) {
System.out.println("睡上10s");
//Thread.sleep(10000);
long begin = System.currentTimeMillis();
System.out.println("mq消费速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
//writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
long end = System.currentTimeMillis();
// System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
defaultMQPushConsumer.setConsumeThreadMin(2); //消费端拉去到消息以后分配线索去消费
defaultMQPushConsumer.setConsumeThreadMax(10);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
defaultMQPushConsumer.setPullInterval(1000);//消费端多久一次去rocketMq 拉去消息
defaultMQPushConsumer.setPullBatchSize(32); //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
}
}
第一:从上图分析可得知影响消费速度的几种情况:
1.PullInterval: 设置消费端,拉取mq消息的间隔时间。
注意:该时间算起时间是rocketMq收到Ack后算起。
例如:PullInterval=10s,上次rocketMq服务收到Ack消息后开始算起10s后,在拉去消息,不包含消费端消费占用的时间。
2.PullBatchSize: 设置每次pull消息的数量,该参数设置是针对逻辑消息队列,并不是每次pull消息拉到的总消息数,若上图右边消费端分配了两个消费队列来监听。那么PullBatchSize 设置为32,那么该消费端每次pull到 64个消息。
消费端每次pull到消息总数=PullBatchSize*监听队列数
3.ThreadMin和ThreadMax: 消费端消费pull到的消息需要的线程数量。
以上三种情况:是针对参数配置,来调整消费速度。
除了这三种情况外还有两种服务部署情况,可以调整消费速度:
4.rocketMq 逻辑消费队列配置数量 有消费端每次pull到消息总数=PullBatchSize*监听队列数
可知rocketMq 逻辑消费队列配置数量即上图中的 queue1 ,queue2,配置数量越多每次pull到的消息总数也就越多。如果下边配置读队列数量:
5.消费端节点部署数量 :部署数量无论一个节点监听所有队列,还是多个节点按照分配策略分配监听队列数量,理论上每秒pull到的数量都一样的,但是多节点消费端消费线程数量要比单节点消费线程数量多,也就是多节点消费速度大于单节点。
第二:从上边分析可得知,控制消费速度可以分为并发和延迟消费两种方案控制流量。
前提:rocketMq 主题topic 配置的消费队列数量一定的情况下讨论,这里设置为queueNum=4个消费队列
1.并发控流:
1.1 部署节点数量 nodeNum
1.2 每个节点消费端线程数量 threadNum
注释:一般情况最小线程和最大线程设置为一样,并且一般情况最大线程不会用到原因见线程池,触发线程次队列塞满才会启动新的线程,
1.3 每次pull的消息间隔时间 pullTime
1.4 批量拉去的数量PullBatchSize
列如上图配置: 部署节点数量为2 ,每个节点消费端线程数量为2 ,每次pull的消息间隔时间为1s,
PullBatchSize 为32
我们先分析一个节点每秒拉取消息数量:
pullTime*PullBatchSize *queueNum=TotalNum
1*32*2=64
此时具体消费速度就看消费端线程数量和接口执行时间来决定64个消息需要多久消费完。
若接口每次执行速度为50毫秒,那么1s两个线程可以执行40个消息,那么该节点处理完64个需要时间为:64/40 s 大约就是1.6s
若配置消费端开启三个线程,那么就是每秒消费64个消息时绰绰有余。
一般情况下pullTime默认为0,即消费速度完全就有消费端线程数量和接口执行速度(其他外在因素不考虑)两个因素来决定。
那么我们来分析下pullTime为0的情况:
一个接口执行时间为50ms,那么1秒一个线程处理20个消息,那么开启10个线程该节点每秒能处理200个线程,开启100个线程那就是2000个消息…
两个节点就是4000个消息这已经超出了mysql能承受的压力,其实这里达到mysql的连接不一定有4000个,毕竟mysql前 边还有一个数据库连接池在控制连接数量间接控制执行速度。
每秒处理消息数量=1s/接口执行时间毫秒*线程数量。
结论:pullTime=0 的情况下,并发控制实际就是调整节点部署数量和消费端消费线程数量,并且要预估每条消息业务处理时间(预估接口执行时间)
2.消费延时控流:
针对消息订阅者的消费延时流控的基本原理是,每次消费时在客户端增加一个延时来控制消费速度,此时理论上消费并发最快速度为:
单节点部署:
ConsumInterval :延时时间单位毫秒
ConcurrentThreadNumber:消费端线程数量
MaxRate :理论每秒处理数量
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果消息并发消费线程(ConcurrentThreadNumber)为 20,延时(ConsumInterval)为 100 ms,代入上述公式可得
200 = 1 / 0.1 * 20
由上可知,理论上可以将并发消费控制在 200 以下
如果是多个节点部署如两个节点,理论消费速度最高为每秒处理400个消息。
如下延时流控代码:
/**
* 测试mq 并发 接受
*/
@Component
@RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
@SneakyThrows
@Override
public void onMessage(LikeWritingParams params) {
System.out.println("睡上0.1秒");
Thread.sleep(100);
long begin = System.currentTimeMillis();
System.out.println("mq消费速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
//writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
long end = System.currentTimeMillis();
// System.out.println("消费:: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
defaultMQPushConsumer.setConsumeThreadMin(20); //消费端拉去到消息以后分配线索去消费
defaultMQPushConsumer.setConsumeThreadMax(50);//最大消费线程,一般情况下,默认队列没有塞满,是不会启用新的线程的
defaultMQPushConsumer.setPullInterval(0);//消费端多久一次去rocketMq 拉去消息
defaultMQPushConsumer.setPullBatchSize(32); //消费端每个队列一次拉去多少个消息,若该消费端分赔了N个监控队列,那么消费端每次去rocketMq拉去消息说为N*1
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
}
}
注释:如上消费端,单节点每秒处理速度也就是最高200个消息,实际上要小于200,业务代码执行也是需要时间。
但是要注意实际操作中并发流控实际是默认存在的,
spring boot 消费端默认配置
this.consumeThreadMin = 20;
this.consumeThreadMax = 20;
this.pullInterval = 0L;
this.pullBatchSize = 32;
若业务逻辑执行需要20ms,那么单节点处理速度就是:1/0.02*20=1000
这里默认拉去的速度1s内远大于1000
注意: 这里虽然pullInterval 等于0 当时受限于每次拉去64个,处理完也是需要一端时间才能回复ack,才能再次拉取,所以消费速度应该小于1000
所以并发流控要消费速度大于消费延时流控 ,那么消费延时流控才有意义
总结:rocketMq 肖锋流控两种方式:
并发流控:就是根据业务流控速率要求,来调整topic 消费队列数量(read queue),消费端部署节点,消费端拉去间隔时间,消费端消费线程数量等,来达到要求的速率内
延时消费流控:就是在消费端延时消费消息(sleep),具体延时多少要根据业务要求速率,和消费端线程数量,和节点部署数量来控制