Guava (一) :RateLimiter

2020年3月21日 | 作者 Siran | 5600字 | 阅读大约需要12分钟
归档于 Guava | 标签 #Guava

简述

每个API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限流来保证接口的可用性或者降级可用性。以防止非预期的请求对系统压力过大而引起的系统瘫痪

通常的策略就是拒绝多余的访问,或者让多余的访问排队等待服务,或者引流

如果要准确的控制QPS,简单的做法是维护一个单位时间内的计数器,如判断单位时间已经过去,则将计数器重置零。

如上图所示会出现两倍的QPS的情况:没有很好的处理单位时间的边界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都触发了最大的请求数,将目光移动一下,就看到在两毫秒内发生了两倍的QPS。

常用的更平滑的限流算法有两种:漏桶算法令牌桶算法

漏桶算法

  • 漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率

  • 因为漏桶的漏出速率固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率。因此,漏桶算法对于存在突发特性的流量来说缺乏效率(突发性流量也会被按照固定的速率放行)

令牌桶算法

  • 令牌桶算法(Token Bucket)。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务

  • 因为令牌桶是按照恒定的速率往桶中放令牌,即如果由突发性流量来访问,只要桶中有足够的令牌,那就可以放行。可以预防突发特性的流量。

RateLimiter

Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucket)来完成限流。

RateLimiter和Java中的信号量(java.util.concurrent.Semaphore)类似,Semaphore通常用于限制并发量。

public class SemaphoreTest {
    static Semaphore semaphore = new Semaphore(10);
    static ExecutorService executor = Executors.newFixedThreadPool(20);
    public static void Test(){
        for (int i = 0; i < 20; i++) {
            int finalI = i;
            executor.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.println("成员" + finalI + " 过来了");
                    TimeUnit.SECONDS.sleep(10);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });

        }
    }

    public static void main(String[] args) {
        Test();
    }
}

Semaphore 用来控制同时访问某个资源的并发数量,如上面的代码,设置 100 个线程工作,但是我们能做到最多只有 10 个线程能同时到方法中。它控制的是并发数量

public class RateLimiterTest {
    final RateLimiter rateLimiter = RateLimiter.create(2.0);

    //每秒 2 个许可
    public void submitTasks(List<Runnable> tasks, Executor executor) {
        for (Runnable task : tasks) {
            rateLimiter.acquire(10);
            executor.execute(task);
        }
    }

    public void test() {
        double acquire = rateLimiter.acquire(10);
        System.out.println(acquire);
    }
    public static void main(String[] args) {
        RateLimiterTest t = new RateLimiterTest();
        t.test();
        t.test();
    }
}
//输出
0.0
4.997451

RateLimiter 是用来控制访问资源的速率(rate)的,它强调的是控制速率。比如控制每秒只能有 100 个请求通过,比如允许每秒发送 1MB 的数据。

它的构造方法指定一个 permitsPerSecond 参数,代表每秒钟产生多少个 permits,这就是我们的速率

RateLimiter 允许预占未来的令牌,比如,每秒产生 5 个 permits,我们可以单次请求 100 个,这样,紧接着的下一个请求需要等待大概 20 秒才能获取到 permits。

如上面的代码所示:每秒产生2个permits 但是我一下子就取10个permits ,在测试方法中我尝试取了2次,看输出结果第一次直接输出,而第二次过了5秒。


RateLimiter 源码解析

RateLimiter 类图

  • RateLimiter 抽象类目前只有一个子类,那就是抽象类 SmoothRateLimiter
  • SmoothRateLimiter 有两个实现类:SmoothWarmingUpSmoothBursty

RateLimiter 抽象类

RateLimiter 抽象类提供静态方法来用来实例化具体哪种模式,实例化后,通过acquire方法来获取令牌

public abstract class RateLimiter {
  //SmoothBursty 模式
  public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }
 
  //SmoothWarmingUp模式
  public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
    return create(
        permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
  }
  
  //获取令牌
  public double acquire() {}
  public double acquire(int permits) {}

  //是否能获取到令牌
  public boolean tryAcquire() {}
  public boolean tryAcquire(int permits) {}
  public boolean tryAcquire(long timeout, TimeUnit unit) {}
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}
 
  //获取当前的速率 以及动态的设置当前的速率
  public final double getRate() {}
  public final void setRate(double permitsPerSecond) {}
}

RateLimiter 属性

//计时,RateLimiter 把实例化的时间设置为 0 值,后续都是取相对时间,用微秒表示。
private final SleepingStopwatch stopwatch;

//用来做锁,RateLimiter 依赖于 synchronized 来控制并发,各个属性甚至都没有用 volatile 修饰。
private volatile Object mutexDoNotUseDirectly;

SmoothBursty 模式

先分析SmoothBursty 模式,此模式会缓存一定数量的 permits 在池中,这样对于突发请求,能及时得到满足。

想象一下我们的某个接口,很久没有请求过来,突然同时来了好几个请求,如果我们没有缓存一些 permits 的话,很多线程就需要等待了。SmoothBursty 默认缓存最多 1 秒钟的 permits,不可以修改。


SmoothBursty 重要的属性

// 当前还有多少 permits 没有被使用,被存下来的 permits 数量
double storedPermits;

// 最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
double maxPermits;

// 每隔多少时间产生一个 permit,
// 比如我们构造方法中设置每秒 5 个,也就是每隔 200ms 一个,这里单位是微秒,也就是 200,000
double stableIntervalMicros;

// 下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧
private long nextFreeTicketMicros = 0L; 
  • storedPermits:当前还有多少 permits 没有被使用,被存下来的 permits 数量
  • maxPermits:最大允许缓存的 permits 数量,也就是 storedPermits 能达到的最大值
  • stableIntervalMicros:每隔多少时间产生一个 permit,
  • nextFreeTicketMicros:下一次可以获取 permits 的时间

SmoothBursty 构造函数

//SmoothBursty 模式
public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }
  • 构造参数 permitsPerSecond 指定每秒钟可以产生多少个 permits。
  • 可以看到构造方法调用了create静态方法。此静态方法用于构建 SmoothBursty 模式的限流
  • 可以看到maxBurstSeconds 默认被设置了1.0,也就是说它最多就只能缓存 1 秒钟的 permits。
  • 然后调用了 rateLimiter#setRate() 方法设置当前的速率是多少。

RateLimiter.setRate( ) 方法

用于设置当前的速率是多少

public final void setRate(double permitsPerSecond) {
    //<1> 检查参数
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      //<2> 子类实现
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

//抽象方法 由子类实现
abstract void doSetRate(double permitsPerSecond, long nowMicros);
  • setRate 这个方法是一个 public 方法,它可以用来调整速率。
  • 这里用了 synchronized 控制并发。
  • 调用了 doSetRate 方法,抽象方法由子类 SmoothRateLimiter 实现

SmoothRateLimiter.doSetRate( )

实现上面的RateLimiter.setRate() 方法中<2>的抽象方法doSetRate

final void doSetRate(double permitsPerSecond, long nowMicros) {
    //<1> 同步
    resync(nowMicros);
    //<2> 计算属性 stableIntervalMicros  每生成一个permit 需要多少微妙
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    <3> 子类实现
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

//SmoothRateLimiter.resync
void resync(long nowMicros) {
    // 如果 nextFreeTicket 已经过掉了,想象一下很长时间都没有再次调用 limiter.acquire() 的场景
    // 需要将 nextFreeTicket 设置为当前时间,重新计算 storedPermits
    if (nowMicros > nextFreeTicketMicros) {
      //计算的新permits
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      //给storedPermits 赋值
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

resync 方法根据当前时间来调整 storedPermitsnextFreeTicketMicros

  • storedPermits:当前还有多少 permits 没有被使用,被存下来的 permits 数量

  • nextFreeTicketMicros:下一次可以获取 permits 的时间,这个时间是相对 RateLimiter 的构造时间的,是一个相对时间,理解为时间戳吧


SmoothBursty.doSetRate方法

SmoothRateLimiter#doSetRate() 方法中<3>,经过resync方法同步后,会进入此方法

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  //<1> 这里计算了,maxPermits 为 1 秒产生的 permits
  maxPermits = maxBurstSeconds * permitsPerSecond;
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    // 因为 storedPermits 的值域变化了,需要等比例缩放
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

上面这个方法,我们要这么看,原来的 RateLimiter 是用某个 permitsPerSecond 值初始化的,现在我们要调整这个频率。对于 maxPermits 来说,是重新计算,而对于 storedPermits 来说,是做等比例的缩放。

到此,构造方法就完成了,我们得到了一个 RateLimiter 的实现类 SmoothBursty 的实例。


acquire

获取令牌

//默认是获取一个
public double acquire() {
  return acquire(1);
}

public double acquire(int permits) {
  //<1> 预约,如果当前不能直接获取到 permits,需要等待
  // 返回值代表需要 sleep 多久
  long microsToWait = reserve(permits);
  //<2> sleep
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  // 返回 sleep 的时长
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

回到上面使用RateLimiter的例子中的输出

//输出
0.0
4.997451

因为设置每秒生成2个permit,但是第一个acquire一下子获取了10个,所以在第二个acquire中需要sleep 5秒,故输出了4.997451


reserve 方法

acquire方法中的<1>处用于计算是否需要sleep,是的话需要sleep 多久

final long reserve(int permits) {
  //检查
  checkPermits(permits);
  //synchronized 保持同步
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
  // 返回 nextFreeTicketMicros
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  // 计算时长
  return max(momentAvailable - nowMicros, 0);
}

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  // 这里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
  resync(nowMicros);
  // 返回值就是 nextFreeTicketMicros,注意刚刚已经做了 resync 了,此时它是最新的正确的值
  long returnValue = nextFreeTicketMicros;
  // storedPermits 中可以使用多少个 permits
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  // storedPermits 中不够的部分
  double freshPermits = requiredPermits - storedPermitsToSpend;
  // 为了这个不够的部分,需要等待多久时间
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
          + (long) (freshPermits * stableIntervalMicros);
  // 将 nextFreeTicketMicros 往前推
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  // storedPermits 减去被拿走的部分
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

我们可以看到,获取 permits 的时候,其实是获取了两部分:

  • 一部分来自于存量 storedPermits
  • 存量不够的话,另一部分来自于预占未来的 freshPermits

这里提一个关键点吧,返回值是 nextFreeTicketMicros 的旧值,因为只要到这个时间点,就说明当次 acquire 可以成功返回了,而不管 storedPermits 够不够。如果 storedPermits 不够,会将 nextFreeTicketMicros 往前推一定的时间,预占了一定的量。

到这里,acquire 方法就分析完了。 接下来分析另外一种模式 SmoothWarmingUp


SmoothWarmingUp 模式

SmoothBursty 可以处理突发请求,因为它会缓存最多 1 秒的 permits

SmoothWarmingUp 适用于资源需要预热的场景比如我们的某个接口业务,需要使用到数据库连接,由于连接需要预热才能进入到最佳状态,如果我们的系统长时间处于低负载或零负载状态(当然,应用刚启动也是一样的),连接池中的连接慢慢释放掉了,此时我们认为连接池是冷的。

假设我们的业务在稳定状态下,正常可以提供最大 1000 QPS 的访问,但是如果连接池是冷的,我们就不能让 1000 个请求同时进来,因为这会拖垮我们的系统,我们应该有个预热升温的过程。

对应到 SmoothWarmingUp 中,如果系统处于低负载状态storedPermits 会一直增加,当请求来的时候,我们要从 storedPermits 中取 permits,最关键的点在于,从 storedPermits 中取 permits 的操作是比较耗时的,因为没有预热。

SmoothBursty 它从 storedPermits 中获取 permits 是不需要等待时间的,而这边洽洽相反,从 storedPermits 获取需要更多的时间。

如上图所示:X 轴代表 storedPermits 的数量Y 轴代表获取一个 permits 需要的时间

假设指定 permitsPerSecond 为 10,那么 stableInterval 为 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是写死的,用户不能修改)。也就是说,当达到 maxPermits 时,此时处于系统最冷的时候,获取一个 permit 需要 300ms,而如果 storedPermits 小于 thresholdPermits 的时候,只需要 100ms。

看上图有一条垂直线k,它与 X 轴的交点 k 代表当前 storedPermits 的数量:

  • 当系统在非常繁忙的时候,这条线停留在 x=0 处,此时 storedPermits 为 0(一直调用acquire来获取令牌)

  • 当 limiter 没有被调用的时候,这条线慢慢往右移动,直到 x=maxPermits 处;

  • 如果 limiter 被调用,那么这条线又慢慢往左移动,直到 x=0 处;

    • 当 storedPermits 处于 maxPermits 状态时,我们认为 limiter 中的 permits 是冷的
    • 此时获取一个 permit 需要较多的时间,因为需要预热,有一个关键的分界点是 thresholdPermits
    • 当小于thresholdPermits 则认为不需要预热直接就获取返回,反之需要预热获取令牌的时间延长

预热时间是我们在构造的时候指定的,上图中红色梯形的面积就是预热时间,因为预热完成后,我们能进入到一个稳定的速率中(stableInterval),下面我们来计算出 thresholdPermitsmaxPermits 的值。

//SmoothWarmingUp 模式
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);
    return create(
        permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
  }

有一个关键点,从 thresholdPermits 到 0 的时间,是从 maxPermits 到 thresholdPermits 时间的一半,也就是梯形的面积是长方形面积的 2 倍,梯形的面积是 warmupPeriod

之所以长方形的面积是 warmupPeriod/2,是因为 coldFactor 是硬编码的 3。从上面SmoothWarmingUp 的构造函数可以看到

  • 梯形面积为 warmupPeriod,即:warmupPeriod = 2 * stableInterval * thresholdPermits

  • thresholdPermits 的值:thresholdPermits = 0.5 * warmupPeriod / stableInterval

  • 然后我们根据梯形面积的计算公式:warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

  • 得出 maxPermits 为:maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

这样,我们就得到了 thresholdPermitsmaxPermits 的值。

接下来,我们来看一下冷却时间间隔,它指的是 storedPermits 中每个 permit增长速度,也就是我们前面说的 x=k 这条垂直线往右的移动速度,为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间,我们将其定义为:

double coolDownIntervalMicros() {
    return warmupPeriodMicros / maxPermits;
}

void resync(long nowMicros) {
  if (nowMicros > nextFreeTicketMicros) {
    // coolDownIntervalMicros 在这里使用
    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
    storedPermits = min(maxPermits, storedPermits + newPermits);
    nextFreeTicketMicros = nowMicros;
  }
}

SmoothWarmingUp.doSetRate( )方法

void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = maxPermits;
    // coldFactor 是固定的 3
    double coldIntervalMicros = stableIntervalMicros * coldFactor;
    // 这个公式我们上面已经说了
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
    // 这个公式我们上面也已经说了
    maxPermits =
        thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
    // 计算那条斜线的斜率。数学知识,对边 / 临边
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
    } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
    }
}

storedPermitsToWaitTime 方法

setRate 方法非常简单,接下来,我们要分析的是 storedPermitsToWaitTime 方法,我们回顾一下下面的代码:

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros;
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
  double freshPermits = requiredPermits - storedPermitsToSpend;
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 这部分固定返回 0
          + (long) (freshPermits * stableIntervalMicros);
  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

这段代码是 acquire 方法的核心,waitMicros 由两部分组成:

  • 一部分是从 storedPermits 中获取花费的时间
  • 一部分是等待 freshPermits 产生花费的时间。在 SmoothBursty 的实现中,从 storedPermits 中获取 permits 直接返回 0,不需要等待。

而在 SmoothWarmingUp 的实现中,由于需要预热,所以从 storedPermits 中取 permits 需要花费一定的时间,其实就是要计算下图中,阴影部分的面积。

long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
  long micros = 0;
  // 如果右边梯形部分有 permits,那么先从右边部分获取permits,计算梯形部分的阴影部分的面积
  if (availablePermitsAboveThreshold > 0.0) {
    // 从右边部分获取的 permits 数量
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
    // 梯形面积公式:(上底+下底)*高/2
    double length =
        permitsToTime(availablePermitsAboveThreshold)
            + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
    micros = (long) (permitsAboveThresholdToTake * length / 2.0);
    permitsToTake -= permitsAboveThresholdToTake;
  }
  // 加上 长方形部分的阴影面积
  micros += (long) (stableIntervalMicros * permitsToTake);
  return micros;
}

// 对于给定的 x 值,计算 y 值
private double permitsToTime(double permits) {
  return stableIntervalMicros + permits * slope;
}

参考于

RateLimiter 源码分析(Guava 和 Sentinel 实现)