查看原文
其他

熔断器 Hystrix 源码解析 —— 命令执行(三)之执行超时

芋艿 芋道源码 2019-05-13

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-timeout/ 「芋道源码」欢迎转载,保留摘要,谢谢!

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. HystrixObservableTimeoutOperator

  • 3. HystrixTimer

    • 3.1 ScheduledExecutor

    • 3.2 TimerListener

    • 3.3 TimerReference

  • 666. 彩蛋


1. 概述

本文主要分享 Hystrix 命令执行(三)之执行超时

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

开启执行超时功能,需要配置 :

  • HystrixCommandProperties.executionTimeoutEnabled :执行命令超时功能开关。

    • 值 :Boolean

    • 默认值 : true

  • HystrixCommandProperties.executionTimeoutInMilliseconds :执行命令超时时长。

    • 值 :Integer

    • 单位 :毫秒

    • 默认值 :1000 毫秒

在 《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中, #executeCommandAndObserve(...) 方法的第 75 行 lift(newHystrixObservableTimeoutOperator<R>(_cmd)) ,实现了对执行命令超时的监控。

  • 对 Observable#lift(Operator) 方法不熟悉的同学,在 《RxJava 源码解析 —— Observable#lift(Operator)》 有详细解析。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. HystrixObservableTimeoutOperator

HystrixObservableTimeoutOperator 类,代码如下 :

  1.  1: private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

  2.  2:

  3.  3:     final AbstractCommand<R> originalCommand;

  4.  4:

  5.  5:     public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {

  6.  6:         this.originalCommand = originalCommand;

  7.  7:     }

  8.  8:

  9.  9:     @Override

  10. 10:     public Subscriber<? super R> call(final Subscriber<? super R> child) {

  11. 11:         // 创建 订阅

  12. 12:         final CompositeSubscription s = new CompositeSubscription();

  13. 13:         // 添加 订阅

  14. 14:         // if the child unsubscribes we unsubscribe our parent as well

  15. 15:         child.add(s);

  16. 16:

  17. 17:         //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later

  18. 18:         final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

  19. 19:

  20. 20:         TimerListener listener = new TimerListener() {

  21. 21:

  22. 22:             @Override

  23. 23:             public void tick() {

  24. 24:                 // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath

  25. 25:                 // otherwise it means we lost a race and the run() execution completed or did not start

  26. 26:                 if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {

  27. 27:                     // report timeout failure

  28. 28:                     originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

  29. 29:

  30. 30:                     // shut down the original request

  31. 31:                     s.unsubscribe();

  32. 32:

  33. 33:                     final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

  34. 34:

  35. 35:                         @Override

  36. 36:                         public void run() {

  37. 37:                             child.onError(new HystrixTimeoutException());

  38. 38:                         }

  39. 39:                     });

  40. 40:

  41. 41:                     timeoutRunnable.run();

  42. 42:                     //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout

  43. 43:                 }

  44. 44:             }

  45. 45:

  46. 46:             @Override

  47. 47:             public int getIntervalTimeInMilliseconds() {

  48. 48:                 return originalCommand.properties.executionTimeoutInMilliseconds().get();

  49. 49:             }

  50. 50:         };

  51. 51:

  52. 52:         final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

  53. 53:

  54. 54:         // set externally so execute/queue can see this

  55. 55:         originalCommand.timeoutTimer.set(tl);

  56. 56:

  57. 57:         /**

  58. 58:          * If this subscriber receives values it means the parent succeeded/completed

  59. 59:          */

  60. 60:         Subscriber<R> parent = new Subscriber<R>() {

  61. 61:

  62. 62:             @Override

  63. 63:             public void onCompleted() {

  64. 64:                 if (isNotTimedOut()) {

  65. 65:                     // stop timer and pass notification through

  66. 66:                     tl.clear();

  67. 67:                     // 完成

  68. 68:                     child.onCompleted();

  69. 69:                 } else {

  70. 70:                     System.out.println("timeout: " + "onCompleted"); // 笔者调试用

  71. 71:                 }

  72. 72:             }

  73. 73:

  74. 74:             @Override

  75. 75:             public void onError(Throwable e) {

  76. 76:                 if (isNotTimedOut()) {

  77. 77:                     // stop timer and pass notification through

  78. 78:                     tl.clear();

  79. 79:                     // 异常

  80. 80:                     child.onError(e);

  81. 81:                 } else {

  82. 82:                     System.out.println("timeout: " + "onError"); // 笔者调试用

  83. 83:                 }

  84. 84:             }

  85. 85:

  86. 86:             @Override

  87. 87:             public void onNext(R v) {

  88. 88:                 if (isNotTimedOut()) {

  89. 89:                     // 继续执行

  90. 90:                     child.onNext(v);

  91. 91:                 } else {

  92. 92:                     System.out.println("timeout: " + "onNext"); // 笔者调试用

  93. 93:                 }

  94. 94:             }

  95. 95:

  96. 96:             /**

  97. 97:              * 通过 CAS 判断是否超时

  98. 98:              *

  99. 99:              * @return 是否超时

  100. 100:              */

  101. 101:             private boolean isNotTimedOut() {

  102. 102:                 // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED

  103. 103:                 return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||

  104. 104:                         originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);

  105. 105:             }

  106. 106:

  107. 107:         };

  108. 108:

  109. 109:         // 添加 订阅

  110. 110:         // if s is unsubscribed we want to unsubscribe the parent

  111. 111:         s.add(parent);

  112. 112:

  113. 113:         return parent;

  114. 114:     }

  115. 115:

  116. 116: }

  • 第 12 行 :创建订阅 s


  • 第 15 行 :添加订阅 schild 的订阅。


  • 第 18 行 :获得 HystrixRequestContext 。因为下面 listener 的执行不在当前线程,HystrixRequestContext 基于 ThreadLocal 实现。


  • 第 20 至 50 行 :创建执行命令超时监听器 listener ( TimerListener ) 。当超过执行命令的时长( TimerListener#getIntervalTimeInMilliseconds() )时, TimerListener#tick() 方法触发调用。





    • HystrixContextRunnable ,设置第 18 行获得的 HystrixRequestContext 到 Callable#run() 所在线程的 HystrixRequestContext ,并继续执行。点击 链接 查看。另外,HystrixContextRunnable 只有此处使用,独立成类的原因是测试用例使用到。

    • ExecutionIsolationStrategy.THREAD :该策略下提供取消订阅( #unsubscribe() ),并且命令执行超时,强制取消命令的执行。在 《Hystrix 源码解析 —— 命令执行(二)之执行隔离策略》「6.5 FutureCompleterWithConfigurableInterrupt」 有详细解析。

    • ExecutionIsolationStrategy.SEMAPHORE :该策略下提供取消订阅( #unsubscribe() )时,对超时执行命令的取消。所以,在选择执行隔离策略,要注意这块

    • 第 26 行 :通过 AbstractCommand.isCommandTimedOut 变量 CAS 操作,保证和下面第 60 行的 parent 有且只有一方操作成功。TimedOutStatus 状态变迁如下图 :

    • 第 28 行 :TODO 【2011】【Hystrix 事件机制】

    • 第 31 行 :取消订阅 s 。注意 :不同执行隔离策略此处的表现不同




    • 第 34 至 41 行 :执行 child#onError(e) 【Subscriber#onError(Throwable)】 方法,处理 HystrixTimeoutException 异常。该异常会被 handleFallback 处理,点击 链接 查看,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。




  • 第 52 行 :使用 TimerListener 到定时器,监听命令的超时执行。


  • 第 55 行 :设置 TimerListener 到 AbstractCommand.timeoutTimer 属性。用于执行超时等等场景下的 TimerListener 的清理( tl#clear() )。如下方法有通过该属性对 TimerListener 的清理 :




    • AbstractCommand#handleCommandEnd()

    • AbstractCommand#cleanUpAfterResponseFromCache()

  • 第 60 至 107 行 :创建的 Subscriber ( parent )。在传参的 child 的基础上,增加了对是否执行超时的判断( #isNotTimedOut() )和TimerListener的清理。


  • 第 111 行 :添加添加订阅 parents 的订阅。整体订阅关系如下 :





    • 这里看起来 s 有些“多余” ?因为 parent 和 listener 存在互相引用的情况,通过 s 解决。

  • 第 113 行 :返回 parent注意。如果不能理解,建议阅读下 《RxJava 源码解析 —— Observable#lift(Operator)》 。


3. HystrixTimer

com.netflix.hystrix.util.HystrixTimer ,Hystrix 定时器。

目前有如下场景使用 :

  • 执行命令超时任务,本文详细解析。

  • 命令批量执行,在 《Hystrix 源码解析 —— 命令合并执行》「5. CollapsedTask」 详细解析。

HystrixTimer 构造方法,代码如下 :

  1. public class HystrixTimer {

  2.    /**

  3.     * 单例

  4.     */

  5.    private static HystrixTimer INSTANCE = new HystrixTimer();

  6.    /* package */ AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();

  7.    private HystrixTimer() {

  8.        // private to prevent public instantiation

  9.    }

  10.    public static HystrixTimer getInstance() {

  11.        return INSTANCE;

  12.    }

  13. }

  • INSTANCE 静态属性,单例。

  • executor 属性,定时任务执行器( ScheduledExecutor )。


调用 HystrixTimer#addTimerListener(TimerListener) 方法,提交定时监听器,生成定时任务,代码如下 :

  1.  1: public Reference<TimerListener> addTimerListener(final TimerListener listener) {

  2.  2:     startThreadIfNeeded();

  3.  3:     // add the listener

  4.  4:

  5.  5:     Runnable r = new Runnable() {

  6.  6:

  7.  7:         @Override

  8.  8:         public void run() {

  9.  9:             try {

  10. 10:                 listener.tick();

  11. 11:             } catch (Exception e) {

  12. 12:                 logger.error("Failed while ticking TimerListener", e);

  13. 13:             }

  14. 14:         }

  15. 15:     };

  16. 16:

  17. 17:     ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);

  18. 18:     return new TimerReference(listener, f);

  19. 19: }

  • 第 2 行 :调用 #startThreadIfNeeded() 方法,保证 executor 延迟初始化已完成。

    • #startThreadIfNeeded() 方法 ,比较简单,点击 链接 查看。

    • ScheduledExecutor 在 「3.1 ScheduledExecutor」 详细解析。

  • 第 5 至 15 行 :创建定时任务 Runnable 。在 Runnable#run() 方法里,调用 TimerListener#tick() 方法。在「3.2 TimerListener」 详细解析。

  • 第 17 行 :提交定时监听器,生成定时任务 f ( ScheduledFuture )。

  • 第 18 行 :使用 listener + f 创建 TimerReference 返回。在 「3.3 TimerReference」 详细解析。

3.1 ScheduledExecutor

com.netflix.hystrix.util.HystrixTimer.ScheduledExecutor ,Hystrix 定时任务执行器。代码如下 :

  1. /* package */ static class ScheduledExecutor {

  2.    /**

  3.    * 定时任务线程池执行器

  4.    */

  5.    /* package */ volatile ScheduledThreadPoolExecutor executor;

  6.    /**

  7.     * 是否初始化

  8.     */

  9.    private volatile boolean initialized;

  10.    /**

  11.     * We want this only done once when created in compareAndSet so use an initialize method

  12.     */

  13.    public void initialize() {

  14.        // coreSize

  15.        HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();

  16.        int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();

  17.        // 创建 ThreadFactory

  18.        ThreadFactory threadFactory = null;

  19.        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {

  20.            threadFactory = new ThreadFactory() {

  21.                final AtomicInteger counter = new AtomicInteger();

  22.                @Override

  23.                public Thread newThread(Runnable r) {

  24.                    Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());

  25.                    thread.setDaemon(true);

  26.                    return thread;

  27.                }

  28.            };

  29.        } else {

  30.            threadFactory = PlatformSpecific.getAppEngineThreadFactory();

  31.        }

  32.        // 创建 ScheduledThreadPoolExecutor

  33.        executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);

  34.        // 已初始化

  35.        initialized = true;

  36.   }

  37.   public ScheduledThreadPoolExecutor getThreadPool() {

  38.       return executor;

  39.   }

  40.   public boolean isInitialized() {

  41.       return initialized;

  42.   }

  43. }

  • 线程池大小( coreSize ),通过 HystrixTimerThreadPoolProperties.corePoolSize 配置。

3.2 TimerListener

com.netflix.hystrix.util.HystrixTimer.TimerListener ,Hystrix 定时任务监听器*接口*。代码如下 :

  1. public static interface TimerListener {

  2.   /**

  3.    * The 'tick' is called each time the interval occurs.

  4.    * <p>

  5.    * This method should NOT block or do any work but instead fire its work asynchronously to perform on another thread otherwise it will prevent the Timer from functioning.

  6.    * <p>

  7.    * This contract is used to keep this implementation single-threaded and simplistic.

  8.    * <p>

  9.    * If you need a ThreadLocal set, you can store the state in the TimerListener, then when tick() is called, set the ThreadLocal to your desired value.

  10.    */

  11.   void tick();

  12.   /**

  13.    * How often this TimerListener should 'tick' defined in milliseconds.

  14.    */

  15.   int getIntervalTimeInMilliseconds();

  16. }

  • #tick() 方法 :时间到达( 超时 )执行的逻辑。

  • #getIntervalTimeInMilliseconds() 方法 :返回到达( 超时 )时间时长。

3.3 TimerReference

com.netflix.hystrix.util.HystrixTimer.TimerReference ,Hystrix 定时任务引用。代码如下 :

  1. private static class TimerReference extends SoftReference<TimerListener> {

  2.    private final ScheduledFuture<?> f;

  3.    TimerReference(TimerListener referent, ScheduledFuture<?> f) {

  4.        super(referent);

  5.        this.f = f;

  6.    }

  7.    @Override

  8.    public void clear() {

  9.        super.clear();

  10.        // stop this ScheduledFuture from any further executions

  11.        f.cancel(false); // 非强制

  12.    }

  13. }

  • 通过 #clear() 方法,可以取消定时任务的执行。

666. 彩蛋

顺畅~刚开始看 Hystrix 执行命令超时逻辑,一直想不通。现在整理干净了。

喵了个咪~

胖友,分享一波朋友圈可好!


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存