简述
每个API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流
来保证接口的可用性或者降级
可用性。以防止非预期的请求对系统压力过大而引起的系统瘫痪
。
通常的策略就是拒绝多余的访问,或者让多余的访问排队等待服务,或者引流。
如果要准确的控制QPS,简单的做法是维护一个单位时间内的计数器
,如判断单位时间已经过去,则将计数器
重置零。
如上图所示会出现两倍的QPS的情况:没有很好的处理单位时间的边界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都触发了最大的请求数,将目光移动一下,就看到在两毫秒内发生了两倍的QPS。
常用的更平滑的限流算法有两种:漏桶算法
和令牌桶算法
。
漏桶算法
-
漏桶(Leaky Bucket)算法
思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率 -
因为漏桶的
漏出速率
是固定
的参数,所以,即使网络中不存在资源冲突
(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率(突发性流量也会被按照固定的速率放行)
令牌桶算法
-
令牌桶算法(Token Bucket)
。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务。 -
因为令牌桶是按照恒定的速率往桶中放令牌,即如果由突发性流量来访问,只要桶中有足够的令牌,那就可以放行。可以预防突发特性的流量。
RateLimiter
Google开源工具包Guava
提供了限流工具类RateLimiter
,该类基于令牌桶算法(Token Bucket)
来完成限流。
RateLimiter和Java中的信号量
(java.util.concurrent.Semaphore)类似,Semaphore通常用于限制并发量。
public class SemaphoreTest {
static Semaphore semaphore = new Semaphore(10);
static ExecutorService executor = Executors.newFixedThreadPool(20);
public static void Test(){
for (int i = 0; i < 20; i++) {
int finalI = i;
executor.execute(()->{
try {
semaphore.acquire();
System.out.println("成员" + finalI + " 过来了");
TimeUnit.SECONDS.sleep(10);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
public static void main(String[] args) {
Test();
}
}
Semaphore
用来控制同时访问某个资源的并发数量,如上面的代码,设置 100 个线程工作,但是我们能做到最多只有 10 个线程能同时到方法中。它控制的是并发数量
。
public class RateLimiterTest {
final RateLimiter rateLimiter = RateLimiter.create(2.0);
//每秒 2 个许可
public void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(10);
executor.execute(task);
}
}
public void test() {
double acquire = rateLimiter.acquire(10);
System.out.println(acquire);
}
public static void main(String[] args) {
RateLimiterTest t = new RateLimiterTest();
t.test();
t.test();
}
}
//输出
0.0
4.997451
而 RateLimiter
是用来控制访问资源的速率(rate)的,它强调的是控制速率
。比如控制每秒只能有 100 个请求通过,比如允许每秒发送 1MB 的数据。
它的构造方法指定一个 permitsPerSecond
参数,代表每秒钟产生多少个 permits
,这就是我们的速率
。
RateLimiter 允许预占未来的令牌,比如,每秒产生 5 个 permits,我们可以单次请求 100 个,这样,紧接着的下一个请求需要等待大概 20 秒才能获取到 permits。
如上面的代码所示:每秒产生2个permits
但是我一下子就取10个permits
,在测试方法中我尝试取了2次,看输出结果第一次直接输出,而第二次过了5秒。
RateLimiter 源码解析
RateLimiter 类图
RateLimiter 抽象类
目前只有一个子类,那就是抽象类SmoothRateLimiter
。SmoothRateLimiter
有两个实现类:SmoothWarmingUp
和SmoothBursty
。
RateLimiter 抽象类
RateLimiter 抽象类提供静态方法来用来实例化具体哪种模式,实例化后,通过acquire
方法来获取令牌
public abstract class RateLimiter {
//SmoothBursty 模式
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
//SmoothWarmingUp模式
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(
permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}
//获取令牌
public double acquire() {}
public double acquire(int permits) {}
//是否能获取到令牌
public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}
//获取当前的速率 以及动态的设置当前的速率
public final double getRate() {}
public final void setRate(double permitsPerSecond) {}
}
RateLimiter 属性
//计时,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。
private final SleepingStopwatch stopwatch;
//用来做锁,RateLimiter 依赖于 synchronized 来控制并发,各个属性甚至都没有用 volatile 修饰。
private volatile Object mutexDoNotUseDirectly;
SmoothBursty 模式
先分析SmoothBursty
模式,此模式会缓存一定数量的 permits
在池中,这样对于突发请求,能及时得到满足。
想象一下我们的某个接口,很久没有请求过来,突然同时来了好几个请求,如果我们没有缓存一些 permits 的话,很多线程就需要等待了。SmoothBursty 默认缓存最多 1
秒钟的 permits,不可以修改。
SmoothBursty 重要的属性
// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;
// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;
// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000
double stableIntervalMicros;
// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧
private long nextFreeTicketMicros = 0L;
storedPermits
:当前还有多少 permits 没有被使用,被存下来的 permits 数量maxPermits
:最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值stableIntervalMicros
:每隔多少时间产生一个 permit,nextFreeTicketMicros
:下一次可以获取 permits 的时间
SmoothBursty 构造函数
//SmoothBursty 模式
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
- 构造参数
permitsPerSecond
指定每秒钟可以产生多少个 permits。 - 可以看到构造方法调用了
create
静态方法。此静态方法用于构建SmoothBursty
模式的限流 - 可以看到
maxBurstSeconds
默认被设置了1.0
,也就是说它最多就只能缓存1 秒钟
的 permits。 - 然后调用了
rateLimiter#setRate()
方法设置当前的速率是多少。
RateLimiter.setRate( ) 方法
用于设置当前的速率
是多少
public final void setRate(double permitsPerSecond) {
//<1> 检查参数
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
//<2> 子类实现
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
//抽象方法 由子类实现
abstract void doSetRate(double permitsPerSecond, long nowMicros);
setRate
这个方法是一个public
方法,它可以用来调整速率。- 这里用了
synchronized
控制并发。 - 调用了
doSetRate
方法,抽象方法由子类SmoothRateLimiter
实现
SmoothRateLimiter.doSetRate( )
实现上面的RateLimiter.setRate()
方法中<2>
的抽象方法doSetRate
final void doSetRate(double permitsPerSecond, long nowMicros) {
//<1> 同步
resync(nowMicros);
//<2> 计算属性 stableIntervalMicros 每生成一个permit 需要多少微妙
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
<3> 子类实现
doSetRate(permitsPerSecond, stableIntervalMicros);
}
//SmoothRateLimiter.resync
void resync(long nowMicros) {
// 如果 nextFreeTicket 已经过掉了,想象一下很长时间都没有再次调用 limiter.acquire() 的场景
// 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
//计算的新permits
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
//给storedPermits 赋值
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
resync 方法根据当前时间来调整 storedPermits
和 nextFreeTicketMicros
-
storedPermits
:当前还有多少 permits 没有被使用,被存下来的 permits 数量 -
nextFreeTicketMicros
:下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧
SmoothBursty.doSetRate方法
在SmoothRateLimiter#doSetRate()
方法中<3>
,经过resync方法同步后,会进入此方法
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
//<1> 这里计算了,maxPermits 为 1 秒产生的 permits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 因为 storedPermits 的值域变化了,需要等比例缩放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
上面这个方法,我们要这么看,原来的 RateLimiter 是用某个 permitsPerSecond 值初始化的,现在我们要调整这个频率。对于 maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放。
到此,构造方法就完成了
,我们得到了一个 RateLimiter 的实现类 SmoothBursty
的实例。
acquire
获取令牌
//默认是获取一个
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
//<1> 预约,如果当前不能直接获取到 permits,需要等待
// 返回值代表需要 sleep 多久
long microsToWait = reserve(permits);
//<2> sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
回到上面使用RateLimiter的例子中的输出
//输出
0.0
4.997451
因为设置每秒生成2
个permit,但是第一个acquire
一下子获取了10个,所以在第二个acquire
中需要sleep 5秒
,故输出了4.997451
reserve 方法
在acquire
方法中的<1>
处用于计算是否需要sleep
,是的话需要sleep
多久
final long reserve(int permits) {
//检查
checkPermits(permits);
//synchronized 保持同步
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 计算时长
return max(momentAvailable - nowMicros, 0);
}
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可以使用多少个 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不够的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 为了这个不够的部分,需要等待多久时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 将 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 减去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
我们可以看到,获取 permits 的时候,其实是获取了两部分:
- 一部分来自于存量
storedPermits
- 存量不够的话,另一部分来自于预占未来的
freshPermits
。
这里提一个关键点吧,返回值是
nextFreeTicketMicros
的旧值,因为只要到这个时间点,就说明当次 acquire 可以成功返回了,而不管 storedPermits 够不够。如果 storedPermits 不够,会将 nextFreeTicketMicros 往前推一定的时间,预占了一定的量。
到这里,acquire 方法就分析完了。 接下来分析另外一种模式 SmoothWarmingUp
SmoothWarmingUp 模式
SmoothBursty
可以处理突发请求,因为它会缓存最多 1 秒的 permits
。
SmoothWarmingUp
适用于资源需要预热的场景
,比如我们的某个接口业务,需要使用到数据库连接,由于连接需要预热才能进入到最佳状态,如果我们的系统长时间处于低负载或零负载状态(当然,应用刚启动也是一样的),连接池中的连接慢慢释放掉了,此时我们认为连接池是冷的。
假设我们的业务在稳定状态下,正常可以提供最大 1000 QPS
的访问,但是如果连接池是冷的,我们就不能让 1000 个请求同时进来,因为这会拖垮我们的系统,我们应该有个预热升温的过程。
对应到 SmoothWarmingUp
中,如果系统处于低负载状态
,storedPermits
会一直增加,当请求来的时候,我们要从 storedPermits 中取 permits
,最关键的点在于,从 storedPermits 中取 permits 的操作是比较耗时的,因为没有预热。
SmoothBursty 它从 storedPermits 中获取 permits 是不需要等待时间的,而这边洽洽相反,从 storedPermits 获取需要更多的时间。
如上图所示:X 轴
代表 storedPermits 的数量
,Y 轴
代表获取一个 permits 需要的时间
。
假设指定 permitsPerSecond 为 10,那么 stableInterval 为 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是写死的,用户不能修改)。也就是说,当达到 maxPermits 时,此时处于系统最冷的时候,获取一个 permit 需要 300ms,而如果 storedPermits 小于 thresholdPermits 的时候,只需要 100ms。
看上图有一条垂直线k
,它与 X 轴
的交点 k 代表当前 storedPermits
的数量:
-
当系统在非常繁忙的时候,这条线停留在 x=0 处,此时 storedPermits 为 0(
一直调用acquire来获取令牌
) -
当 limiter
没有被调用
的时候,这条线慢慢往右移动,直到 x=maxPermits 处; -
如果
limiter
被调用,那么这条线又慢慢往左移动,直到 x=0 处;- 当 storedPermits 处于 maxPermits 状态时,我们认为 limiter 中的 permits 是
冷的
, - 此时获取一个
permit
需要较多的时间,因为需要预热,有一个关键的分界点是thresholdPermits
。 - 当小于
thresholdPermits
则认为不需要预热直接就获取返回,反之需要预热获取令牌的时间延长
- 当 storedPermits 处于 maxPermits 状态时,我们认为 limiter 中的 permits 是
预热时间是我们在构造的时候指定的,上图中红色梯形的面积就是预热时间
,因为预热完成后,我们能进入到一个稳定的速率中(stableInterval),下面我们来计算出 thresholdPermits
和 maxPermits
的值。
//SmoothWarmingUp 模式
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
return create(
permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}
有一个关键点,从 thresholdPermits 到 0 的时间,是从 maxPermits 到 thresholdPermits 时间的一半,也就是梯形的面积是长方形面积的 2 倍
,梯形的面积是 warmupPeriod
。
之所以长方形的面积是 warmupPeriod/2
,是因为 coldFactor
是硬编码的 3
。从上面SmoothWarmingUp 的构造函数可以看到
-
梯形面积为 warmupPeriod
,即:warmupPeriod = 2 * stableInterval * thresholdPermits -
thresholdPermits
的值:thresholdPermits = 0.5 * warmupPeriod / stableInterval -
然后我们根据梯形面积的计算公式:warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)
-
得出 maxPermits 为:maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)
这样,我们就得到了 thresholdPermits
和 maxPermits
的值。
接下来,我们来看一下冷却时间间隔
,它指的是 storedPermits 中每个 permit
的增长速度
,也就是我们前面说的 x=k 这条垂直线往右的移动速度,为了达到从 0 到 maxPermits 花费 warmupPeriodMicros
的时间,我们将其定义为:
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 在这里使用
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
SmoothWarmingUp.doSetRate( )方法
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 是固定的 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// 这个公式我们上面已经说了
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// 这个公式我们上面也已经说了
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 计算那条斜线的斜率。数学知识,对边 / 临边
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
storedPermitsToWaitTime 方法
setRate
方法非常简单,接下来,我们要分析的是 storedPermitsToWaitTime
方法,我们回顾一下下面的代码:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
这段代码是 acquire
方法的核心,waitMicros 由两部分组成:
- 一部分是从
storedPermits
中获取花费的时间 - 一部分是等待
freshPermits
产生花费的时间。在SmoothBursty
的实现中,从 storedPermits 中获取 permits 直接返回0
,不需要等待。
而在 SmoothWarmingUp 的实现中,由于需要预热,所以从 storedPermits 中取 permits 需要花费一定的时间,其实就是要计算下图中,阴影部分的面积。
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 如果右边梯形部分有 permits,那么先从右边部分获取permits,计算梯形部分的阴影部分的面积
if (availablePermitsAboveThreshold > 0.0) {
// 从右边部分获取的 permits 数量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// 梯形面积公式:(上底+下底)*高/2
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 长方形部分的阴影面积
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
// 对于给定的 x 值,计算 y 值
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}