限流器是微服务中必不缺少的一环,可以起到保护下游服务,防止服务过载等作用。上一篇文章《Golang限流器time/rate使用介绍》简单介绍了time/rate的使用方法,本文则着重分析下其实现原理。建议在正式阅读本文之前,先阅读下上一篇文章。
上一篇文章讲到,time/rate是基于Token Bucket(令牌桶)算法实现的限流。本文将会基于源码,深入剖析下Golang是如何实现Token Bucket的。其代码也非常简洁,去除注释后,也就200行左右的代码量。
同时,我也提供了time/rate注释版,辅助大家理解该组件的实现。
背景
简单来说,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放Token,桶满则暂时不放。 而用户则从桶中取Token,如果有剩余Token就可以一直取。如果没有剩余Token,则需要等到系统中被放置了Token才行。
一般介绍Token Bucket的时候,都会有一张这样的原理图:
从这个图中看起来,似乎令牌桶实现应该是这样的:
有一个Timer和一个BlockingQueue。Timer固定的往BlockingQueue中放token。用户则从BlockingQueue中取数据。
这固然是Token Bucket的一种实现方式,这么做也非常直观,但是效率太低了:我们需要不仅多维护一个Timer和BlockingQueue,而且还耗费了一些不必要的内存。
在Golang的timer/rate
中的实现, 并没有单独维护一个Timer,而是采用了lazyload的方式,直到每次消费之前才根据时间差更新Token数目,而且也不是用BlockingQueue来存放Token,而是仅仅通过计数的方式。
Token的生成和消费
我们在上一篇文章中讲到,Token的消费方式有三种。但其实在内部实现,最终三种消费方式都调用了reserveN函数来生成和消费Token。
我们看下reserveN函数的具体实现,整个过程非常简单。在正式讲之前,我们先了解一个简单的概念:
在`time/rate`中,`NewLimiter`的第一个参数是速率limit,代表了一秒钟可以产生多少Token。
那么简单换算一下,我们就可以知道一个Token的生成间隔是多少。
有了这个生成间隔,我们就可以轻易地得到两个数据:
1. 生成N个新的Token一共需要多久。time/rate中对应的实现函数 durationFromTokens。
2. 给定一段时长,这段时间一共可以生成多少个Token。time/rate中对应的实现函数为tokensFromDuration。
那么,有了这些转换函数,整个过程就很清晰了,如下:
1. 计算从上次取Token的时间到当前时刻,期间一共新产生了多少Token:
我们只在取Token之前生成新的Token,也就意味着每次取Token的间隔,实际上也是生成Token的间隔。我们可以利用tokensFromDuration, 轻易的算出这段时间一共产生Token的数目。
那么,当前Token数目 = 新产生的Token数目 + 之前剩余的Token数目 – 要消费的Token数目。
2. 如果消费后剩余Token数目大于零,说明此时Token桶内仍不为空,此时Token充足,无需调用侧等待。
如果Token数目小于零,则需等待一段时间。
那么这个时候,我们可以利用durationFromTokens将当前负值的Token数转化为需要等待的时间。
3. 将需要等待的时间等相关结果返回给调用方。
从上面可以看出,其实整个过程就是利用了Token数可以和时间相互转化的原理。 而如果Token数为负,则需要等待相关时间即可。
注意:如果当消费时,Token桶中的Token数目已经为负值了,依然可以按照上述流程进行消费。随着负值越来越小,等待的时间将会越来越长。 从结果来看,这个行为跟用Timer+BlockQueue实现是一样的。
此外,整个过程为了保证线程安全,更新令牌桶相关数据时都用了mutex加锁。
我们模拟下请求与Token数变化的关系:
1. 当某一时间,桶内Token数为3, 此时A线程请求5个Token。那么此时桶内Token不足,因此A线程需要等待2个Token的时间。且此时桶内Token数变为-2。
2. 同时,B线程请求4个Token,此时桶内Token数为-2,因此B线程需要等待2+4=6个Token的时间,且此时桶内Token数变为-6。
对于Allow函数实现时,只要判断需要等待的时间是否为0即可,如果大于0说明需要等待,则返回False,反之返回True。
对于Wait函数,直接t := time.NewTimer(delay)
,等待对应的时间即可。
float精度问题
从上面原理讲述可以看出,在Token和时间的相互转化函数durationFromTokens
和tokensFromDuration
中,涉及到float64的乘除运算。 一谈到float的乘除,我们就需要小心精度问题了。
而Golang在这里也踩了坑,以下是tokensFromDuration
最初的实现版本
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
这个操作看起来一点问题都没:每秒生成的Token数乘于秒数。 然而,这里的问题在于,d.Seconds()
已经是小数了。两个小数相乘,会带来精度的损失。
所以就有了这个issue:golang.org/issues/34861。
修改后新的版本如下:
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
time.Duration
是int64
的别名,代表纳秒。分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。
数值溢出问题
我们讲reserveN函数的具体实现时,第一步就是计算从当前时间到上次取Token的时刻,期间一共新产生了多少Token,同时也可得出当前的Token是多少。
我最开始的理解是,直接可以这么做:
// elapsed表示过去的时间差
elapsed := now.Sub(lim.last)
// delta表示这段时间一共新产生了多少Token
delta = tokensFromDuration(now.Sub(lim.last))
tokens := lim.tokens + delta
if(token > lim.burst){
token = lim.burst
}
其中,lim.tokens
是当前剩余的Token,lim.last
是上次取token的时刻。lim.burst
是Token桶的大小。 使用tokensFromDuration计算出新生成了多少Token,累加起来后,不能超过桶的容量即可。
这么做看起来也没什么问题,然而并不是这样。
在time/rate
里面是这么做的,如下代码所示:
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
与我们最开始的代码不一样的是,它没有直接用now.Sub(lim.last)
来转化为对应的Token数,而是 先用lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
,计算把桶填满的时间maxElapsed。 取elapsed和maxElapsed的最小值。
这么做算出的结果肯定是正确的,但是这么做相比于我们的做法,好处在哪里?
对于我们的代码,当last非常小的时候(或者当其为初始值0的时候),此时now.Sub(lim.last)
的值就会非常大,如果lim.limit
即每秒生成的Token数目也非常大时,直接将二者进行乘法运算,结果有可能会溢出。
因此,time/rate
先计算了把桶填满的时间,将其作为时间差值的上限,这样就规避了溢出的问题。
Token的归还
而对于Reserve函数,返回的结果中,我们可以通过Reservation.Delay()
函数,得到需要等待时间。 同时调用方可以根据返回条件和现有情况,可以调用Reservation.Cancel()
函数,取消此次消费。 当调用Cancel()
函数时,消费的Token数将会尽可能归还给Token桶。
此外,我们在上一篇文章中讲到,Wait函数可以通过Context进行取消或者超时等, 当通过Context进行取消或超时时,此时消费的Token数也会归还给Token桶。
然而,归还Token的时候,并不是简单的将Token数直接累加到现有Token桶的数目上,这里还有一些注意点:
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
以上代码就是计算需要归还多少的Token。其中: 1. r.tokens
指的是本次消费的Token数 2. r.timeToAct
指的是Token桶可以满足本次消费数目的时刻,也就是消费的时刻+等待的时长。 3. r.lim.lastEvent
指的是最近一次消费的timeToAct值
其中:r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
指的是,从该次消费到当前时间,一共又新消费了多少Token数目。
根据代码来看,要归还的Token要是该次消费的Token减去新消费的Token。 不过这里我还没有想明白,为什么归还的时候,要减去新消费数目。
按照我的理解,直接归还全部Token数目,这样对于下一次消费是无感知影响的。这块的具体原因还需要进一步探索。
总结
Token Bucket其实非常适合互联网突发式请求的场景,其请求Token时并不是严格的限制为固定的速率,而是中间有一个桶作为缓冲。 只要桶中还有Token,请求就还可以一直进行。当突发量激增到一定程度,则才会按照预定速率进行消费。
此外在维基百科中,也提到了分层Token Bucket(HTB)作为传统Token Bucket的进一步优化,Linux内核中也用它进行流量控制。