查看原文
其他

熔断器 Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker

芋艿 芋道源码 2019-05-13

摘要: 原创出处 http://www.iocoder.cn/Hystrix/circuit-breaker/ 「芋道源码」欢迎转载,保留摘要,谢谢!

排版又崩了,请【阅读原文】。

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. HystrixCircuitBreaker

  • 3. HystrixCircuitBreaker.Factory

  • 4. HystrixCircuitBreakerImpl

    • 4.1 构造方法

    • 4.2 #subscribeToStream()

    • 4.3 #attemptExecution()

    • 4.4 #markSuccess()

    • 4.5 #markNonSuccess()

    • 4.6 #allowRequest()

    • 4.7 #isOpen()

  • 666. 彩蛋

  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。


1. 概述

本文主要分享 断路器 HystrixCircuitBreaker

HystrixCircuitBreaker 有三种状态 :

  • CLOSED :关闭

  • OPEN :打开

  • HALF_OPEN :半开

其中,断路器处于 OPEN 状态时,链路处于非健康状态,命令执行时,直接调用回退逻辑,跳过正常逻辑。

HystrixCircuitBreaker 状态变迁如下图 :


  • 红线 :初始时,断路器处于 CLOSED 状态,链路处于健康状态。当满足如下条件,断路器从 CLOSED 变成 OPEN 状态:


    • 周期( 可配, HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms )内,总请求数超过一定( 可配, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20 ) 。

    • 错误请求占总请求数超过一定比例( 可配, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50% ) 。

  • 绿线 :断路器处于 OPEN 状态,命令执行时,若当前时间超过断路器开启时间一定时间( HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds=5000ms ),断路器变成 HALF_OPEN 状态,尝试调用正常逻辑,根据执行是否成功,打开或关闭熔断器【蓝线】。



推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. HystrixCircuitBreaker

com.netflix.hystrix.HystrixCircuitBreaker ,Hystrix 断路器接口。定义接口如下代码 :

  1. public interface HystrixCircuitBreaker {

  2.    /**

  3.     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does

  4.     * not modify any internal state, and takes into account the half-open logic which allows some requests through

  5.     * after the circuit has been opened

  6.     *

  7.     * @return boolean whether a request should be permitted

  8.     */

  9.    boolean allowRequest();

  10.    /**

  11.     * Whether the circuit is currently open (tripped).

  12.     *

  13.     * @return boolean state of circuit breaker

  14.     */

  15.    boolean isOpen();

  16.    /**

  17.     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.

  18.     */

  19.    void markSuccess();

  20.    /**

  21.     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.

  22.     */

  23.    void markNonSuccess();

  24.    /**

  25.     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal

  26.     * state.

  27.     */

  28.    boolean attemptExecution();

  29. }

  • #allowRequest() 和 #attemptExecution() 方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态( CLOSE=>HALF-OPEN ),而后者会。


HystrixCircuitBreaker 有两个子类实现 :

  • NoOpCircuitBreaker :的断路器实现,用于不开启断路器功能的情况。

  • HystrixCircuitBreakerImpl :完整的断路器实现。

在 AbstractCommand 创建时,初始化 HystrixCircuitBreaker ,代码如下 :

  1. /* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {

  2.    /**

  3.     * 断路器

  4.     */

  5.    protected final HystrixCircuitBreaker circuitBreaker;

  6.    protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,

  7.            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,

  8.            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,

  9.            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

  10.        // ... 省略无关代码

  11.        // 初始化 断路器

  12.        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);

  13.        // ... 省略无关代码

  14.    }

  15.    private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,

  16.                                                            HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,

  17.                                                            HystrixCommandProperties properties, HystrixCommandMetrics metrics) {

  18.        if (enabled) {

  19.            if (fromConstructor == null) {

  20.                // get the default implementation of HystrixCircuitBreaker

  21.                return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);

  22.            } else {

  23.                return fromConstructor;

  24.            }

  25.        } else {

  26.            return new NoOpCircuitBreaker();

  27.        }

  28.    }

  29. }

  • 当 HystrixCommandProperties.circuitBreakerEnabled=true 时,即断路器功能开启,使用 Factory 获得 HystrixCircuitBreakerImpl 对象。在 「3. HystrixCircuitBreaker.Factory」 详细解析。

  • 当 HystrixCommandProperties.circuitBreakerEnabled=false 时,即断路器功能关闭,创建 NoOpCircuitBreaker 对象。另外,NoOpCircuitBreaker 代码简单到脑残,点击 链接 查看实现。

3. HystrixCircuitBreaker.Factory

com.netflix.hystrix.HystrixCircuitBreaker.Factory ,HystrixCircuitBreaker 工厂,主要用于:

  • 创建 HystrixCircuitBreaker 对象,目前只创建 HystrixCircuitBreakerImpl 。


  • HystrixCircuitBreaker 容器,基于 HystrixCommandKey 维护了 HystrixCircuitBreaker 单例对象 的映射。代码如下 :


  1. private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

整体代码灰常清晰,点击 链接 查看代码。

4. HystrixCircuitBreakerImpl

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl完整的断路器实现。

我们来逐个方法看看 HystrixCircuitBreakerImpl 的具体实现。

4.1 构造方法

构造方法,代码如下 :

  1. /* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {

  2.    private final HystrixCommandProperties properties;

  3.    private final HystrixCommandMetrics metrics;

  4.    enum Status {

  5.        CLOSED, OPEN, HALF_OPEN

  6.    }

  7.    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

  8.    private final AtomicLong circuitOpened = new AtomicLong(-1);

  9.    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

  10.    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {

  11.        this.properties = properties;

  12.        this.metrics = metrics;

  13.        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur

  14.        Subscription s = subscribeToStream();

  15.        activeSubscription.set(s);

  16.    }

  17. }    

  • Status 枚举类,断路器的三种状态。

  • status 属性,断路器的状态。

  • circuitOpened 属性,断路器打开,即状态变成 OPEN 的时间。

  • activeSubscription 属性,基于 Hystrix Metrics 对请求量统计 Observable 的订阅,在 「4.2 #subscribeToStream()」 详细解析。

4.2 #subscribeToStream()

#subscribeToStream() 方法,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。代码如下 :

  1. private Subscription subscribeToStream() {

  2.  1: private Subscription subscribeToStream() {

  3.  2:     /*

  4.  3:      * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream

  5.  4:      */

  6.  5:     return metrics.getHealthCountsStream()

  7.  6:             .observe()

  8.  7:             .subscribe(new Subscriber<HealthCounts>() {

  9.  8:                 @Override

  10.  9:                 public void onCompleted() {

  11. 10:

  12. 11:                 }

  13. 12:

  14. 13:                 @Override

  15. 14:                 public void onError(Throwable e) {

  16. 15:

  17. 16:                 }

  18. 17:

  19. 18:                 @Override

  20. 19:                 public void onNext(HealthCounts hc) {

  21. 20:                     System.out.println("totalRequests" + hc.getTotalRequests()); // 芋艿,用于调试

  22. 21:                     // check if we are past the statisticalWindowVolumeThreshold

  23. 22:                     if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {

  24. 23:                         // we are not past the minimum volume threshold for the stat window,

  25. 24:                         // so no change to circuit status.

  26. 25:                         // if it was CLOSED, it stays CLOSED

  27. 26:                         // if it was half-open, we need to wait for a successful command execution

  28. 27:                         // if it was open, we need to wait for sleep window to elapse

  29. 28:                     } else {

  30. 29:                         if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {

  31. 30:                             //we are not past the minimum error threshold for the stat window,

  32. 31:                             // so no change to circuit status.

  33. 32:                             // if it was CLOSED, it stays CLOSED

  34. 33:                             // if it was half-open, we need to wait for a successful command execution

  35. 34:                             // if it was open, we need to wait for sleep window to elapse

  36. 35:                         } else {

  37. 36:                             // our failure rate is too high, we need to set the state to OPEN

  38. 37:                             if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {

  39. 38:                                 circuitOpened.set(System.currentTimeMillis());

  40. 39:                             }

  41. 40:                         }

  42. 41:                     }

  43. 42:                 }

  44. 43:             });

  45. 44: }

  • 第 5 至 7 行 :向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。这里的 Observable 基于 RxJava Window 操作符。

    FROM 《ReactiveX文档中文翻译》「Window」
    定期将来自原始 Observable 的数据分解为一个 Observable 窗口,发射这些窗口,而不是每次发射一项数据


    • 简单来说,固定间隔, #onNext() 方法将不断被调用,每次计算断路器的状态。

  • 第 22 行 :判断周期( 可配, HystrixCommandProperties.default_metricsRollingStatisticalWindow=10000ms )内,总请求数超过一定( 可配, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20 ) 。




    • 这里要注意下,请求次数统计的是周期内,超过周期的不计算在内。例如说, 00:00 内发起了 N 个请求, 00:11 不计算这 N 个请求。

  • 第 29 行 :错误请求占总请求数超过一定比例( 可配, HystrixCommandProperties.circuitBreakerErrorThresholdPercentage=50% ) 。


  • 第 37 至 39 行 :满足断路器打开条件,CAS 修改状态( CLOSED=>OPEN ),并设置打开时间( circuitOpened ) 。


  • 补充】第 5 至 7 行 :😈 怕写在上面,大家有压力。Hystrix Metrics 对请求量统计 Observable 使用了两种 RxJava Window 操作符 :






    • Observable#window(timespan, unit) 方法,固定周期( 可配, HystrixCommandProperties.metricsHealthSnapshotIntervalInMilliseconds=500ms ),发射 Observable 窗口。点击 BucketedCounterStream 构造方法 查看调用处的代码。

    • Observable#window(count, skip) 方法,每发射一次skip) Observable 忽略 count ( 可配, HystrixCommandProperties.circuitBreakerRequestVolumeThreshold=20 ) 个数据项。为什么?答案在第 22 行的代码,周期内达到一定请求量是断路器打开的一个条件。点击 BucketedRollingCounterStream 构造方法 查看调用处的代码。

目前该方法有两处调用 :


  • 「4.1 构造方法」,在创建 HystrixCircuitBreakerImpl 时,向 Hystrix Metrics 对请求量统计 Observable 的发起订阅。固定间隔,计算断路器是否要关闭( CLOSE )。

  • 「4.4 #markSuccess()」,清空 Hystrix Metrics 对请求量统计 Observable 的统计信息,取消原有订阅,并发起新的订阅。

4.3 #attemptExecution()

如下是 AbstractCommand#applyHystrixSemantics(_cmd) 方法,对 HystrixCircuitBreakerImpl#attemptExecution 方法的调用的代码 :

  1. private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

  2.    // ...  省略无关代码

  3.   /* determine if we're allowed to execute */

  4.   if (circuitBreaker.attemptExecution()) {

  5.        // 执行【正常逻辑】

  6.   } else {

  7.        // 执行【回退逻辑】

  8.   }

  9. }

  • 使用 HystrixCircuitBreakerImpl#attemptExecution 方法,判断是否可以执行正常逻辑


#attemptExecution 方法,代码如下 :

  1.  1: @Override

  2.  2: public boolean attemptExecution() {

  3.  3:     // 强制 打开

  4.  4:     if (properties.circuitBreakerForceOpen().get()) {

  5.  5:         return false;

  6.  6:     }

  7.  7:     // 强制 关闭

  8.  8:     if (properties.circuitBreakerForceClosed().get()) {

  9.  9:         return true;

  10. 10:     }

  11. 11:     // 打开时间为空

  12. 12:     if (circuitOpened.get() == -1) {

  13. 13:         return true;

  14. 14:     } else {

  15. 15:         // 满足间隔尝试断路器时间

  16. 16:         if (isAfterSleepWindow()) {

  17. 17:             //only the first request after sleep window should execute

  18. 18:             //if the executing command succeeds, the status will transition to CLOSED

  19. 19:             //if the executing command fails, the status will transition to OPEN

  20. 20:             //if the executing command gets unsubscribed, the status will transition to OPEN

  21. 21:             if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {

  22. 22:                 return true;

  23. 23:             } else {

  24. 24:                 return false;

  25. 25:             }

  26. 26:         } else {

  27. 27:             return false;

  28. 28:         }

  29. 29:     }

  30. 30: }

  • 第 4 至 6 行 :当 HystrixCommandProperties.circuitBreakerForceOpen=true ( 默认值 : false) 时,即断路器强制打开,返回 false 。当该配置接入配置中心后,可以动态实现打开熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。

  • 第 8 至 10 行 :当 HystrixCommandProperties.circuitBreakerForceClose=true ( 默认值 : false) 时,即断路器强制关闭,返回 true 。当该配置接入配置中心后,可以动态实现关闭熔断。为什么会有该配置?当 HystrixCircuitBreaker 创建完成后,无法动态切换 NoOpCircuitBreaker 和 HystrixCircuitBreakerImpl ,通过该配置以实现类似效果。

  • 第 12 至 13 行 :断路器打开时间( circuitOpened ) 为"空",返回 true 。

  • 第 16 至 28 行 :调用 #isAfterSleepWindow() 方法,判断是否满足尝试调用正常逻辑的间隔时间。当满足,使用CAS 方式修改断路器状态( OPEN=>HALF_OPEN ),从而保证有且仅有一个线程能够尝试调用正常逻辑。


#isAfterSleepWindow() 方法,代码如下 :

  1. private boolean isAfterSleepWindow() {

  2.    final long circuitOpenTime = circuitOpened.get();

  3.    final long currentTime = System.currentTimeMillis();

  4.    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();

  5.    return currentTime > circuitOpenTime + sleepWindowTime;

  6. }

  • 当前时间超过断路器打开时间 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds ( 默认值, 5000ms ),返回 true 。

4.4 #markSuccess()

当尝试调用正常逻辑成功时,调用 #markSuccess() 方法,关闭断路器。代码如下 :

  1.  1: @Override

  2.  2: public void markSuccess() {

  3.  3:     if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {

  4.  4:         // 清空 Hystrix Metrics 对请求量统计 Observable 的**统计信息**

  5.  5:         //This thread wins the race to close the circuit - it resets the stream to start it over from 0

  6.  6:         metrics.resetStream();

  7.  7:         // 取消原有订阅

  8.  8:         Subscription previousSubscription = activeSubscription.get();

  9.  9:         if (previousSubscription != null) {

  10. 10:             previousSubscription.unsubscribe();

  11. 11:         }

  12. 12:         // 发起新的订阅

  13. 13:         Subscription newSubscription = subscribeToStream();

  14. 14:         activeSubscription.set(newSubscription);

  15. 15:         // 设置断路器打开时间为空

  16. 16:         circuitOpened.set(-1L);

  17. 17:     }

  18. 18: }

  • 第 3 行 :使用 CAS 方式,修改断路器状态( HALF_OPEN=>CLOSED )。

  • 第 6 行 :清空 Hystrix Metrics 对请求量统计 Observable 的统计信息

  • 第 8 至 14 行 :取消原有订阅,发起新的订阅。

  • 第 16 行 :设置断路器打开时间为"空" 。


如下两处调用了 #markNonSuccess() 方法 :

  • markEmits

  • markOnCompleted

4.5 #markNonSuccess()

当尝试调用正常逻辑失败时,调用 #markNonSuccess() 方法,重新打开断路器。代码如下 :

  1.  1: @Override

  2.  2: public void markNonSuccess() {

  3.  3:     if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {

  4.  4:         //This thread wins the race to re-open the circuit - it resets the start time for the sleep window

  5.  5:         circuitOpened.set(System.currentTimeMillis());

  6.  6:     }

  7.  7: }

  • 第 3 行 :使用 CAS 方式,修改断路器状态( HALF_OPEN=>OPEN )。

  • 第 5 行 :设置设置断路器打开时间为当前时间。这样, #attemptExecution() 过一段时间,可以再次尝试执行正常逻辑。


如下两处调用了 #markNonSuccess() 方法 :

  • handleFallback

  • unsubscribeCommandCleanup

4.6 #allowRequest()

#allowRequest()#attemptExecution() 方法,方法目的基本类似,差别在于当断路器满足尝试关闭条件时,前者不会将断路器不会修改状态( CLOSE=>HALF-OPEN ),而后者会。点击 链接 查看代码实现。

4.7 #isOpen()

#isOpen() 方法,比较简单,点击 链接 查看代码实现。

666. 彩蛋

呼呼,相对比较干净的一篇文章,满足。

胖友,分享一波朋友圈可好!

    

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存