简述
在Resilience4j 中所有模块的事件机制采用了观察者模式
,所以先写一个观察者模式熟悉一下以便在之后的源码分析。
//抽象的主体角色
interface AbstractSubject{
void registerObserver();
void notifyObserver();
}
//具体的主体角色
class ConcreteSubject implements AbstractSubject{
List<AbstractObserver> observers = new ArrayList<>();
public void registerObserver(Observer o) {
this.observers.add(o);
}
public void notifyObservers(String tweet) {
observers.forEach(o -> o.notify(tweet));
}
}
//抽象的观察者角色
interface AbstractObserver{
void notify(String tweet);
}
//具体的观察的角色
class ConcreteNYTimes implements Observer {
@Override
public void notify(String tweet) {
if (tweet != null && tweet.contains("money")) {
System.out.println("Breaking news in NY! " + tweet);
}
}
}
//具体的观察的角色
class ConcreteGuardian implements Observer {
@Override
public void notify(String tweet) {
if (tweet != null && tweet.contains("queen")) {
System.out.println("Yet another news in London... " + tweet);
}
}
}
//测试
public static void main(String[] args){
ConcreteSubject cs = new ConcreteSubject();
cs.registerObserver(new ConcreteNYTimes());
cs.registerObserver(new ConcreteGuardian());
cs.notifyObservers("Hi");
}
Resilience4j的CircuitBreaker主要由6个部分组成:
CircuitBreakerRegistry
:管理熔断器实例的注册容器CircuitBreakerConfig
:熔断器的相关配置CircuitBreakerState
:熔断器的各种状态CircuitBreakerMetrics
:触发熔断器状态变化的指标CircuitBreakerEvent
:熔断器行为变化产生的事件CircuitBreaker
:熔断器本身
整体的调用流程是:
- 通过
CircuitBreakerRegistry
注册一个熔断器。 - 然后得到
CircuitBreaker
熔断器装饰你的调用方法。 - 跟你方法的执行判断是否是慢请求或者请求失败,然后发布事件。
- 统计这些事件然后根据熔断器的指标来判断熔断器是否要关闭、半打开、打开。
源码分析
1.CircuitBreakerRegistry
管理熔断器实例的注册容器
public interface CircuitBreakerRegistry extends Registry<CircuitBreaker, CircuitBreakerConfig> {
/**
* <1> 根据自定义的配置,创建CircuitBreakerRegistry实例 线程安全的单例
*/
static CircuitBreakerRegistry of(CircuitBreakerConfig circuitBreakerConfig) {
return new InMemoryCircuitBreakerRegistry(circuitBreakerConfig);
}
//...省略 都差不多 就是创建创建CircuitBreakerRegistry实例
//默认配置
static CircuitBreakerRegistry ofDefaults() {
return new InMemoryCircuitBreakerRegistry();
//获取所有的熔断器
Seq<CircuitBreaker> getAllCircuitBreakers();
}
1.1 InMemoryCircuitBreakerRegistry
此类是真正管理熔断器的。根据上面<1>
处创建了InMemoryCircuitBreakerRegistry 对象。
//io.github.resilience4j.circuitbreaker.internal.InMemoryCircuitBreakerRegistry
public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
super(defaultConfig);
}
//io.github.resilience4j.core.registry.AbstractRegistry
public AbstractRegistry(C defaultConfig) {
this(defaultConfig, HashMap.empty());
}
public AbstractRegistry(C defaultConfig, List<RegistryEventConsumer<E>> registryEventConsumers,
Map<String, String> tags) {
this.configurations = new ConcurrentHashMap<>();
//可以看到是用ConcurrentHashMap 来作为管理熔断器的容器,保证线程安全
this.entryMap = new ConcurrentHashMap<>();
this.eventProcessor = new RegistryEventProcessor(
Objects.requireNonNull(registryEventConsumers, CONSUMER_MUST_NOT_BE_NULL));
this.registryTags = Objects.requireNonNull(tags, TAGS_MUST_NOT_BE_NULL);
this.configurations
.put(DEFAULT_CONFIG, Objects.requireNonNull(defaultConfig, CONFIG_MUST_NOT_BE_NULL));
}
- 在上面可以得知,CircuitBreakerRegistry 只是一个入口,真正管理熔断器的是InMemoryCircuitBreakerRegistry
- 而InMemoryCircuitBreakerRegistry 又是通过 ConcurrentHashMap 来作为熔断器的容器
2.CircuitBreakerConfig
熔断器的相关配置
public class CircuitBreakerConfig {
// 请求调用失败的阈值,百分比。默认是50%
public static final int DEFAULT_FAILURE_RATE_THRESHOLD = 50; // Percentage
// 请求调用慢请求的阀值, 百分比。 默认50%
public static final int DEFAULT_SLOW_CALL_RATE_THRESHOLD = 100; // Percentage
// 在open 状态等待的时间, 时间到了 就进去 half-open状态 默认60s
public static final int DEFAULT_WAIT_DURATION_IN_OPEN_STATE = 60; // Seconds
// 在half-open 状态下 允许通过请求数量 默认10个
public static final int DEFAULT_PERMITTED_CALLS_IN_HALF_OPEN_STATE = 10;
// 最少需要多少个请求调用 进行计算错误率 默认100
public static final int DEFAULT_MINIMUM_NUMBER_OF_CALLS = 100;
// 在滑动窗口的模式下, 需要多少个请求调用进行计算, 默认100
public static final int DEFAULT_SLIDING_WINDOW_SIZE = 100;
public static final int DEFAULT_SLOW_CALL_DURATION_THRESHOLD = 60; // Seconds
// 统计的模式。 默认 计数, 还有一个是 滑动窗口
public static final SlidingWindowType DEFAULT_SLIDING_WINDOW_TYPE = SlidingWindowType.COUNT_BASED;
public static final boolean DEFAULT_WRITABLE_STACK_TRACE_ENABLED = true;
// 是否记录请求调用失败的断言,默认所有异常都记录。
private static final Predicate<Throwable> DEFAULT_RECORD_EXCEPTION_PREDICATE = throwable -> true;
// 是否忽视请求调用失败的断言, 默认false
private static final Predicate<Throwable> DEFAULT_IGNORE_EXCEPTION_PREDICATE = throwable -> false;
// The default exception predicate counts all exceptions as failures.
private Predicate<Throwable> recordExceptionPredicate = DEFAULT_RECORD_EXCEPTION_PREDICATE;
// The default exception predicate ignores no exceptions.
private Predicate<Throwable> ignoreExceptionPredicate = DEFAULT_IGNORE_EXCEPTION_PREDICATE;
@SuppressWarnings("unchecked")
// 请求调用失败,存储异常记录的集合
private Class<? extends Throwable>[] recordExceptions = new Class[0];
@SuppressWarnings("unchecked")
// 请求调用失败,忽略异常记录的集合
private Class<? extends Throwable>[] ignoreExceptions = new Class[0];
}
failureRateThreshold
:请求调用失败的阈值百分比。默认是50%slowCallRateThreshold:
:请求调用慢请求的阀值百分比。 默认100%slowCallDurationThreshold
: 调用请求超过此只就被视为是慢请求,默认60秒permittedNumberOfCallsInHalfOpenState
:在半开状态下允许通过的请求数,默认10个slidingWindowType
:统计的类型,默认count-based,可选 time-basedslidingWindowSize
:滑动窗口的大小,默认100minimumNumberOfCalls
:统计最小次数,默认10次 如果只有9次,那么不会触发统计waitDurationInOpenState
:在打开状态下,等待多久进去半打开状态,默认 60秒automaticTransitionFromOpenToHalfOpenEnabled
:默认为false,如果设置为true,则自动从打开状态转换为半打开状态,不需要调用即可触发该转换。recordExceptions
:是否记录请求调用失败的断言,默认所有异常都记录。ignoreExceptions
:是否忽视请求调用失败的断言
3.CircuitBreakerState
熔断器的各种状态
CLOSED —> OPEN
:单向转换。当请求失败率超过阈值时,熔断器的状态由关闭状态转换到打开状态。失败率的阈值默认50%,可以通过设置CircuitBreakerConfig实例的failureRateThreshold属性值进行改变。OPEN <—> HALF_OPEN
:双向转换。打开状态的持续时间结束,熔断器的状态由打开状态转换到半开状态。这时允许一定数量的请求通过,当这些请求的失败率超过阈值,熔断器的状态由半开状态转换回打开状态。半开时请求的数量是由CircuitBreakerConfig实例的ringBufferSizeInHalfOpenState属性值设置的。HALF_OPEN —> CLOSED
:如果请求失败率小于或等于阈值,则熔断器的状态由半开状态转换到关闭状态。
//CircuitBreaker 的状态
private interface CircuitBreakerState {
//判断是否允许调用后端接口, 当CircuitBreaker 状态为open的时候这里返回false 反之true
boolean tryAcquirePermission();
//获取请求后端接口的许可
void acquirePermission();
//释放
void releasePermission();
/**
* 请求调用失败,记录指标
* 当达到设定的度量指标值后,调用状态机实例触发状态转换
* @param duration 持续时间
* @param durationUnit
* @param throwable
*/
void onError(long duration, TimeUnit durationUnit, Throwable throwable);
/**
* 调用成功,记录指标
* 当达到设定的度量指标后,调用状态机实例触发状态转换
* @param duration 持续时间
* @param durationUnit
*/
void onSuccess(long duration, TimeUnit durationUnit);
int attempts();
/**
* 返回当前状态在CircuitBreaker 接口中对应的枚举值
* @return
*/
CircuitBreaker.State getState();
/**
* 返回当前状态封装的度量指标的类实例
* @return
*/
CircuitBreakerMetrics getMetrics();
}
CircuitBreakerState 的各种实现类,就那HalfOpenState 来看一下他是如何做转换的,其他都类似不一一分析了。
//CircuitBreakerStateMachine 的内部类
private class HalfOpenState implements CircuitBreakerState {
//允许通过的数量
private final AtomicInteger permittedNumberOfCalls;
//是否是半开状态
private final AtomicBoolean isHalfOpen;
//是否允许通过,就是查看permittedNumberOfCalls
//减去这次请求是否>0是的话就是true,反之false
@Override
public boolean tryAcquirePermission() {
if (permittedNumberOfCalls.getAndUpdate(current -> current == 0 ? current : --current)
> 0) {
return true;
}
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
//就是给permittedNumberOfCalls + 1
@Override
public void releasePermission() {
permittedNumberOfCalls.incrementAndGet();
}
//onError 和 onSuccess 就是来进行状态转换的,调用了checkIfThresholdsExceeded 方法
@Override
public void onError(long duration, TimeUnit durationUnit, Throwable throwable) {
// CircuitBreakerMetrics is thread-safe
checkIfThresholdsExceeded(circuitBreakerMetrics.onError(duration, durationUnit));
}
@Override
public void onSuccess(long duration, TimeUnit durationUnit) {
// CircuitBreakerMetrics is thread-safe
checkIfThresholdsExceeded(circuitBreakerMetrics.onSuccess(duration, durationUnit));
}
}
onError
和onSuccess
当一个请求调用后通过返回的result来判断是触发onError 事件还是onSuccess事件,就是来进行状态转换的,调用了checkIfThresholdsExceeded
方法
private void checkIfThresholdsExceeded(Result result) {
//<1> 如果已经达到了阀值, 那么先把Half-Open 状态关闭,然后调用transitionToOpenState 升级到open 状态
if (Result.hasExceededThresholds(result)) {
if (isHalfOpen.compareAndSet(true, false)) {
transitionToOpenState();
}
}
//<2> 如果在阀值的下面, 那么就先把Half-Open 状态关闭,然后调用transitionToClosedState 降到close 状态
if (result == BELOW_THRESHOLDS) {
if (isHalfOpen.compareAndSet(true, false)) {
transitionToClosedState();
}
}
}
-
<1>
如果已经达到了阀值, 那么先把Half-Open 状态关闭,然后调用transitionToOpenState 升级到open 状态public void transitionToOpenState() { stateTransition(OPEN, currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics())); }
-
<2>
如果在阀值的下面, 那么就先把Half-Open 状态关闭,然后调用transitionToClosedState 降到close 状态public void transitionToClosedState() { stateTransition(CLOSED, currentState -> new ClosedState()); }
到这里,已经把CircuitBreaker 状态如何转换,如何注册,已经讲清楚了。还差一步是,如何根据我们的方法调用来判断触发什么事件,然后进行状态的转换。
4.CircuitBreaker
在通过 CircuitBreakerRegistry 注册好熔断器之后,我们可以通过这个熔断器,来增强我们的调用方法。然后根据我们调用的方法,触发是什么事件然后判断是否需要转换状态。
public interface CircuitBreaker {
static <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<T> supplier) {
return () -> {
//<1> 如果熔断器是打开状态,则抛出异常,不执行下面的代码
circuitBreaker.acquirePermission();
//计时 start
long start = System.nanoTime();
try {
//<2> 执行被装饰的方法
T returnValue = supplier.get();
//计时 end
long durationInNanos = System.nanoTime() - start;
//<3> 调用成功 在ring buffer 中记录成功状态,计算失败率 失败达到阀值,则触发状态转换
circuitBreaker.onSuccess(durationInNanos, TimeUnit.NANOSECONDS);
return returnValue;
} catch (Exception exception) {
// Do not handle java.lang.Error
long durationInNanos = System.nanoTime() - start;
//<4> 调用失败 在ring buffer 中记录失败状态,计算失败率 失败达到阀值,则触发状态转换
circuitBreaker.onError(durationInNanos, TimeUnit.NANOSECONDS, exception);
throw exception;
}
};
}
}
-
<1>
如果熔断器是打开状态,则抛出异常,不执行下面的代码public void acquirePermission() { if (!tryAcquirePermission()) { throw CallNotPermittedException .createCallNotPermittedException(CircuitBreakerStateMachine.this); } }
-
<2>
执行被装饰的方法 -
<3>
调用onSuccess
在ring buffer 中记录成功状态,计算失败率 失败达到阀值,则触发状态转换,这里会通过观察者模式发布事件然后进行处理//io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine public void onSuccess(long duration, TimeUnit durationUnit) { //发布调用成功的事件,这里就使用了一开头讲的观察者模式 publishSuccessEvent(duration, durationUnit); stateReference.get().onSuccess(duration, durationUnit); } //io.github.resilience4j.circuitbreaker.internal.CircuitBreakerStateMachine private void publishSuccessEvent(final long duration, TimeUnit durationUnit) { //封装调用成功的事件 final CircuitBreakerOnSuccessEvent event = new CircuitBreakerOnSuccessEvent(name, Duration.ofNanos(durationUnit.toNanos(duration))); //发布 publishEventIfPossible(event); }
-
消费事件,调用consumer 的 consumerEvent函数处理事件,返回true 已处理,false 未处理
public <E extends T> boolean processEvent(E event) { boolean consumed = false; //查看是否有消费着, 有的话调用消费者 if (!onEventConsumers.isEmpty()) { onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event)); consumed = true; } if (!eventConsumerMap.isEmpty()) { List<EventConsumer<T>> eventConsumers = this.eventConsumerMap .get(event.getClass().getSimpleName()); if (eventConsumers != null && !eventConsumers.isEmpty()) { eventConsumers.forEach(consumer -> consumer.consumeEvent(event)); consumed = true; } } return consumed; }
-
-
<4>
调用onError
在ring buffer 中记录失败状态,计算失败率 失败达到阀值,则触发状态转换
总结:
- 通过 CircuitBreakerRegistry 注册熔断器获得熔断器并且CircuitBreaker.decorateXXX( ); 来装饰我们调用方法
- 每次调用方法:
- 请求调用的结果成功或失败,熔断器最终会调用度量指标CircuitBreakerMetrics的onSuccess或onError方法返回请求调用失败率到ClosedState。
- ClosedState会在它的onSuccess或onError方法中判断请求失败率是否达到了设置的阈值,
- 如果达到了阈值则调用状态机CircuitBreakerStateMachine的transitionToOpenState方法生成OpenState对象,同时把关闭状态的度量指标对象传给打开状态。
- 然后熔断器把当前持有的状态更改为打开状态,完成了状态转换。