查看原文
其他

注册中心 Eureka 源码解析 —— 任务批处理

老艿艿 芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 



本文主要基于 Eureka 1.8.X 版本

  • 1. 概述

  • 2. 整体流程

  • 3. 任务处理器

  • 4. 创建任务分发器

    • 4.1 批量任务执行分发器

    • 4.2 单任务执行分发器

  • 5. 创建任务接收执行器

  • 6. 创建任务执行器

    • 6.1 创建批量任务执行器

    • 6.2 创建单任务执行器

    • 6.3 工作线程抽象类

  • 7. 网络通信整形器

  • 8. 任务接收执行器【处理任务】

  • 9. 任务接收线程【调度任务】

  • 10. 任务执行器【执行任务】

    • 10.1 批量任务工作线程

    • 10.2 单任务工作线程

  • 666. 彩蛋


1. 概述

本文主要分享 任务批处理。Eureka-Server 集群通过任务批处理同步应用实例注册实例,所以本文也是为 Eureka-Server 集群同步的分享做铺垫。

本文涉及类在 com.netflix.eureka.util.batcher 包下,涉及到主体类的类图如下( 打开大图 ):

  • 紫色部分 —— 任务分发器

  • 蓝色部分 —— 任务接收器

  • 红色部分 —— 任务执行器

  • 绿色部分 —— 任务处理器

  • 黄色部分 —— 任务持有者( 任务 )

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot

  • Java 微服务实践 - Spring Cloud

  • Java 微服务实践 - Spring Boot / Spring Cloud

2. 整体流程

任务执行的整体流程如下( 打开大图 ):

  • 细箭头 —— 任务执行经历的操作

  • 粗箭头 —— 任务队列流转的方向

  • 不同于一般情况下,任务提交了立即同步或异步执行,任务的执行拆分了三层队列

    • 蓝线:分发器在收到任务执行请求后,提交到接收队列,任务实际未执行

    • 黄线:执行器的工作线程处理任务失败,将符合条件( 见 「3. 任务处理器」 )的失败任务提交到重新执行队列。

    • 第一层,接收队列( acceptorQueue ),重新处理队列( reprocessQueue )。

  • 第二层,待执行队列( processingOrder )
    * 粉线:接收线程( Runner )将重新执行队列,接收队列提交到待执行队列。

  • 第三层,工作队列( workQueue )
    * 粉线:接收线程( Runner )将待执行队列的任务根据参数( maxBatchingSize )将任务合并成批量任务,调度( 提交 )到工作队列。
    * 黄线:执行器的工作线程,一个工作线程可以拉取一个批量任务进行执行。

  • 三层队列的好处

    • 接收队列,避免处理任务的阻塞等待。

    • 接收线程( Runner )合并任务,将相同任务编号( 是的,任务是带有编号的 )的任务合并,只执行一次。

    • Eureka-Server 为集群同步提供批量操作多个应用实例的接口,一个批量任务可以一次调度接口完成,避免多次调用的开销。当然,这样做的前提是合并任务,这也导致 Eureka-Server 集群之间对应用实例的注册和下线带来更大的延迟。毕竟,Eureka 是在 CAP 之间,选择了 AP

3. 任务处理器

com.netflix.eureka.util.batcher.TaskProcessor ,任务处理器接口。接口代码如下:

// ... 省略代码,超过微信文章上限
  • ProcessingResult ,处理任务结果。

    • `Success` ,成功。

    • `Congestion` ,拥挤错误,任务将会被重试。例如,请求被限流。

    • `TransientError` ,瞬时错误,任务将会被重试。例如,网络请求超时。

    • `PermanentError` ,永久错误,任务将会被丢弃。例如,执行时发生程序异常。

  • #process(task) 方法,处理单任务。

  • #process(tasks) 方法,处理批量任务。

4. 创建任务分发器

com.netflix.eureka.util.batcher.TaskDispatcher ,任务分发器接口。接口代码如下:

// ... 省略代码,超过微信文章上限
  • #process(…) 方法,提交任务编号,任务,任务过期时间给任务分发器处理。

com.netflix.eureka.util.batcher.TaskDispatchers ,任务分发器工厂类,用于创建任务分发器。其内部提供两种任务分发器的实现:

  • 批量任务执行的分发器,用于 Eureka-Server 集群注册信息的同步任务。

  • 单任务执行的分发器,用于 Eureka-Server 向亚马逊 AWS 的 ASG ( Autoscaling Group ) 同步状态。虽然本系列暂时对 AWS 相关的不做解析,从工具类的角度来说,本文会对该分发器进行分享。

com.netflix.eureka.cluster.ReplicationTaskProcessor ,实现 TaskDispatcher ,Eureka-Server 集群任务处理器。感兴趣的同学,可以点击链接自己研究,我们将在 《Eureka 源码解析 —— Eureka-Server 集群同步》 有详细解析。

4.1 批量任务执行分发器

调用 TaskDispatchers#createBatchingTaskDispatcher(...) 方法,创建批量任务执行的分发器,实现代码如下:

// TaskDispatchers.java
  1/**
  2:  * 创建批量任务执行的分发器
  3:  *
  4:  * @param id 任务执行器编号
  5:  * @param maxBufferSize 待执行队列最大数量
  6:  * @param workloadSize 单个批量任务包含任务最大数量
  7:  * @param workerCount 任务执行器工作线程数
  8:  * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
  9:  * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
 10:  * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
 11:  * @param taskProcessor 任务处理器
 12:  * @param <ID> 任务编号泛型
 13:  * @param <T> 任务泛型
 14:  * @return 批量任务执行的分发器
 15:  */

 // ... 省略代码,超过微信文章上限
  • 第 1 至 23 行 :方法参数。比较多哈,请耐心理解。

    • `workloadSize` 参数,单个批量任务包含任务最大数量。

    • `taskProcessor` 参数,自定义任务执行器实现

  • 第 24 至 27 行 :创建任务接收执行器。在 「5. 创建任务接收器」 详细解析。

  • 第 28 至 29 行 :创建批量任务执行器。在 「6.1 创建批量任务执行器」 详细解析。

  • 第 30 至 42 行 :创建批量任务分发器。

    • 第 32 至 35 行 :`#process()` 方法的实现,调用 `AcceptorExecutor#process(…)` 方法,提交 [ 任务编号 , 任务 , 任务过期时间 ] 给任务分发器处理。

4.2 单任务执行分发器

调用 TaskDispatchers#createNonBatchingTaskDispatcher(...) 方法,创建单任务执行的分发器,实现代码如下:

  1/**
  2:  * 创建单任务执行的分发器
  3:  *
  4:  * @param id 任务执行器编号
  5:  * @param maxBufferSize 待执行队列最大数量
  6:  * @param workerCount 任务执行器工作线程数
  7:  * @param maxBatchingDelay 批量任务等待最大延迟时长,单位:毫秒
  8:  * @param congestionRetryDelayMs 请求限流延迟重试时间,单位:毫秒
  9:  * @param networkFailureRetryMs 网络失败延迟重试时长,单位:毫秒
 10:  * @param taskProcessor 任务处理器
 11:  * @param <ID> 任务编号泛型
 12:  * @param <T> 任务泛型
 13:  * @return 单任务执行的分发器
 14:  */

 15public static <ID, T> TaskDispatcher<ID, T> createNonBatchingTaskDispatcher(String id,
 16:                                                                             int maxBufferSize,
 17:                                                                             int workerCount,
 18:                                                                             long maxBatchingDelay,
 19:                                                                             long congestionRetryDelayMs,
 20:                                                                             long networkFailureRetryMs,
 21:                                                                             TaskProcessor<T> taskProcessor) 
{
 22:     // 创建 任务接收执行器
 23:     final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
 24:             id, maxBufferSize, /* workloadSize = 1 */1, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
 25:     );
 26:     final TaskExecutors<ID, T> taskExecutor = TaskExecutors.singleItemExecutors(id, workerCount, taskProcessor, acceptorExecutor);
 27:     return new TaskDispatcher<ID, T>() {
 28:         @Override
 29:         public void process(ID id, T task, long expiryTime) {
 30:             acceptorExecutor.process(id, task, expiryTime);
 31:         }
 32
 33:         @Override
 34:         public void shutdown() {
 35:             acceptorExecutor.shutdown();
 36:             taskExecutor.shutdown();
 37:         }
 38:     };
 39: }
  • 第 1 至 21 行 :方法参数。比较多哈,请耐心理解。

    • `workloadSize` 参数,相比 `#createBatchingTaskDispatcher(…)` 少这个参数。在第 24 行,你会发现该参数传递给 AcceptorExecutor 使用 1 噢

    • `taskProcessor` 参数,自定义任务执行器实现

  • 第 21 至 25 行 :创建任务接收执行器。和 #createBatchingTaskDispatcher(…) 只差 workloadSize = 1 参数。在 「5. 创建任务接收器」 详细解析。

  • 第 28 至 29 行 :创建任务执行器。和 `#createBatchingTaskDispatcher(…)` 差别很大。「6.2 创建单任务执行器」 详细解析。

  • 第 30 至 42 行 :创建任务分发器。和 #createBatchingTaskDispatcher(…) 一样。

5. 创建任务接收执行器

com.netflix.eureka.util.batcher.AcceptorExecutor ,任务接收执行器。创建构造方法代码如下:

// ... 省略代码,超过微信文章上限
  • 第 5 至 61 行 :属性。比较多哈,请耐心理解。

    • 眼尖如你,会发现 AcceptorExecutor 即存在单任务工作队列( `singleItemWorkQueue` ),又存在批量任务工作队列( `batchWorkQueue` ) ,在 「9. 任务接收线程【调度任务】」 会解答这个疑惑。

  • 第 78 至 79 行 :创建网络通信整形器。在 「7. 网络通信整形器」 详细解析。

  • 第 81 至 85 行 :创建接收任务线程

6. 创建任务执行器

com.netflix.eureka.util.batcher.TaskExecutors ,任务执行器。其内部提供创建单任务和批量任务执行器的两种方法。TaskExecutors 构造方法如下:

// ... 省略代码,超过微信文章上限
  • workerThreads 属性,工作线程工作任务队列会被工作线程池并发拉取,并发执行

  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnableFactory ,创建工作线程工厂接口。单任务和批量任务执行器的工作线程实现不同,通过自定义工厂实现类创建。

6.1 创建批量任务执行器

调用 TaskExecutors#batchExecutors(...) 方法,创建批量任务执行器。实现代码如下:

/**
* 创建批量任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 批量任务执行器
*/

// ... 省略代码,超过微信文章上限
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.BatchWorkerRunnable ,批量任务工作线程。

6.2 创建单任务执行器

调用 TaskExecutors#singleItemExecutors(...) 方法,创建批量任务执行器。实现代码如下:

/**
* 创建单任务执行器
*
* @param name 任务执行器名
* @param workerCount 任务执行器工作线程数
* @param processor 任务处理器
* @param acceptorExecutor 接收任务执行器
* @param <ID> 任务编号泛型
* @param <T> 任务泛型
* @return 单任务执行器
*/

// ... 省略代码,超过微信文章上限
  • com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable.SingleTaskWorkerRunnable ,单任务工作线程。

6.3 工作线程抽象类

com.netflix.eureka.util.batcher.TaskExecutors.WorkerRunnable ,任务工作线程抽象类。BatchWorkerRunnable 和 SingleTaskWorkerRunnable 都实现该类,差异在 #run() 的自定义实现。WorkerRunnable 实现代码如下:

// ... 省略代码,超过微信文章上限

7. 网络通信整形器

com.netflix.eureka.util.batcher.TrafficShaper ,网络通信整形器。当任务执行发生请求限流,或是请求网络失败的情况,则延时 AcceptorRunner 将任务提交到工作任务队列,从而避免任务很快去执行,再次发生上述情况。TrafficShaper 实现代码如下:

// ... 省略代码,超过微信文章上限
  • #registerFailure(…) ,在任务执行失败时,提交任务结果给 TrafficShaper ,记录发生时间。在 「10. 任务执行器【执行任务】」 会看到调用该方法。

  • #transmissionDelay(…) ,计算提交延迟,单位:毫秒。「9. 任务接收线程【调度任务】」 会看到调用该方法。

8. 任务接收执行器【处理任务】

调用 AcceptorExecutor#process(...) 方法,添加任务到接收任务队列。实现代码如下:

// AcceptorExecutor.java
// ... 省略代码,超过微信文章上限
  • com.netflix.eureka.util.batcher.TaskHolder ,任务持有者,实现代码如下:

    // ... 省略代码,超过微信文章上限

9. 任务接收线程【调度任务】

后台线程执行 AcceptorRunner#run(...) 方法,调度任务。实现代码如下:

// ... 省略代码,超过微信文章上限
  • 第 4 行 :无限循环执行调度,直到关闭。

  • 第 6 至 7 行 :调用 #drainInputQueues() 方法,循环处理完输入队列( 接收队列 + 重新执行队列 ),直到有待执行的任务。实现代码如下:

    // ... 省略代码,超过微信文章上限
    • 第 4 行 :优先从重新执行任务的队尾拿较新的任务,从而实现保留更新的任务在待执行任务映射( pendingTasks ) 里。

    • 第 12 行 :添加任务编号到待执行队列( processingOrder ) 的头部。效果如下图:

    • 第 15 至 18 行 :如果待执行队列( pendingTasks )已满,清空重新执行队列( processingOrder ),放弃较早的任务。

    • 重新执行队列( reprocessQueue ) 和接收队列( acceptorQueue )为空

    • 待执行任务映射( pendingTasks )不为空

    • 第 2 行 && 第 18 行 :循环,直到同时满足如下全部条件:

    • 第 3 至 4 行 :处理完重新执行队列( reprocessQueue )。实现代码如下:

      // ... 省略代码,超过微信文章上限
    • 第 5 至 6 行 :处理完接收队列( acceptorQueue ),实现代码如下:

      // ... 省略代码,超过微信文章上限
    • 第 8 至 17 行 :当所有队列为空,阻塞从接收队列( acceptorQueue ) 拉取任务 10 ms。若拉取到,添加到待执行队列( processingOrder )。

  • 第 12 至 16 行 :计算可调度任务的最小时间( scheduleTime )。

    • 当 scheduleTime 小于当前时间,不重新计算,即此时需要延迟等待调度。

    • 当 scheduleTime 大于等于当前时间,配合 TrafficShaper#transmissionDelay(…) 重新计算。

  • 第 19 行 :当 scheduleTime 小于当前时间,执行任务的调度。

  • 第 21 行 :调用 #assignBatchWork() 方法,调度批量任务。实现代码如下:

    // ... 省略代码,超过微信文章上限
    • x

    • 第 2 行 :调用 #hasEnoughTasksForNextBatch() 方法,判断是否有足够任务进行下一次批量任务调度:1)待执行任务( processingOrder )映射已满;或者 2)到达批量任务处理最大等待延迟。实现代码如下:

      // ... 省略代码,超过微信文章上限
    • 第 5 至 17 行 :获取批量任务( holders )。😈 你会发现,本文说了半天的批量任务,实际是 List<taskholder,>&gt;</taskholder,>哈。

    • 第 4 行 :获取批量任务工作请求信号量( batchWorkRequests ) 。在任务执行器的批量任务执行器,每次执行时,发出 batchWorkRequests 。每一个信号量需要保证获取到一个批量任务

    • 第 19 至 20 行 :未调度到批量任务,释放请求信号量,代表请求实际未完成,每一个信号量需要保证获取到一个批量任务

    • 第 21 至 24 行 :添加批量任务到批量任务工作队列。

    • 第 23 行 :调用 #assignSingleItemWork() 方法,调度单任务。

  • 第 23 行 :调用 #assignSingleItemWork() 方法,调度单任务,和 #assignBatchWork() 方法类似。实现代码如下:

    // ... 省略代码,超过微信文章上限
    • x

  • 第 26 至 31 行 :当调度任务前的待执行任务数( totalItems )等于当前待执行队列( processingOrder )的任务数,意味着:1)任务执行器无任务请求,正在忙碌处理之前的任务;或者 2)任务延迟调度。睡眠 10 秒,避免资源浪费。

10. 任务执行器【执行任务】

10.1 批量任务工作线程

批量任务工作后台线程( BatchWorkerRunnable )执行 #run(...) 方法,调度任务。实现代码如下:

// 
// ... 省略代码,超过微信文章上限
  • 第 4 行 :无限循环执行调度,直到关闭。

  • 第 6 行 :调用 getWork() 方法,获取一个批量任务直到成功。实现代码如下:

    // ... 省略代码,超过微信文章上限
    • 注意,批量任务工作队列( batchWorkQueue ) 和单任务工作队列( singleItemWorkQueue ) 是不同的队列

    • 第 3 行 :调用 TaskDispatcher#requestWorkItems() 方法,发起请求信号量,并获得批量任务的工作队列。实现代码如下:

      // TaskDispatcher.java
      // ... 省略代码,超过微信文章上限
    • 第 5 至 8 行 :循环获取一个批量任务,直到成功。

  • 第 12 行 :调用 #getTasksOf(...) 方法,获得实际批量任务。实现代码如下:

    // ... 省略代码,超过微信文章上限
    • x

  • 第 14 至 24 行 :调用处理器( TaskProcessor ) 执行任务。当任务执行结果为 Congestion 或 TransientError ,调用 AcceptorExecutor#reprocess(...) 提交整个批量任务重新处理,实现代码如下:

    // AcceptorExecutor.java
    // ... 省略代码,超过微信文章上限

10.2 单任务工作线程

单任务工作后台线程( SingleTaskWorkerRunnable )执行 #run(...) 方法,调度任务,和 BatchWorkerRunnable#run(...) 基本类似,就不啰嗦了。实现代码如下:

@Override
// SingleTaskWorkerRunnable.java
// ... 省略代码,超过微信文章上限

666. 彩蛋

😈 又是一篇长文。建议边看代码,边对照着整体流程图,理解实际不难。

当然,欢迎你有任何疑问,在我的公众号( 芋道源码 ) 留言。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?




如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。


知识星球



目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

...
一共 60 篇++

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存