查看原文
其他

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业监听器

老艿艿 芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 



摘要: 原创出处 http://www.iocoder.cn/Elastic-Job/job-listener/ 「芋道源码」欢迎转载,保留摘要,谢谢!

1. 概述2. ElasticJobListener3. AbstractDistributeOnceElasticJobListener


1. 概述

本文主要分享 Elastic-Job-Lite 作业监听器

涉及到主要类的类图如下( 打开大图 ):

  • 绿色监听器接口 ElasticJobListener,每台作业节点均执行。

  • 粉色监听器接口 AbstractDistributeOnceElasticJobListener,分布式场景中仅单一节点执行。

  • 蓝色类在 com.dangdang.ddframe.job.lite.internal.guarantee 里,保证分布式任务全部开始和结束状态。 AbstractDistributeOnceElasticJobListener 通过 guarantee 功能,实现分布式场景中仅单一节点执行。

你行好事会因为得到赞赏而愉悦 
同理,开源项目贡献者会因为 Star 而更加有动力 
为 Elastic-Job 点赞!传送门

2. ElasticJobListener

ElasticJobListener,作业监听器接口,每台作业节点均执行

若作业处理作业服务器的文件,处理完成后删除文件,可考虑使用每个节点均执行清理任务。此类型任务实现简单,且无需考虑全局分布式任务是否完成,请尽量使用此类型监听器。

接口代码如下:

public interface ElasticJobListener {

    /**
     * 作业执行前的执行的方法.
     * 
     * @param shardingContexts 分片上下文
     */

    void beforeJobExecuted(final ShardingContexts shardingContexts);

    /**
     * 作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */

    void afterJobExecuted(final ShardingContexts shardingContexts);
}

调用执行如下:

// AbstractElasticJobExecutor.java
public final void execute() {
   // ...省略无关代码

   // 执行 作业执行前的方法
   try {
       jobFacade.beforeJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
   // ...省略无关代码(执行 普通触发的作业)
   // ...省略无关代码(执行 被跳过触发的作业)
   // ...省略无关代码(执行 作业失效转移)

   // ...执行 作业执行后的方法
   try {
       jobFacade.afterJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
}
  • JobFacade 对作业监听器简单封装进行调用。

    // LiteJobFacade.java
    @Override
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
       for (ElasticJobListener each : elasticJobListeners) {
           each.beforeJobExecuted(shardingContexts);
       }
    }

    @Override
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
       for (ElasticJobListener each : elasticJobListeners) {
           each.afterJobExecuted(shardingContexts);
       }
    }
  • 下文提到的 AbstractDistributeOnceElasticJobListener,也是这么调用。

3. AbstractDistributeOnceElasticJobListener

AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次的监听器。

若作业处理数据库数据,处理完成后只需一个节点完成数据清理任务即可。此类型任务处理复杂,需同步分布式环境下作业的状态同步,提供了超时设置来避免作业不同步导致的死锁,请谨慎使用。

创建 AbstractDistributeOnceElasticJobListener 代码如下:

public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {

    /**
     * 开始超时时间
     */

    private final long startedTimeoutMilliseconds;
    /**
     * 开始等待对象
     */

    private final Object startedWait = new Object();
    /**
     * 完成超时时间
     */

    private final long completedTimeoutMilliseconds;
    /**
     * 完成等待对象
     */

    private final Object completedWait = new Object();
    /**
     * 保证分布式任务全部开始和结束状态的服务
     */

    @Setter
    private GuaranteeService guaranteeService;

    private TimeService timeService = new TimeService();

    public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
        if (startedTimeoutMilliseconds <= 0L) {
            this.startedTimeoutMilliseconds = Long.MAX_VALUE;
        } else {
            this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
        }
        if (completedTimeoutMilliseconds <= 0L) {
            this.completedTimeoutMilliseconds = Long.MAX_VALUE; 
        } else {
            this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
        }
    }
}
  • 超时参数 startedTimeoutMillisecondscompletedTimeoutMilliseconds 务必传递,避免作业不同步导致的死锁。

👇下面,我们来看本文的重点:AbstractDistributeOnceElasticJobListener,在分布式作业中只执行一次:

@Override
public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
   // 注册作业分片项开始运行
   guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
   // 判断是否所有的分片项开始运行
   if (guaranteeService.isAllStarted()) {
       // 执行
       doBeforeJobExecutedAtLastStarted(shardingContexts);
       // 清理启动信息
       guaranteeService.clearAllStartedInfo();
       return;
   }
   // 等待
   long before = timeService.getCurrentMillis();
   try {
       synchronized (startedWait) {
           startedWait.wait(startedTimeoutMilliseconds);
       }
   } catch (final InterruptedException ex) {
       Thread.interrupted();
   }
   // 等待超时
   if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
       // 清理启动信息
       guaranteeService.clearAllStartedInfo();
       handleTimeout(startedTimeoutMilliseconds);
   }
}
  • 调用 GuaranteeService#registerStart(...) 方法,注册作业分片项开始运行。

    // GuaranteeService.java
    public void registerStart(final Collection<Integer> shardingItems) {
       for (int each : shardingItems) {
           jobNodeStorage.createJobNodeIfNeeded(GuaranteeNode.getStartedNode(each));
       }
    }

    // GuaranteeNode.java
    public final class GuaranteeNode {
        static final String ROOT = "guarantee";
        static final String STARTED_ROOT = ROOT + "/started";
    }

    static String getStartedNode(final int shardingItem) {
       return Joiner.on("/").join(STARTED_ROOT, shardingItem);
    }
    • Zookeeper 数据节点 /${JOB_NAME}/guarantee/started/${ITEM_INDEX} 为永久节点,存储空串( "" )。为什么是永久节点呢?在 GuaranteeService#isAllStarted() 见分晓。

  • 调用 GuaranteeService#isAllStarted() 方法,判断是否所有的分片项开始运行。

    /**
    * 判断是否所有的任务均启动完毕.
    *
    * @return 是否所有的任务均启动完毕
    */

    public boolean isAllStarted() {
       return jobNodeStorage.isJobNodeExisted(GuaranteeNode.STARTED_ROOT)
               && configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount() == jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT).size();
    }
    • 当 /${JOB_NAME}/guarantee/started/ 目录下,所有作业分片项都开始运行,即运行总数等于作业分片总数( JobCoreConfiguration.ShardingTotalCount ),代表所有的任务均启动完毕

    • 等待所有任务启动过程中,不排除有作业节点会挂掉,如果 /${JOB_NAME}/guarantee/started/${ITEM_INDEX} 存储临时节点,会导致不能满足所有的分片项开始运行的条件。

    • 等待过程中,如果调整作业分片总数( JobCoreConfiguration.ShardingTotalCount ),会导致异常。

  • 当不满足所有的分片项开始运行时,作业节点调用 Object#wait(...) 方法进行等待。该等待怎么结束等待?当满足所有的分片项开始运行的作业节点调用 GuaranteeService#clearAllStartedInfo() 时,StartedNodeRemovedJobListener 会监听到 /${JOB_NAME}/guarantee/started/ 被删除,调用 Object#notifyAll(...) 方法进行唤醒全部。

    // GuaranteeService.java
    /**
    * 清理所有任务启动信息.
    */

    public void clearAllStartedInfo() {
       jobNodeStorage.removeJobNodeIfExisted(GuaranteeNode.STARTED_ROOT);
    }

    // StartedNodeRemovedJobListener.java
    class StartedNodeRemovedJobListener extends AbstractJobListener {

       @Override
       protected void dataChanged(final String path, final Type eventType, final String data) {
           if (Type.NODE_REMOVED == eventType && guaranteeNode.isStartedRootNode(path)) {
               for (ElasticJobListener each : elasticJobListeners) {
                   if (each instanceof AbstractDistributeOnceElasticJobListener) {
                       ((AbstractDistributeOnceElasticJobListener) each).notifyWaitingTaskStart();
                   }
               }
           }
       }
    }  
  • 调用 #doBeforeJobExecutedAtLastStarted(...) 方法,执行最后一个作业执行前的执行的方法,实现该抽象方法,完成自定义逻辑。#doAfterJobExecutedAtLastCompleted(...) 实现的方式一样,就不重复解析了。

    // AbstractDistributeOnceElasticJobListener.java
    /**
    * 分布式环境中最后一个作业执行前的执行的方法.
    *
    * @param shardingContexts 分片上下文
    */

    public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);

    /**
    * 分布式环境中最后一个作业执行后的执行的方法.
    *
    * @param shardingContexts 分片上下文
    */

    public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);

    整体流程如下图:


欢迎加入我的知识星球,一起探讨架构,交流源码。加入方式,长按下方二维码噢

已在知识星球更新源码解析如下:

  • 《精尽 Dubbo 源码解析系列》69 篇。

  • 《精尽 Netty 源码解析系列》61 篇。

  • 《精尽 Spring 源码解析系列》35 篇。

  • 《精尽 MyBatis 源码解析系列》34 篇。

  • 《数据库实体设计》17 篇。

  • 正在准备更新《精尽 Spring MVC 源码解析系列》



目前在知识星球更新了《Dubbo 源码解析》目录如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

... 一共 69+ 篇

目前在知识星球更新了《Netty 源码解析》目录如下:

01. 调试环境搭建
02. NIO 基础
03. Netty 简介
04. 启动 Bootstrap

05. 事件轮询 EventLoop

06. 通道管道 ChannelPipeline

07. 通道 Channel

08. 字节缓冲区 ByteBuf

09. 通道处理器 ChannelHandler

10. 编解码 Codec

11. 工具类 Util

... 一共 61+ 篇


目前在知识星球更新了《数据库实体设计》目录如下:


01. 商品模块
02. 交易模块
03. 营销模块
04. 公用模块

... 一共 17+ 篇


目前在知识星球更新了《Spring 源码解析》目录如下:


01. 调试环境搭建
02. IoC Resource 定位
03. IoC BeanDefinition 载入

04. IoC BeanDefinition 注册

05. IoC Bean 获取

06. IoC Bean 生命周期

... 一共 35+ 篇


目前在知识星球更新了《MyBatis 源码解析》目录如下:


01. 调试环境搭建
02. 项目结构一览
03. MyBatis 面试题合集

04. MyBatis 学习资料合集

05. MyBatis 初始化

06. SQL 初始化

07. SQL 执行

08. 插件体系

09. Spring 集成

... 一共 34+ 篇


源码不易↓↓↓

点赞支持老艿艿↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存