RateLimiter源码解析
RateLimiter是Guava包提供的限流器,采⽤了令牌桶算法,特点是均匀地向桶中添加令牌,每次消费时也必须持有令牌,否则就需要等待。应⽤场景之⼀是限制消息消费的速度,避免消息消费过快⽽对下游的数据库造成较⼤的压⼒。学历在线验证
本⽂主要介绍RateLimiter的源码,包括基本限流器SmoothBursty,以及带预热效果的SmoothWarmingUp。
RateLimiter作为限流器的顶层类,只有两个属性:
private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUDirectly;
stopwatch⽤来计算时间间隔,以及实现了当拿不到令牌时将线程阻塞的功能;mutexDoNotUDirectly主要⽤来进⾏线程同步。
RateLimiter作为⼀个抽象类,本⾝不能直接实例化,可以使⽤静态⼯⼚⽅法来创建:
public static RateLimiter create(double permitsPerSecond); //①
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod); //②
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) //③
RateLimiter对外提供了3个构造器,分成两类,构造器①是第⼀类,底层会创建基本限流器SmoothBursty;构造器②和③是第⼆类,底层会创建带预热效果的SmoothWarmingUp。参数permitsPerSecond表⽰每秒产⽣多少个令牌,参数warmupPeriod是限流器预热阶段的时间,即限流器产⽣令牌从最慢到最快所需要的时间,参数unit是预热的时间单位。
SmoothRateLimiter新增了4个属性:
//桶中存储的令牌数
double storedPermits;
//桶中允许的最⼤令牌数
double maxPermits;
//稳定状态下产⽣令牌是速度,值为1000000/permitsPerSecond,单位是微秒/个
double stableIntervalMicros;
//下⼀次请求需要等待的时间
private long nextFreeTicketMicros = 0L;
这其中⽐较有意思的是nextFreeTicketMicros字段,它表⽰下⼀次获取令牌的请求到来时需要等待的时间,该字段可以实现上⼀次获取令牌的请求预⽀的等待时间由下⼀次请求来兑现。举例来说,如果桶中当前有10个令牌,⼀个请求来获取20个令牌,假设1秒产⽣⼀个令牌,那么本次请求预⽀了10个令
牌,可以直接返回,但是下⼀个请求就需要等待10秒的时间,这10秒就是上⼀次请求预⽀的令牌耗时。
接下来先介绍SmoothBursty的构造过程:薄唇
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, ateFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.tRate(permitsPerSecond);
return rateLimiter;
}
构造SmoothBursty时传⼊了两个参数,stopwatch好理解,第⼆个参数意思是当限流器长时间没⽤时,令牌桶内最多存储多少秒的令牌,这⾥限定了最多只存储1秒钟的令牌,也就是permitsPerSecond个。
我们继续分析tRate⽅法的实现:
public final void tRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, adMicros());
}
}
tRate⽅法先校验permitsPerSecond必须为整数,然后在同步块中执⾏doSetRate⽅法。mutex⽅法
通过双重检测的⽅式实例
化mutexDoNotUDirectly字段,详细代码略去,doSetRate是抽象⽅法,具体的实现在抽象⼦类SmoothRateLimiter中:
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
//计算产⽣单个令牌的时间间隔
double stableIntervalMicros = Micros(1L) / permitsPerSecond;
//stableIntervalMicros字段赋值
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
doSetRate⽅法主要是设置了stableIntervalMicros和nextFreeTicketMicros字段,调⽤的两个⽅法resync和doSetRate我们接着分析。resync⽅法主要⽤来设置storedPermits和nextFreeTicketMicros这俩字段,代码如下:
void resync(long nowMicros) {
// nowMicros单位是微秒,初始化过程中,代码执⾏到这⾥时,nowMicros已经⼤于0, nextFreeTicketMicros取默认值0
if (nowMicros > nextFreeTicketMicros) {
//计算超过的这些时间⾥产⽣了多少新的令牌
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
//重新计算当前令牌桶内持有的令牌数,此时maxPermits=0,storedPermits赋值后仍然是0
storedPermits = min(maxPermits, storedPermits + newPermits);
//更新下次获取令牌的时间为当前时间
nextFreeTicketMicros = nowMicros;
}
}
此⽅法会根据当前的时间决定是否进⾏字段赋值,如果当前时间已经超过了nextFreeTicketMicros的值,那么就重新计
算storedPermits和nextFreeTicketMicros字段,其中计算storedPermits的代码虽然容易理解,但是思路挺巧妙。⼀般来说,令牌桶算法的令牌需要以固定的速率进⾏添加,那么很⾃然想到可以起⼀个任务,按照⼀定的速度产⽣令牌,但是起⼀个新任务会占⽤⼀定的资源,从⽽加重系统的负担,此处的实现是利⽤时间差来计算这段时间产⽣的令牌数,以简单的计算完成了原本独⽴任务需要做的事情,开销⼤⼤减少
了。coolDownIntervalMicros⽅法是抽象⽅法,在SmoothBursty和SmoothWarmingUp有不同的实现,在SmoothBursty的实现是直接返
回stableIntervalMicros字段,这个字段⽬前还没设置过值,取默认值0.0,这⾥double的除零操作并不会抛异常,⽽是会返回⽆穷⼤。
司马穰苴
我们接着看⼀下doSetRate⽅法,这也是个抽象⽅法,在SmoothBursty中的实现如下:
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
//初始化的过程中,maxPermits是默认值0
double oldMaxPermits = this.maxPermits;
//最⼤令牌数,maxBurstSeconds的值固定是1,最⼤令牌数与permitsPerSecond相等
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-ca this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} el {
//在初始化阶段,storedPermits赋值为0
storedPermits =
(oldMaxPermits == 0.0)
0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
maxPermits在此之前并没有设置过值,因此默认是0.0,这⾥只是将storedPermits初始化成了0。不过这⾥的代码也说明,在执⾏期
间maxPermits是可以在其他地⽅被修改的,如果出现了更改,就会等⽐例修改storedPermits的值。
公司的英文
到这⾥SmoothBursty的初始化过程就结束了,⼤体上是将内部的字段赋予初始值。我们接下来看看SmoothBursty的使⽤:
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
long microsToWait = rerve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / Micros(1L);
}
acquire⽅法⽤于从令牌桶中获取令牌,参数permits表⽰需要获取的令牌数量,如果当前没办法拿到需要的令牌,线程会阻塞⼀段时间,该⽅法返回等待的时间,rerve的实现如下:
final long rerve(int permits) {
//参数校验,略
checkPermits(permits);
synchronized (mutex()) {
return rerveAndGetWaitLength(permits, adMicros());
}
}
final long rerveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = rerveEarliestAvailable(permits, nowMicros);
//返回等待时间,如果不需要等待,返回0
return max(momentAvailable - nowMicros, 0);
}
final long rerveEarliestAvailable(int requiredPermits, long nowMicros) {
//再次执⾏resync⽅法,但由于stableIntervalMicros已经完成初始化,因此更新之后的storedPermits也不再是0
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
//取可⽤的令牌与需要的令牌两者的最⼩值
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
//计算该次请求超出的令牌数,这⽣成多余令牌需要的时间会算在下⼀次请求上
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
双十一购物狂欢节+ (long) (freshPermits * stableIntervalMicros);
//扣减令牌桶库存
this.storedPermits -= storedPermitsToSpend;
//注意这⾥返回的是上次的时间
手机怎么使用u盘
return returnValue;
}
rerve的核⼼逻辑在rerveEarliestAvailable⽅法中,该⽅法的主要思想是:检查当前令牌桶内令牌数是否满⾜需求,如果满⾜则不需要额外的等待时间,否则需要将额外等待时间追加到nextFreeTicketMicros。需要注意的是⽅法返回的不是更新过后的nextFreeTicketMicros,⽽是上⼀次请求更新过后的时间,这个时间就是当前线程需要阻塞的时间,也就是说,当前请求所需要等待的时间是由下次请求完成的,下次请求需要的等待时间由下下次请求完成,以此类推。当前请求的令牌数超过令牌桶中的令牌数越多,下次请求需要等待的时间就越长。并且这⾥并没有对requiredPermits的上限做检查,这就允许预⽀令牌,即假设桶的上限是100个令牌,那么⼀次请求可以允许超过100个令牌,只是⽣成多余令牌的时间需要算到下⼀个请求上。同时这⾥的逻辑也说明,获取令牌是直接成功的,如果获取的令牌数过多,会将storedPermits扣成0,但不会扣成负数。
到这⾥SmoothBursty的初始化以及获取令牌的所有逻辑就介绍完了,接下来看看另⼀个类SmoothWar
mingUp的源码。RateLimiter使⽤了模板⽅法模式,SmoothWarmingUp和SmoothBursty共⽤了很多代码,只是在⼀些特定⽅法上各⾃的实现细节不同。
static RateLimiter create(
double permitsPerSecond,
long warmupPeriod,
TimeUnit unit,
double coldFactor,
SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.tRate(permitsPerSecond);
return rateLimiter;
}
我们之前介绍的另外两个构造器,底层调⽤的是这个包级的create⽅法,该⽅法的5个参数中,只有coldFactor是新出现的,字⾯意思是冷启动因⼦,源码写死了是3.0,该值表⽰在预热阶段开始时,以多⼤的速率产⽣令牌,冷启动速率是稳定速率的三分之⼀,在预热阶段,该值会均匀减少,冷启动阶段结束后恢复到正常速率。
tRate⽅法底层也是调⽤doSetRate⽅法,这⾥重点关注SmoothWarmingUp中重载的部分:
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = Micros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
void resync(long nowMicros) {
济南特色美食if (nowMicros > nextFreeTicketMicros) {
//这⾥coolDownIntervalMicros返回⽆穷⼤,则newPermits赋值为0,与SmoothBursty中的⽆穷⼤正好相反
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
//与SmoothBursty中的实现不同
double coolDownIntervalMicros() {
//maxPermits=0,此⽅法返回⽆穷⼤
return warmupPeriodMicros / maxPermits;
}
//此⽅法的实现和SmoothBursty中的很不⼀样
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
//maxPermits初始化时候是0
double oldMaxPermits = maxPermits;
//设置冷启动⽣成令牌的间隔是正常值的3倍(codeFactor固定为3)
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; //公式①
//这⾥的计算说明maxPermits可能超过permitsPerSecond
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); //公式②
//slope是梯形部分斜线的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-ca this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} el {
storedPermits =
(oldMaxPermits == 0.0)
maxPermits // storedPermit赋值为maxPermits
: storedPermits * maxPermits / oldMaxPermits;
}
}
doSetRate的代码不容易理解,源码中利⽤图⽰介绍了⼏个变量之间的关系(但是本⼈仍然不是很理解,因此只能将结论放在这⾥,⽆法进⾏更多解释),如图所⽰,源码注释中说明了如下的两个等式:
梯形的⾯积等于预热时间warmupPeriodMicros
warmupPeriodMicros = 0.5 * (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits)
由此可以得到公式②:
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
这⾥计算maxPermits的逻辑与SmoothBursty不⼀样,SmoothBursty⾥它的值等于permitsPerSecond,⽽这⾥的maxPermits是可以超
过permitsPerSecond的。
左边矩形的⾯积是梯形⾯积的⼀半,由此可知:
warmupPeriodMicros * 0.5 = thresholdPermits * stableIntervalMicros
计算可得公式①:
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros
SmoothWarmingUp的初始化逻辑到这⾥就结束了,接下来介绍下它获取令牌的流程,acquire⽅法的其他部分上⽂已经介绍过,此处重点介绍storedPermitsToWaitTime⽅法:
final long rerveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
周日记怎么写
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
//存储的令牌数量超出thresholdPermits的部分,这部分反映在梯形区域
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
//availablePermitsAboveThreshold > 0.0表⽰存储的令牌数已经超过thresholdPermits,到了梯形区域
if (availablePermitsAboveThreshold > 0.0) {
/
/permitsAboveThresholdToTake表⽰梯形区域的⾼
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
//length计算的是梯形的上底+下底
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
//梯形区域的⾯积,即⽣产梯形区域的令牌数所需要的时间
//由此处可知,只要存储的令牌数超过了左侧矩形区域达到了梯形区域,就需要梯形区域的等待时间
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
//扣除掉需要消耗的梯形区域的令牌数,表⽰还需要从左侧矩形区域取得的令牌数量
permitsToTake -= permitsAboveThresholdToTake;
}
/
/等待时间=梯形区域的时间+矩形区域的时间,按照代码的逻辑,只要梯形区域的令牌数⼩于permitsToTake,就要取左侧的令牌,需要额外的等待时间
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
//由前⽂可知,slope = =y/x = 产⽣令牌间隔/令牌数,permits * slope表⽰产⽣令牌间隔的增量,加上stableIntervalMicros表⽰梯形的底
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}