token bucket令牌桶限流算法原理及代码【图文】
1 概述
限流算法主要有如下几种:
- 基于信号量Semaphore
只有数量维度,没有时间维度 - 基于fixed window
带上了时间维度,不过在两个窗口的临界点容易出现超出限流的情况,比如限制每分钟10个请求,在00:59请求了10次,在01:01又请求了10次,而从00:30-01:30这个时间窗口来看,这一分钟请求了20次,没有控制好 - 基于rolling window
就是要解决fixed window没解决的窗口临界问题,主要有基于token bucket的算法,以及基于leaky bucket的算法 - token bucket算法
token按指定速率添加到bucket中
一个bucket有其容量限制,超过其容量则多余的token会被丢弃
当请求到来时,先试图获取token,如果剩余token足够则放行,不够则不允许放行(可能等待token足够再继续)
2 简单实现
2.1 Java版
/**
* The minimalistic token-bucket implementation
*/
public class MinimalisticTokenBucket {
private final long capacity;
private final double refillTokensPerOneMillis;
private double availableTokens;
private long lastRefillTimestamp;
/**
* Creates token-bucket with specified capacity and refill rate equals to refillTokens/refillPeriodMillis
*/
public MinimalisticTokenBucket(long capacity, long refillTokens, long refillPeriodMillis) {
this.capacity = capacity;
this.refillTokensPerOneMillis = (double) refillTokens / (double) refillPeriodMillis;
this.availableTokens = capacity;
this.lastRefillTimestamp = System.currentTimeMillis();
}
synchronized public boolean tryConsume(int numberTokens) {
refill();
if (availableTokens < numberTokens) {
return false;
} else {
availableTokens -= numberTokens;
return true;
}
}
private void refill() {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis > lastRefillTimestamp) {
long millisSinceLastRefill = currentTimeMillis - lastRefillTimestamp;
double refill = millisSinceLastRefill * refillTokensPerOneMillis;
this.availableTokens = Math.min(capacity, availableTokens + refill);
this.lastRefillTimestamp = currentTimeMillis;
}
}
private static final class Selftest {
public static void main(String[] args) {
// 100 tokens per 1 second
MinimalisticTokenBucket limiter = new MinimalisticTokenBucket(100, 100, 1000);
long startMillis = System.currentTimeMillis();
long consumed = 0;
while (System.currentTimeMillis() - startMillis < 10000) {
if (limiter.tryConsume(1)) {
consumed++;
}
}
System.out.println(consumed);
}
}
}
以上是bucket4j给出的一个简单实现,用于理解token bucket算法
这个算法没有采用线程去refill token,因为bucket太多的话,线程太多,耗cpu
这个算法没有存储每个period使用的token,设计了lastRefillTimestamp字段,用于计算需要填充的token
每次tryConsume的时候,方法内部首先调用refill,根据设定的速度以及时间差计算这个时间段需要补充的token,更新availableTokens以及lastRefillTimestamp
之后限流判断,就是判断availableTokens与请求的numberTokens
2.2 Golang
package main
import (
"log"
)
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
// go 无原生构造函数,必须手动定义并实现
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter {
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("Reached the rate limitation.")
return false
}
cl.bucket <- 1
return true
}
func (cl *ConnLimiter) ReleaseConn() {
c :=<- cl.bucket // 释放写进去的token
log.Printf("New connction coming: %d", c)
}
高性能限流器Guava RateLimiter
令牌桶算法,其核心是想通过限流器,必须拿到令牌。
只要我们能够限制发放令牌的速率,那么就能控制流速:
- 令牌以固定速率添加到令牌桶中,假设限流速率是 r/秒,则令牌每 1/r 秒会添加一个
- 假设令牌桶的容量是 b ,如果令牌桶已满,则新的令牌会被丢弃
- 请求能够通过限流器的前提是令牌桶中有令牌
b 其实是burst的简写,意义是限流器允许的最大突发流量。比如b=10,而且令牌桶中的令牌已满,此时限流器允许10个请求同时通过限流器,这只是突发流量,这10个请求会带走10个令牌,所以后续流量只能按照速率 r 通过限流器。
如何实现呢?基于生产者-消费者模式?
- 一个生产者线程定时向阻塞队列添加令牌
- 试图通过限流器的线程则作为消费者线程
- 只有从阻塞队列中获取到令牌,才允许通过限流器
设计看上去很完美,实现也简单,若并发量不大,这没有什么问题。可使用限流大部分都是高并发场景,而且系统压力已经临近极限了,此时这个实现就有问题了。
问题出在定时器,高并发下,系统压力已临近极限,定时器精度误差会很大,定时器本身还会创建调度线程,对系统性能影响极大。
所以Guava没有使用定时器,它是如何实现的呢?
Guava的令牌桶算法
关键是记录并动态计算下一令牌的发放时间。
假设令牌桶的容量为 b=1,限流速率 r = 1个请求/s。如下所示,若当前令牌桶无令牌,下一个令牌的发放时间是在第3s,而在第2s时,有个线程T1请求令牌,此时该如何处理?
- 线程T1请求令牌
对于该请求令牌的线程,很显然需要等待1s,1s以后(第3s)它就能拿到令牌。下一个令牌发放的时间也要增加1秒,因为第3s发放的令牌已被线程T1预占。
- 线程T1请求结束
假设T1在预占第3s令牌后,马上又有一个线程T2请求令牌
- 线程T2请求令牌
由于下一个令牌产生的时间是第4s,所以线程T2要等待2s,才能获取到令牌,同时由于T2预占第4s令牌,所以下一令牌产生时间还要增加1s
- 线程T2请求结束
线程T1、T2都是在下一令牌产生时间之前请求令牌,若线程在下一令牌产生时间之后请求令牌会咋样?
假设在线程T1请求令牌之后的5秒,即第7秒,线程T3请求令牌,如下图所示。 - 线程T3请求令牌
由于第5s已产生一个令牌,所以此时线程T3可直接拿到令牌,无需等待。
在第7s,实际上限流器能产生3个令牌,第5、6、7秒各产生一个令牌。由于我们假设令牌桶的容量是1,所以第6、7秒产生的令牌就丢弃了,其实等价地你也可以认为是保留的第7秒的令牌,丢弃的第5、6秒的令牌,也就是说第7秒的令牌被线程T3占有了,于是下一令牌的的产生时间应该是第8秒 - 线程T3请求结束
所以我们只需要记录一个下一令牌产生的时间,并动态更新它。
依然假设令牌桶的容量是1。关键是reserve()方法,这个方法会为请求令牌的线程预分配令牌,同时返回该线程能够获取令牌的时间。其实现逻辑就是上面提到的:如果线程请求令牌的时间在下一令牌产生时间之后,那么该线程立刻就能够获取令牌;反之,如果请求时间在下一令牌产生时间之前,那么该线程是在下一令牌产生的时间获取令牌。由于此时下一令牌已经被该线程预占,所以下一令牌产生的时间需要加上1秒。
3 小结
token bucket算法,是基于QPS来限流,其简单的实现,就是计算单位时间补充token的速率,然后每次tryConsume的时候根据速率修正availableTokens。
参考
- https://github.com/vladimir-bukhtoyarov/bucket4j/blob/master/doc-pages/token-bucket-brief-overview.md
- https://en.wikipedia.org/wiki/Token_bucket