Resilience4j CircuitBreaker 源码分析

2020年4月15日 | 作者 Siran | 4300字 | 阅读大约需要9分钟
归档于 Resilience4j | 标签 #Resilience4j

简述

在Resilience4j 中所有模块的事件机制采用了观察者模式,所以先写一个观察者模式熟悉一下以便在之后的源码分析。

//抽象的主体角色
interface AbstractSubject{
  void registerObserver();
  void notifyObserver();
}
//具体的主体角色
class ConcreteSubject implements AbstractSubject{
  List<AbstractObserver> observers = new ArrayList<>();
  
  public void registerObserver(Observer o) {
        this.observers.add(o);
    }

  public void notifyObservers(String tweet) {
        observers.forEach(o -> o.notify(tweet));
    }
}
//抽象的观察者角色
interface AbstractObserver{
  void notify(String tweet);
}
//具体的观察的角色
class ConcreteNYTimes implements Observer {

    @Override
    public void notify(String tweet) {
        if (tweet != null && tweet.contains("money")) {
            System.out.println("Breaking news in NY! " + tweet);
        }
    }
}
//具体的观察的角色
class ConcreteGuardian implements Observer {

    @Override
    public void notify(String tweet) {
        if (tweet != null && tweet.contains("queen")) {
            System.out.println("Yet another news in London... " + tweet);
        }
    }
}

//测试
public static void main(String[] args){
  ConcreteSubject cs = new ConcreteSubject();
  cs.registerObserver(new ConcreteNYTimes());
  cs.registerObserver(new ConcreteGuardian());
  cs.notifyObservers("Hi");
}

Resilience4j的CircuitBreaker主要由6个部分组成:

  • CircuitBreakerRegistry:管理熔断器实例的注册容器
  • CircuitBreakerConfig:熔断器的相关配置
  • CircuitBreakerState:熔断器的各种状态
  • CircuitBreakerMetrics:触发熔断器状态变化的指标
  • CircuitBreakerEvent:熔断器行为变化产生的事件
  • CircuitBreaker:熔断器本身

整体的调用流程是:

  1. 通过CircuitBreakerRegistry 注册一个熔断器。
  2. 然后得到CircuitBreaker 熔断器装饰你的调用方法。
  3. 跟你方法的执行判断是否是慢请求或者请求失败,然后发布事件。
  4. 统计这些事件然后根据熔断器的指标来判断熔断器是否要关闭、半打开、打开。

源码分析

1.CircuitBreakerRegistry

管理熔断器实例的注册容器

public interface CircuitBreakerRegistry extends Registry<CircuitBreaker, CircuitBreakerConfig> {
    
    /**
     * <1> 根据自定义的配置,创建CircuitBreakerRegistry实例  线程安全的单例
     */
    static CircuitBreakerRegistry of(CircuitBreakerConfig circuitBreakerConfig) {
        return new InMemoryCircuitBreakerRegistry(circuitBreakerConfig);
    }

    //...省略 都差不多 就是创建创建CircuitBreakerRegistry实例 
   
    //默认配置
    static CircuitBreakerRegistry ofDefaults() {
        return new InMemoryCircuitBreakerRegistry();

    //获取所有的熔断器  
    Seq<CircuitBreaker> getAllCircuitBreakers();

}

1.1 InMemoryCircuitBreakerRegistry

此类是真正管理熔断器的。根据上面<1> 处创建了InMemoryCircuitBreakerRegistry 对象。

//io.github.resilience4j.circuitbreaker.internal.InMemoryCircuitBreakerRegistry
public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
        super(defaultConfig);
    }

//io.github.resilience4j.core.registry.AbstractRegistry
public AbstractRegistry(C defaultConfig) {
        this(defaultConfig, HashMap.empty());
    }

public AbstractRegistry(C defaultConfig, List<RegistryEventConsumer<E>> registryEventConsumers,
        Map<String, String> tags) {
        this.configurations = new ConcurrentHashMap<>();
        //可以看到是用ConcurrentHashMap 来作为管理熔断器的容器,保证线程安全
        this.entryMap = new ConcurrentHashMap<>();
        this.eventProcessor = new RegistryEventProcessor(
            Objects.requireNonNull(registryEventConsumers, CONSUMER_MUST_NOT_BE_NULL));
        this.registryTags = Objects.requireNonNull(tags, TAGS_MUST_NOT_BE_NULL);
        this.configurations
            .put(DEFAULT_CONFIG, Objects.requireNonNull(defaultConfig, CONFIG_MUST_NOT_BE_NULL));
    }
  • 在上面可以得知,CircuitBreakerRegistry 只是一个入口,真正管理熔断器的是InMemoryCircuitBreakerRegistry
  • 而InMemoryCircuitBreakerRegistry 又是通过 ConcurrentHashMap 来作为熔断器的容器

2.CircuitBreakerConfig

熔断器的相关配置

public class CircuitBreakerConfig {

    // 请求调用失败的阈值,百分比。默认是50%
    public static final int DEFAULT_FAILURE_RATE_THRESHOLD = 50; // Percentage
    // 请求调用慢请求的阀值, 百分比。 默认50%
    public static final int DEFAULT_SLOW_CALL_RATE_THRESHOLD = 100; // Percentage
    // 在open 状态等待的时间, 时间到了 就进去 half-open状态  默认60s
    public static final int DEFAULT_WAIT_DURATION_IN_OPEN_STATE = 60; // Seconds
    // 在half-open 状态下 允许通过请求数量  默认10个
    public static final int DEFAULT_PERMITTED_CALLS_IN_HALF_OPEN_STATE = 10;
    // 最少需要多少个请求调用 进行计算错误率  默认100
    public static final int DEFAULT_MINIMUM_NUMBER_OF_CALLS = 100;
    // 在滑动窗口的模式下, 需要多少个请求调用进行计算,  默认100
    public static final int DEFAULT_SLIDING_WINDOW_SIZE = 100;
    public static final int DEFAULT_SLOW_CALL_DURATION_THRESHOLD = 60; // Seconds
    // 统计的模式。  默认 计数, 还有一个是 滑动窗口
    public static final SlidingWindowType DEFAULT_SLIDING_WINDOW_TYPE = SlidingWindowType.COUNT_BASED;
    public static final boolean DEFAULT_WRITABLE_STACK_TRACE_ENABLED = true;
    // 是否记录请求调用失败的断言,默认所有异常都记录。
    private static final Predicate<Throwable> DEFAULT_RECORD_EXCEPTION_PREDICATE = throwable -> true;
    // 是否忽视请求调用失败的断言,  默认false
    private static final Predicate<Throwable> DEFAULT_IGNORE_EXCEPTION_PREDICATE = throwable -> false;
    // The default exception predicate counts all exceptions as failures.
    private Predicate<Throwable> recordExceptionPredicate = DEFAULT_RECORD_EXCEPTION_PREDICATE;
    // The default exception predicate ignores no exceptions.
    private Predicate<Throwable> ignoreExceptionPredicate = DEFAULT_IGNORE_EXCEPTION_PREDICATE;

    @SuppressWarnings("unchecked")
    // 请求调用失败,存储异常记录的集合
    private Class<? extends Throwable>[] recordExceptions = new Class[0];

    @SuppressWarnings("unchecked")
    // 请求调用失败,忽略异常记录的集合
    private Class<? extends Throwable>[] ignoreExceptions = new Class[0];
}
  • failureRateThreshold:请求调用失败的阈值百分比。默认是50%
  • slowCallRateThreshold::请求调用慢请求的阀值百分比。 默认100%
  • slowCallDurationThreshold : 调用请求超过此只就被视为是慢请求,默认60秒
  • permittedNumberOfCallsInHalfOpenState:在半开状态下允许通过的请求数,默认10个
  • slidingWindowType:统计的类型,默认count-based,可选 time-based
  • slidingWindowSize:滑动窗口的大小,默认100
  • minimumNumberOfCalls:统计最小次数,默认10次 如果只有9次,那么不会触发统计
  • waitDurationInOpenState:在打开状态下,等待多久进去半打开状态,默认 60秒
  • automaticTransitionFromOpenToHalfOpenEnabled:默认为false,如果设置为true,则自动从打开状态转换为半打开状态,不需要调用即可触发该转换。
  • recordExceptions:是否记录请求调用失败的断言,默认所有异常都记录。
  • ignoreExceptions:是否忽视请求调用失败的断言

3.CircuitBreakerState

熔断器的各种状态

  • CLOSED —> OPEN:单向转换。当请求失败率超过阈值时,熔断器的状态由关闭状态转换到打开状态。失败率的阈值默认50%,可以通过设置CircuitBreakerConfig实例的failureRateThreshold属性值进行改变。
  • OPEN <—> HALF_OPEN:双向转换。打开状态的持续时间结束,熔断器的状态由打开状态转换到半开状态。这时允许一定数量的请求通过,当这些请求的失败率超过阈值,熔断器的状态由半开状态转换回打开状态。半开时请求的数量是由CircuitBreakerConfig实例的ringBufferSizeInHalfOpenState属性值设置的。
  • HALF_OPEN —> CLOSED:如果请求失败率小于或等于阈值,则熔断器的状态由半开状态转换到关闭状态。
    //CircuitBreaker 的状态
    private interface CircuitBreakerState {

        //判断是否允许调用后端接口, 当CircuitBreaker 状态为open的时候这里返回false 反之true
        boolean tryAcquirePermission();
        //获取请求后端接口的许可
        void acquirePermission();
        //释放
        void releasePermission();

        /**
         * 请求调用失败,记录指标
         * 当达到设定的度量指标值后,调用状态机实例触发状态转换
         * @param duration  持续时间
         * @param durationUnit
         * @param throwable
         */
        void onError(long duration, TimeUnit durationUnit, Throwable throwable);

        /**
         * 调用成功,记录指标
         * 当达到设定的度量指标后,调用状态机实例触发状态转换
         * @param duration  持续时间
         * @param durationUnit
         */
        void onSuccess(long duration, TimeUnit durationUnit);

        int attempts();

        /**
         * 返回当前状态在CircuitBreaker 接口中对应的枚举值
         * @return
         */
        CircuitBreaker.State getState();

        /**
         * 返回当前状态封装的度量指标的类实例
         * @return
         */
        CircuitBreakerMetrics getMetrics();
    }

CircuitBreakerState 的各种实现类,就那HalfOpenState 来看一下他是如何做转换的,其他都类似不一一分析了。

//CircuitBreakerStateMachine 的内部类
private class HalfOpenState implements CircuitBreakerState {
        //允许通过的数量
        private final AtomicInteger permittedNumberOfCalls;
        //是否是半开状态
        private final AtomicBoolean isHalfOpen;
 
        //是否允许通过,就是查看permittedNumberOfCalls 
        //减去这次请求是否>0是的话就是true,反之false
        @Override
        public boolean tryAcquirePermission() {
            if (permittedNumberOfCalls.getAndUpdate(current -> current == 0 ? current : --current)
                > 0) {
                return true;
            }
            circuitBreakerMetrics.onCallNotPermitted();
            return false;
        }
        //就是给permittedNumberOfCalls + 1
        @Override
        public void releasePermission() {
            permittedNumberOfCalls.incrementAndGet();
        }

        //onError 和 onSuccess 就是来进行状态转换的,调用了checkIfThresholdsExceeded 方法
        @Override
        public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
            // CircuitBreakerMetrics is thread-safe
            checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
        }
        @Override
        public void onSuccess(long duration, TimeUnit durationUnit) {
            // CircuitBreakerMetrics is thread-safe
            checkIfThresholdsExceeded(circuitBreakerMetrics.onSuccess(duration, durationUnit));
        }
    }
  • onErroronSuccess 当一个请求调用后通过返回的result来判断是触发onError 事件还是onSuccess事件,就是来进行状态转换的,调用了checkIfThresholdsExceeded 方法
       private void checkIfThresholdsExceeded(Result result) {
            //<1> 如果已经达到了阀值, 那么先把Half-Open 状态关闭,然后调用transitionToOpenState 升级到open 状态
            if (Result.hasExceededThresholds(result)) {
                if (isHalfOpen.compareAndSet(true, false)) {
                    transitionToOpenState();
                }
            }
            //<2> 如果在阀值的下面, 那么就先把Half-Open 状态关闭,然后调用transitionToClosedState 降到close 状态
            if (result == BELOW_THRESHOLDS) {
                if (isHalfOpen.compareAndSet(true, false)) {
                    transitionToClosedState();
                }
            }
        }
  • <1> 如果已经达到了阀值, 那么先把Half-Open 状态关闭,然后调用transitionToOpenState 升级到open 状态

    public void transitionToOpenState() {
            stateTransition(OPEN,
                currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));
        }
    
  • <2> 如果在阀值的下面, 那么就先把Half-Open 状态关闭,然后调用transitionToClosedState 降到close 状态

    public void transitionToClosedState() {
            stateTransition(CLOSED, currentState -> new ClosedState());
        }
    

到这里,已经把CircuitBreaker 状态如何转换,如何注册,已经讲清楚了。还差一步是,如何根据我们的方法调用来判断触发什么事件,然后进行状态的转换。


4.CircuitBreaker

在通过 CircuitBreakerRegistry 注册好熔断器之后,我们可以通过这个熔断器,来增强我们的调用方法。然后根据我们调用的方法,触发是什么事件然后判断是否需要转换状态。

public interface CircuitBreaker {
  static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<T> supplier) {
        return () -> {
            //<1> 如果熔断器是打开状态,则抛出异常,不执行下面的代码
            circuitBreaker.acquirePermission();
            //计时 start
            long start = System.nanoTime();
            try {
                //<2> 执行被装饰的方法
                T returnValue = supplier.get();
                //计时 end
                long durationInNanos = System.nanoTime() - start;
                //<3> 调用成功 在ring buffer 中记录成功状态,计算失败率 失败达到阀值,则触发状态转换
                circuitBreaker.onSuccess(durationInNanos, TimeUnit.NANOSECONDS);
                return returnValue;
            } catch (Exception exception) {
                // Do not handle java.lang.Error
                long durationInNanos = System.nanoTime() - start;
                //<4> 调用失败  在ring buffer 中记录失败状态,计算失败率 失败达到阀值,则触发状态转换
                circuitBreaker.onError(durationInNanos, TimeUnit.NANOSECONDS, exception);
                throw exception;
            }
        };
    }
}
  • <1> 如果熔断器是打开状态,则抛出异常,不执行下面的代码

    public void acquirePermission() {
                if (!tryAcquirePermission()) {
                    throw CallNotPermittedException
                        .createCallNotPermittedException(CircuitBreakerStateMachine.this);
                }
            }
    
  • <2> 执行被装饰的方法

  • <3> 调用onSuccess 在ring buffer 中记录成功状态,计算失败率 失败达到阀值,则触发状态转换,这里会通过观察者模式发布事件然后进行处理

    //io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine
    public void onSuccess(long duration, TimeUnit durationUnit) {
            //发布调用成功的事件,这里就使用了一开头讲的观察者模式
            publishSuccessEvent(duration, durationUnit);
            stateReference.get().onSuccess(duration, durationUnit);
        }
      
    //io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine
    private void publishSuccessEvent(final long duration, TimeUnit durationUnit) {
            //封装调用成功的事件
            final CircuitBreakerOnSuccessEvent event = new CircuitBreakerOnSuccessEvent(name,
                Duration.ofNanos(durationUnit.toNanos(duration)));
            //发布
            publishEventIfPossible(event);
        }
    
    • 消费事件,调用consumer 的 consumerEvent函数处理事件,返回true 已处理,false 未处理

      public <E extends T> boolean processEvent(E event) {
              boolean consumed = false;
              //查看是否有消费着, 有的话调用消费者
              if (!onEventConsumers.isEmpty()) {
                  onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
                  consumed = true;
              }
              if (!eventConsumerMap.isEmpty()) {
                  List<EventConsumer<T>> eventConsumers = this.eventConsumerMap
                      .get(event.getClass().getSimpleName());
                  if (eventConsumers != null && !eventConsumers.isEmpty()) {
                      eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
                      consumed = true;
                  }
              }
              return consumed;
          }
      
  • <4> 调用onError 在ring buffer 中记录失败状态,计算失败率 失败达到阀值,则触发状态转换


总结:

  • 通过 CircuitBreakerRegistry 注册熔断器获得熔断器并且CircuitBreaker.decorateXXX( ); 来装饰我们调用方法
  • 每次调用方法:
    • 请求调用的结果成功或失败,熔断器最终会调用度量指标CircuitBreakerMetrics的onSuccess或onError方法返回请求调用失败率到ClosedState。
    • ClosedState会在它的onSuccess或onError方法中判断请求失败率是否达到了设置的阈值,
    • 如果达到了阈值则调用状态机CircuitBreakerStateMachine的transitionToOpenState方法生成OpenState对象,同时把关闭状态的度量指标对象传给打开状态。
    • 然后熔断器把当前持有的状态更改为打开状态,完成了状态转换。