查看原文
其他

熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

芋艿 芋道源码 2019-05-13

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-second-isolation-strategy/ 「芋道源码」欢迎转载,保留摘要,谢谢!

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

本文主要基于 Hystrix 1.5.X 版本

  • 1. 概述

  • 2. HystrixThreadPoolProperties

  • 3. HystrixThreadPoolKey

  • 4. HystrixConcurrencyStrategy

  • 5. HystrixThreadPool

  • 6. HystrixScheduler

  • 666. 彩蛋


1. 概述

本文主要分享 Hystrix 命令执行(二)之执行隔离策略

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

Hystrix 提供两种执行隔离策略( ExecutionIsolationStrategy ) :

  • SEMAPHORE :信号量,命令在调用线程执行。在《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「3. TryableSemaphore」 已经详细解析。

  • THREAD :线程池,命令在线程池执行。在《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「5. #executeCommandWithSpecifiedIsolation(...)」 的 #executeCommandWithSpecifiedIsolation(...) 方法中,调用 Observable#subscribeOn(Scheduler) 方法,指定在 RxJava Scheduler 执行。

    • 如果你暂时不了解 Scheduler ,可以阅读 《RxJava 源码解析 —— Scheduler》 。

    • 如果你暂时不了解 Observable#subscribeOn(Scheduler) ,可以阅读 《RxJava 源码解析 —— Observable#subscribeOn(Scheduler)》 。

两种方式的优缺点比较,推荐阅读 《【翻译】Hystrix文档-实现原理》「依赖隔离」。


推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

2. HystrixThreadPoolProperties

com.netflix.hystrix.HystrixThreadPoolProperties ,Hystrix 线程池属性配置抽象类,点击 链接 查看,已添加中文注释说明。

com.netflix.hystrix.strategy.properties.HystrixPropertiesThreadPoolDefault ,Hystrix 线程池配置实现类,点击 链接 查看。实际上没什么内容,官方如是说 :

Default implementation of {@link HystrixThreadPoolProperties} using Archaius (https://github.com/Netflix/archaius)

3. HystrixThreadPoolKey

com.netflix.hystrix.HystrixThreadPoolKey ,Hystrix 线程池标识接口

FROM HystrixThreadPoolKey 接口注释
A key to represent a {@link HystrixThreadPool} for monitoring, metrics publishing, caching and other such uses.
This interface is intended to work natively with Enums so that implementing code can be an enum that implements this interface.

  • 直白的说 ,希望通过相同的 name ( 标识 ) 获得同 HystrixThreadPoolKey 对象。通过在内部维持一个 name 与 HystrixThreadPoolKey 对象的映射,以达到枚举的效果。

HystrixThreadPoolKey 代码如下 :

  1.  1: public interface HystrixThreadPoolKey extends HystrixKey {

  2.  2:     class Factory {

  3.  3:         private Factory() {

  4.  4:         }

  5.  5:

  6.  6:         // used to intern instances so we don't keep re-creating them millions of times for the same key

  7.  7:         private static final InternMap<String, HystrixThreadPoolKey> intern

  8.  8:                 = new InternMap<String, HystrixThreadPoolKey>(

  9.  9:                 new InternMap.ValueConstructor<String, HystrixThreadPoolKey>() {

  10. 10:                     @Override

  11. 11:                     public HystrixThreadPoolKey create(String key) {

  12. 12:                         return new HystrixThreadPoolKeyDefault(key);

  13. 13:                     }

  14. 14:                 });

  15. 15:

  16. 16:         public static HystrixThreadPoolKey asKey(String name) {

  17. 17:            return intern.interned(name);

  18. 18:         }

  19. 19:

  20. 20:         private static class HystrixThreadPoolKeyDefault extends HystrixKeyDefault implements HystrixThreadPoolKey {

  21. 21:             public HystrixThreadPoolKeyDefault(String name) {

  22. 22:                 super(name);

  23. 23:             }

  24. 24:         }

  25. 25:

  26. 26:         /* package-private */ static int getThreadPoolCount() {

  27. 27:             return intern.size();

  28. 28:         }

  29. 29:     }

  30. 30: }

  • HystrixThreadPoolKey 实现 com.netflix.hystrix.HystrixKey 接口,点击 链接 查看。该接口定义的 #name()方法,即是上文我们所说的标识( Key )。

  • intern 属性, name 与 HystrixThreadPoolKey 对象的映射,以达到枚举的效果。

    • com.netflix.hystrix.util.InternMap ,点击 链接 查看带中文注释的代码。

  • #asKey(name) 方法,从 intern 获得 HystrixThreadPoolKey 对象。

  • #getThreadPoolCount() 方法,获得 HystrixThreadPoolKey 数量。


在 AbstractCommand 构造方法里,初始化命令的 threadPoolKey 属性,代码如下 :

  1. protected final HystrixThreadPoolKey threadPoolKey;

  2. protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,

  3.       HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,

  4.       HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,

  5.       HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

  6.   // ... 省略无关代码

  7.   this.commandGroup = initGroupKey(group);

  8.   this.commandKey = initCommandKey(key, getClass());

  9.   this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);

  10.   // 初始化 threadPoolKey

  11.   this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());

  12. }

  • 调用 #initThreadPoolKey(...) 方法,创建最终的 threadPoolKey 属性。代码如下 :

    • 优先级 : threadPoolKeyOverride > threadPoolKey > groupKey

  1. private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) {

  2.   if (threadPoolKeyOverride == null) {

  3.       // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup

  4.       if (threadPoolKey == null) {

  5.           /* use HystrixCommandGroup if HystrixThreadPoolKey is null */

  6.           return HystrixThreadPoolKey.Factory.asKey(groupKey.name());

  7.       } else {

  8.           return threadPoolKey;

  9.       }

  10.   } else { // threadPoolKeyOverride 可覆盖属性

  11.       // we have a property defining the thread-pool so use it instead

  12.       return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride);

  13.   }

  14. }

4. HystrixConcurrencyStrategy

com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy ,Hystrix 并发策略抽象类

HystrixConcurrencyStrategy#getThreadPool(...) 方法,代码如下 :

  1.  1: public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {

  2.  2:     final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);

  3.  3:

  4.  4:     final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();

  5.  5:     final int dynamicCoreSize = threadPoolProperties.coreSize().get();

  6.  6:     final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();

  7.  7:     final int maxQueueSize = threadPoolProperties.maxQueueSize().get();

  8.  8:

  9.  9:     final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

  10. 10:

  11. 11:     if (allowMaximumSizeToDivergeFromCoreSize) {

  12. 12:         final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();

  13. 13:         if (dynamicCoreSize > dynamicMaximumSize) {

  14. 14:             logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +

  15. 15:                     dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +

  16. 16:                     dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");

  17. 17:             return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

  18. 18:         } else {

  19. 19:             return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

  20. 20:         }

  21. 21:     } else {

  22. 22:         return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

  23. 23:     }

  24. 24: }

  • 第 2 行 :调用 #getThreadFactory(...) 方法,获得 ThreadFactory 。点击 链接 查看方法代码。

    • PlatformSpecific#getAppEngineThreadFactory() 方法,无需细看,适用于 Google App Engine 场景。

  • 第 4 至 7 行 :「2. HystrixThreadPoolProperties」 有详细解析。

  • 第 9 行 :调用 #getBlockingQueue() 方法,获得线程池的阻塞队列。点击 链接 查看方法代码。

    • 《Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析》

    • 《Java并发包中的同步队列SynchronousQueue实现原理》

    • 当 maxQueueSize<=0 时( 默认值 : -1 ) 时,使用 SynchronousQueue 。超过线程池的 maximumPoolSize时,提交任务被拒绝

    • 当 SynchronousQueue>0 时,使用 LinkedBlockingQueue 。超过线程池的 maximumPoolSize 时,任务被拒绝。超过线程池的 maximumPoolSize + 线程池队列的 maxQueueSize 时,提交任务被阻塞等待

  • 推荐 :《聊聊并发(三)——JAVA线程池的分析和使用》

  • 推荐 :《聊聊并发(七)——Java中的阻塞队列》

  • 第 11 至 23 行 :创建 ThreadPoolExecutor 。看起来代码比较多,根据 allowMaximumSizeToDivergeFromCoreSize的情况,计算线程池的 maximumPoolSize 属性。计算的方式和 HystrixThreadPoolProperties#actualMaximumSize() 方法是一致的。


com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategyDefault ,Hystrix 并发策略实现类。代码如下( 基本没做啥 ) :

  1. public class HystrixConcurrencyStrategyDefault extends HystrixConcurrencyStrategy {

  2.    /**

  3.     * 单例

  4.     */

  5.    private static HystrixConcurrencyStrategyDefault INSTANCE = new HystrixConcurrencyStrategyDefault();

  6.    public static HystrixConcurrencyStrategy getInstance() {

  7.        return INSTANCE;

  8.    }

  9.    private HystrixConcurrencyStrategyDefault() {

  10.    }

  11. }


在 AbstractCommand 构造方法里,初始化命令的 threadPoolKey 属性,代码如下 :

  1. protected final HystrixConcurrencyStrategy concurrencyStrategy;

  2. protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,

  3.       HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,

  4.       HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,

  5.       HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

  6.    // ... 省略无关代码

  7.    // 初始化 并发策略

  8.    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();

  9. }

  • HystrixPlugins ,Hystrix 插件体系,https://github.com/Netflix/Hystrix/wiki/Plugins 有详细解析。

  • 调用 HystrixPlugins#getConcurrencyStrategy() 获得 HystrixConcurrencyStrategy 对象。默认情况下,使用 HystrixConcurrencyStrategyDefault 。当然你也可以参考 Hystrix 插件体系,实现自定义的 HystrixConcurrencyStrategy 实现,以达到覆写 #getThreadPool(), #getBlockingQueue() 等方法。点击 链接 查看该方法代码。

5. HystrixThreadPool

com.netflix.hystrix.HystrixThreadPool ,Hystrix 线程池接口。当 Hystrix 命令使用 THREAD 执行隔离策略时, HystrixCommand#run() 方法在线程池执行。点击 链接 查看。HystrixThreadPool 定义接口如下 :

  • #getExecutor() :获得 ExecutorService 。

  • #getScheduler() / #getScheduler(Func0<Boolean>) :获得 RxJava Scheduler 。

  • #isQueueSpaceAvailable() :线程池队列是否有空余

  • #markThreadExecution() / #markThreadCompletion() / #markThreadRejection() :TODO 【2002】【metrics】

5.1 HystrixThreadPoolDefault

com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault ,Hystrix 线程池实现类

构造方法,代码如下 :

  1.  1: private final HystrixThreadPoolProperties properties;

  2.  2: private final BlockingQueue<Runnable> queue;

  3.  3: private final ThreadPoolExecutor threadPool;

  4.  4: private final HystrixThreadPoolMetrics metrics;

  5.  5: private final int queueSize;

  6.  6:

  7.  7: public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {

  8.  8:     // 初始化 HystrixThreadPoolProperties

  9.  9:     this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);

  10. 10:     // 获得 HystrixConcurrencyStrategy

  11. 11:     HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();

  12. 12:     // 队列大小

  13. 13:     this.queueSize = properties.maxQueueSize().get();

  14. 14:

  15. 15:     // TODO 【2002】【metrics】

  16. 16:     this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,

  17. 17:             concurrencyStrategy.getThreadPool(threadPoolKey, properties), // 初始化 ThreadPoolExecutor

  18. 18:             properties);

  19. 19:

  20. 20:     // 获得 ThreadPoolExecutor

  21. 21:     this.threadPool = this.metrics.getThreadPool();

  22. 22:     this.queue = this.threadPool.getQueue(); // 队列

  23. 23:

  24. 24:     // TODO 【2002】【metrics】

  25. 25:     /* strategy: HystrixMetricsPublisherThreadPool */

  26. 26:     HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);

  27. 27: }

  • 第 9 行 :初始化 HystrixThreadPoolProperties 。

  • 第 11 行 :初始化 HystrixConcurrencyStrategy 。

  • 第 13 行 :初始化 queueSize 。

  • 第 16 至 18 行 :TODO 【2002】【metrics】

    • 第 17 行 :调用 HystrixConcurrencyStrategy#getThreadPool(...) 方法,初始化 ThreadPoolExecutor 。

  • 第 21 行 :获得 ThreadPoolExecutor 。

  • 第 22 行 :获得 ThreadPoolExecutor 的队列。

  • 第 26 行 :TODO 【2002】【metrics】


#getExecutor() 方法,代码如下 :

  1. @Override

  2. public ThreadPoolExecutor getExecutor() {

  3.    touchConfig();

  4.    return threadPool;

  5. }

  • 调用 #touchConfig() 方法,动态调整 threadPool 的 coreSize / maximumSize / keepAliveTime 参数。点击 链接 查看该方法。


#getScheduler() / #getScheduler(Func0<Boolean>) 方法,代码如下 :

  1. @Override

  2. public Scheduler getScheduler() {

  3.    //by default, interrupt underlying threads on timeout

  4.    return getScheduler(new Func0<Boolean>() {

  5.        @Override

  6.        public Boolean call() {

  7.            return true;

  8.        }

  9.    });

  10. }

  11. @Override

  12. public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {

  13.    touchConfig();

  14.    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);

  15. }

  • HystrixContextScheduler 和 shouldInterruptThread 都在 「6. HystrixContextScheduler」 详细解析。


#isQueueSpaceAvailable() 方法,代码如下 :

  1. @Override

  2. public boolean isQueueSpaceAvailable() {

  3.    if (queueSize <= 0) {

  4.        // we don't have a queue so we won't look for space but instead

  5.        // let the thread-pool reject or not

  6.        return true;

  7.    } else {

  8.        return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();

  9.    }

  10. }

  • 由于线程池的队列大小不能动态调整,该方法的实现通过 HystrixThreadPoolProperties.queueSizeRejectionThreshold 属性控制。

  • 注意 queueSize 属性,决定了线程池的队列类型。

    • queueSize<=0 时, #isQueueSpaceAvailable() 都返回 true 的原因是,线程池使用 SynchronousQueue 作为队列,不支持任务排队,任务超过线程池的 maximumPoolSize 时,新任务被拒绝。

    • queueSize>0 时, #isQueueSpaceAvailable() 根据情况 truefalse 的原因是,线程池使用 LinkedBlockingQueue 作为队列,支持一定数量阻塞排队,但是这个数量无法调整。通过 #isQueueSpaceAvailable() 方法的判断,动态调整。另外,初始配置的 queueSize 要相对大,否则即使 queueSizeRejectionThreshold 配置的大于 queueSize ,实际提交任务到线程池,也会被拒绝

5.2 Factory

com.netflix.hystrix.HystrixThreadPool.Factory ,HystrixThreadPool 工厂类,不仅限于 HystrixThreadPool 的创建,也提供了 HystrixThreadPool 的管理( HystrixThreadPool 的容器 )。

threadPools 属性,维护创建的 HystrixThreadPool 对应的映射,代码如下 :

  1. final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

  • Key 为 HystrixThreadPoolKey#name() ,每个 HystrixThreadPoolKey 对应一个 HystrixThreadPool 对象。


#getInstance(...) 方法,获得 HystrixThreadPool 对象,代码如下 :

  1. /* package */static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {

  2.    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work

  3.    String key = threadPoolKey.name();

  4.    // this should find it for all but the first time

  5.    HystrixThreadPool previouslyCached = threadPools.get(key);

  6.    if (previouslyCached != null) {

  7.         return previouslyCached;

  8.     }

  9.     // if we get here this is the first time so we need to initialize

  10.     synchronized (HystrixThreadPool.class) {

  11.        if (!threadPools.containsKey(key)) {

  12.            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));

  13.       }

  14.     }

  15.     return threadPools.get(key);

  16. }

  • 根据 threadPoolKey 先从 threadPool 获取已创建的 HystrixThreadPool ;获取不到,创建对应的 HystrixThreadPool 返回,并添加到 threadPool 。


#shutdown() / #shutdown(timeout, unit) 方法,比较易懂,点击 链接 查看。

5.3 初始化

在 AbstractCommand 构造方法里,初始化命令的 threadPool 属性,代码如下 :

  1. protected final HystrixThreadPool threadPool;

  2. protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,

  3.       HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,

  4.       HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,

  5.       HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

  6.    // ... 省略其他代码

  7.    // 初始化 threadPoolKey

  8.    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());

  9.    // 初始化 threadPool

  10.    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

  11. }

  • 调用 #initThreadPool(...) 方法,获得 HystrixThreadPool ,点击 链接 查看。

6. HystrixScheduler

Hystrix 实现了自定义的 RxJava Scheduler ,整体类图如下 :

  • HystrixContextScheduler ( 实现 RxJava Scheduler 抽象类 ),内嵌类型为 ThreadPoolScheduler ( 实现 RxJava Scheduler抽象类 )的 actualScheduler 属性。

  • HystrixContextWorker ( 实现 RxJava Worker 抽象类 ),内嵌类型为 ThreadPoolWorker ( 实现 RxJava Worker 抽象类 )的 worker 属性。

6.1 HystrixContextScheduler

构造方法,代码如下 :

  1. public class HystrixContextScheduler extends Scheduler {

  2.    private final HystrixConcurrencyStrategy concurrencyStrategy;

  3.    private final Scheduler actualScheduler;

  4.    private final HystrixThreadPool threadPool;

  5.    // ... 省略无关代码

  6.    public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {

  7.        this.concurrencyStrategy = concurrencyStrategy;

  8.        this.threadPool = threadPool;

  9.        this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);

  10.    }

  11. }

  • actualScheduler 属性,类型为 ThreadPoolScheduler 。


#createWorker() 方法,代码如下 :

  1. @Override

  2. public Worker createWorker() {

  3.    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());

  4. }

  • 使用 actualScheduler 创建 ThreadPoolWorker ,传参给 HystrixContextSchedulerWorker 。

6.2 HystrixContextSchedulerWorker

构造方法,代码如下 :

  1. private class HystrixContextSchedulerWorker extends Worker {

  2.    private final Worker worker;

  3.    // ... 省略无关代码

  4.    private HystrixContextSchedulerWorker(Worker actualWorker) {

  5.        this.worker = actualWorker;

  6.    }

  7. }

  • worker 属性,类型为 ThreadPoolWorker 。


#schedule(Action0) 方法,代码如下 :

  1. @Override

  2. public Subscription schedule(Action0 action) {

  3.    if (threadPool != null) {

  4.        if (!threadPool.isQueueSpaceAvailable()) {

  5.            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");

  6.        }

  7.    }

  8.    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));

  9. }

  • 调用 ThreadPool#isQueueSpaceAvailable() 方法,判断线程池队列是否有空余。这个就是 HystrixContextScheduler 的实际用途。


#unsubscribe() / #isUnsubscribed() 方法,使用 worker 判断,点击 链接查看。

6.3 ThreadPoolScheduler

ThreadPoolScheduler 比较简单,点击 链接 查看。

6.4 ThreadPoolWorker

构造方法,代码如下 :

  1. private static class ThreadPoolWorker extends Worker {

  2.    private final HystrixThreadPool threadPool;

  3.    private final CompositeSubscription subscription = new CompositeSubscription();

  4.    private final Func0<Boolean> shouldInterruptThread;

  5.    // ... 省略无关代码

  6.    public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {

  7.        this.threadPool = threadPool;

  8.        this.shouldInterruptThread = shouldInterruptThread;

  9.    }

  10. }

  • subscription 属性,订阅信息。


#schedule(Action0) 方法,代码如下 :

  1.  1: @Override

  2.  2: public Subscription schedule(final Action0 action) {

  3.  3:     // 未订阅,返回

  4.  4:     if (subscription.isUnsubscribed()) {

  5.  5:         // don't schedule, we are unsubscribed

  6.  6:         return Subscriptions.unsubscribed();

  7.  7:     }

  8.  8:

  9.  9:     // 创建 ScheduledAction

  10. 10:     // This is internal RxJava API but it is too useful.

  11. 11:     ScheduledAction sa = new ScheduledAction(action);

  12. 12:

  13. 13:     // 添加到 订阅

  14. 14:     subscription.add(sa);

  15. 15:     sa.addParent(subscription);

  16. 16:

  17. 17:     // 提交 任务

  18. 18:     ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();

  19. 19:     FutureTask<?> f = (FutureTask<?>) executor.submit(sa);

  20. 20:     sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));

  21. 21:

  22. 22:     return sa;

  23. 23: }

  • 第 4 至 7 行 :未订阅,返回。

  • 第 11 行 : 创建 ScheduledAction 。在 TODO 【2013】【ScheduledAction】 详细解析。

  • 第 14 至 15 行 :添加到订阅( subscription )。

  • 第 18 至 20 行 :使用 threadPool ,提交任务,并创建 FutureCompleterWithConfigurableInterrupt 添加到订阅( sa)。

  • 第 22 行 :返回订阅( sa )。整体订阅关系如下 :


#unsubscribe() / #isUnsubscribed() 方法,使用 subscription 判断,点击 链接查看。

6.5 FutureCompleterWithConfigurableInterrupt

com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.FutureCompleterWithConfigurableInterrupt ,实现类似 rx.internal.schedulers.ScheduledAction.FutureCompleter ,在它的基础上,支持配置 FutureTask#cancel(Boolean) 是否可打断运行( mayInterruptIfRunning )。

构造方法,代码如下 :

  1. private static class FutureCompleterWithConfigurableInterrupt implements Subscription {

  2.    private final FutureTask<?> f;

  3.    private final Func0<Boolean> shouldInterruptThread;

  4.    private final ThreadPoolExecutor executor;

  5.    // ... 省略无关代码

  6.    private FutureCompleterWithConfigurableInterrupt(FutureTask<?> f, Func0<Boolean> shouldInterruptThread, ThreadPoolExecutor executor) {

  7.        this.f = f;

  8.        this.shouldInterruptThread = shouldInterruptThread;

  9.        this.executor = executor;

  10.    }

  11. }


当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行
当命令执行超时,或是主动取消命令执行时,调用 #unsubscribe() 方法,取消执行

#unsubscribe() 方法,代码如下 :

  1. @Override

  2. public void unsubscribe() {

  3.    // 从 线程池 移除 任务

  4.    executor.remove(f);

  5.    // 根据 shouldInterruptThread 配置,是否强制取消

  6.    if (shouldInterruptThread.call()) {

  7.        f.cancel(true);

  8.    } else {

  9.        f.cancel(false);

  10.    }

  11. }

  • 根据 shouldInterruptThread 方法,判断是否强制取消。

  • shouldInterruptThread 对应的方法,实现代码如下 :

  1. subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {

  2.    @Override

  3.    public Boolean call() {

  4.        return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;

  5.    }

  6. }));

  • 当 executionIsolationThreadInterruptOnTimeout=true 时,命令可执行超时。当命令可执行超时时,强制取消。

  • 当使用 HystrixCommand.queue() 返回的 Future ,可以使用 Future#cancel(Boolean) 取消命令执行。从 shouldInterruptThread 对应的方法可以看到,如果此时不满足命令执行超时的条件,命令执行取消的方式是非强制的。此时当 executionIsolationThreadInterruptOnFutureCancel=true 时,并且调用 Future#cancel(Boolean) 传递 mayInterruptIfRunning=true ,强制取消命令执行。

    • 模拟测试用例 : CommandHelloWorld#testAsynchronous3()

    • HystrixCommand#queue() :点击 链接 查看 Future#cancel(Boolean) 方法。

666. 彩蛋

一边写一边想明白了 RxJava 的一些东西,挺舒服的赶脚。

继续 Go On ~ 周末嗨不停。

胖友,分享一波朋友圈可好!


    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存