查看原文
其他

注册中心 Eureka 源码解析 —— EndPoint 与 解析器

老艿艿 芋道源码 2019-05-13

点击上方“芋道源码”,选择“置顶公众号”

技术文章第一时间送达!

源码精品专栏

 

本文主要基于 Eureka 1.8.X 版本

  • 1. 概述

  • 2. EndPoint

  • 2.1 EurekaEndpoint

  • 2.2 DefaultEndpoint

  • 2.3 AwsEndpoint

  • 3. 解析器

  • 3.1 ClusterResolver

  • 3.2 ClosableResolver

  • 3.3 DnsTxtRecordClusterResolver

  • 3.4 ConfigClusterResolver

  • 3.5 ZoneAffinityClusterResolver

  • 3.6 AsyncResolver

    • 3.6.1 定时任务

    • 3.6.2 解析 EndPoint 集群

  • 4. 初始化解析器

  • 666. 彩蛋


1. 概述

本文主要分享 EndPoint 与 解析器

  • EndPoint ,服务端点。例如,Eureka-Server 的访问地址。

  • EndPoint 解析器,将配置的 Eureka-Server 的访问地址解析成 EndPoint 。

目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的兼容配置:

  • 第一种,直接配置实际访问地址。例如,eureka.serviceUrl.defaultZone=http://127.0.0.1:8080/v2 。

  • 第二种,基于 DNS 解析出访问地址。例如,eureka.shouldUseDns=true 并且eureka.eurekaServer.domainName=eureka.iocoder.cn 。

本文涉及类在 com.netflix.discovery.shared.resolver 包下,涉及到主体类的类图如下( 打开大图 ):

  • 红色部分 —— EndPoint

  • 黄色部分 —— EndPoint 解析器

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。

  • 程序猿DD —— 《Spring Cloud微服务实战》

  • 周立 —— 《Spring Cloud与Docker微服务架构实战》

  • 两书齐买,京东包邮。

推荐 Spring Cloud 视频

  • Java 微服务实践 - Spring Boot

  • Java 微服务实践 - Spring Cloud

  • Java 微服务实践 - Spring Boot / Spring Cloud

2. EndPoint

2.1 EurekaEndpoint

com.netflix.discovery.shared.resolver.EurekaEndpoint ,Eureka 服务端点接口,实现代码如下:

public interface EurekaEndpoint extends Comparable<Object> {

    /**
     * @return 完整的服务 URL
     */

    String getServiceUrl();

    /**
     * @deprecated use {@link #getNetworkAddress()}
     */

    @Deprecated
    String getHostName();

    /**
     * @return 网络地址
     */

    String getNetworkAddress();

    /**
     * @return 端口
     */

    int getPort();

    /**
     * @return 是否安全( https )
     */

    boolean isSecure();

    /**
     * @return 相对路径
     */

    String getRelativeUri();

}

2.2 DefaultEndpoint

com.netflix.discovery.shared.resolver.DefaultEndpoint ,默认 Eureka 服务端点实现类。实现代码如下:

public class DefaultEndpoint implements EurekaEndpoint {

    /**
     * 网络地址
     */

    protected final String networkAddress;
    /**
     * 端口
     */

    protected final int port;
    /**
     * 是否安全( https )
     */

    protected final boolean isSecure;
    /**
     * 相对地址
     */

    protected final String relativeUri;
    /**
     * 完整的服务 URL
     */

    protected final String serviceUrl;

    public DefaultEndpoint(String serviceUrl) {
        this.serviceUrl = serviceUrl;

        // 将 serviceUrl 分解成 几个属性
        try {
            URL url = new URL(serviceUrl);
            this.networkAddress = url.getHost();
            this.port = url.getPort();
            this.isSecure = "https".equals(url.getProtocol());
            this.relativeUri = url.getPath();
        } catch (Exception e) {
            throw new IllegalArgumentException("Malformed serviceUrl: " + serviceUrl);
        }
    }

    public DefaultEndpoint(String networkAddress, int port, boolean isSecure, String relativeUri) {
        this.networkAddress = networkAddress;
        this.port = port;
        this.isSecure = isSecure;
        this.relativeUri = relativeUri;

        // 几个属性 拼接成 serviceUrl
        StringBuilder sb = new StringBuilder().append(isSecure ? "https" : "http").append("://").append(networkAddress);
        if (port >= 0) {
            sb.append(':').append(port);
        }
        if (relativeUri != null) {
            if (!relativeUri.startsWith("/")) {
                sb.append('/');
            }
            sb.append(relativeUri);
        }
        this.serviceUrl = sb.toString();
    }
}
  • 重写了 #equals(…) 和 #hashCode(…) 方法,标准实现方式,这里就不贴代码了。

  • 重写了 #compareTo(…) 方法,基于 serviceUrl 属性做比较。

2.3 AwsEndpoint

com.netflix.discovery.shared.resolver.aws.AwsEndpoint ,基于 regionzone 的 Eureka 服务端点实现类 ( 请不要在意 AWS 开头 )。实现代码如下:

public class AwsEndpoint extends DefaultEndpoint {

    /**
     * 区域
     */

    protected final String region;
    /**
     * 可用区
     */

    protected final String zone;
}
  • 重写了 #equals(…) 和 #hashCode(…) 方法,标准实现方式,这里就不贴代码了。

3. 解析器

EndPoint 解析器使用委托设计模式实现。所以,上文图片中我们看到好多个解析器,实际代码非常非常非常清晰

FROM 《委托模式》 
委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。

我们在上图的基础上,增加委托的关系,如下图:

3.1 ClusterResolver

com.netflix.discovery.shared.resolver.ClusterResolver ,集群解析器接口。接口代码如下:

public interface ClusterResolver<T extends EurekaEndpoint> {

    /**
     * @return 地区
     */

    String getRegion();

    /**
     * @return EndPoint 集群( 数组 )
     */

    List<T> getClusterEndpoints();

}

3.2 ClosableResolver

com.netflix.discovery.shared.resolver.ClosableResolver ,可关闭的解析器接口,继承自 ClusterResolver 接口。接口代码如下:

public interface ClosableResolver<T extends EurekaEndpoint> extends ClusterResolver<T> {

    /**
     * 关闭
     */

    void shutdown();
}

3.3 DnsTxtRecordClusterResolver

com.netflix.discovery.shared.resolver.aws.DnsTxtRecordClusterResolver ,基于 DNS TXT 记录类型的集群解析器。类属性代码如下:

public class DnsTxtRecordClusterResolver implements ClusterResolver<AwsEndpoint> {

    /**
     * 地区
     */

    private final String region;
    /**
     * 集群根地址,例如 txt.default.eureka.iocoder.cn
     */

    private final String rootClusterDNS;
    /**
     * 是否解析可用区( zone )
     */

    private final boolean extractZoneFromDNS;
    /**
     * 端口
     */

    private final int port;
    /**
     * 是否安全
     */

    private final boolean isSecure;
    /**
     * 相对地址
     */

    private final String relativeUri;
}
  • DnsTxtRecordClusterResolver 通过集群根地址( rootClusterDNS ) 解析出 EndPoint 集群。需要在 DNS 配置两层解析记录:

    • 主机记录 :格式为 TXT.${ZONE}.${自定义二级域名} 或者 ${ZONE}.${自定义二级域名}

    • 记录类型 :TXT 记录类型

    • 记录值 :EndPoint 的网络地址。如有多个 EndPoint,使用空格分隔。

    • 主机记录 :格式为 TXT.${REGION}.${自定义二级域名} 。

    • 记录类型 :TXT 记录类型

    • 记录值 :第二层的主机记录。如有多个第二层级,使用空格分隔。

    • 第一层 :

    • 第二层:

  • 举个例子:

  • rootClusterDNS ,集群根地址。例如:txt.default.eureka.iocoder.cn,其· txt.default.eureka 为 DNS 解析记录的第一层的主机记录

  • region :地区。需要和 rootClusterDNS 的 ${REGION} 一致。

  • extractZoneFromDNS :是否解析 DNS 解析记录的第二层级的主机记录的 ${ZONE} 可用区。


#getClusterEndpoints(...) 方法,实现代码如下:

  1@Override
  2public List<AwsEndpoint> getClusterEndpoints() {
  3:     List<AwsEndpoint> eurekaEndpoints = resolve(region, rootClusterDNS, extractZoneFromDNS, port, isSecure, relativeUri);
  4:     if (logger.isDebugEnabled()) {
  5:         logger.debug("Resolved {} to {}", rootClusterDNS, eurekaEndpoints);
  6:     }
  7:     return eurekaEndpoints;
  8: }
  9
 10private static List<AwsEndpoint> resolve(String region, String rootClusterDNS, boolean extractZone, int port, boolean isSecure, String relativeUri) {
 11:     try {
 12:         // 解析 第一层 DNS 记录
 13:         Set<String> zoneDomainNames = resolve(rootClusterDNS);
 14:         if (zoneDomainNames.isEmpty()) {
 15:             throw new ClusterResolverException("Cannot resolve Eureka cluster addresses; there are no data in TXT record for DN " + rootClusterDNS);
 16:         }
 17:         // 记录 第二层 DNS 记录
 18:         List<AwsEndpoint> endpoints = new ArrayList<>();
 19:         for (String zoneDomain : zoneDomainNames) {
 20:             String zone = extractZone ? ResolverUtils.extractZoneFromHostName(zoneDomain) : null// 
 21:             Set<String> zoneAddresses = resolve(zoneDomain);
 22:             for (String address : zoneAddresses) {
 23:                 endpoints.add(new AwsEndpoint(address, port, isSecure, relativeUri, region, zone));
 24:             }
 25:         }
 26:         return endpoints;
 27:     } catch (NamingException e) {
 28:         throw new ClusterResolverException("Cannot resolve Eureka cluster addresses for root: " + rootClusterDNS, e);
 29:     }
 30: }
  • 第 12 至 16 行 :调用 #resolve(rootClusterDNS) 解析第一层 DNS 记录。实现代码如下:

      1private static Set<String> resolve(String rootClusterDNS) throws NamingException {
      2:     Set<String> result;
      3:     try {
      4:         result = DnsResolver.getCNamesFromTxtRecord(rootClusterDNS);
      5:         // TODO 芋艿:这块是bug,不需要这一段
      6:         if (!rootClusterDNS.startsWith("txt.")) {
      7:             result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
      8:         }
      9:     } catch (NamingException e) {
     10:         if (!rootClusterDNS.startsWith("txt.")) {
     11:             result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
     12:         } else {
     13:             throw e;
     14:         }
     15:     }
     16:     return result;
     17: }
    • 第 4 行 : 调用 DnsResolver#getCNamesFromTxtRecord(…) 方法,解析 TXT 主机记录。点击链接查看带中文注释的 DnsResolver 的代码,比较解析,笔者就不啰嗦了。

    • 第 5 至 8 行 :当传递参数 rootClusterDNS 不以 txt. 开头时,即使第 4 行解析成功,也会报错,此时是个 Eureka 的 BUG 。因此,配置 DNS 解析记录时,主机记录暂时必须以 txt. 开头。

  • 第 17 至 25 行 :循环第一层 DNS 记录的解析结果,进一步解析第二层 DNS 记录。

    • 第 20 行 :解析可用区( zone )。

    • 第 21 行 :调用 #resolve(rootClusterDNS) 解析第二层 DNS 记录。

3.4 ConfigClusterResolver

com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver ,基于配置文件的集群解析器。类属性代码如下:

public class ConfigClusterResolver implements ClusterResolver<AwsEndpoint> {

    private final EurekaClientConfig clientConfig;
    private final InstanceInfo myInstanceInfo;

    public ConfigClusterResolver(EurekaClientConfig clientConfig, InstanceInfo myInstanceInfo) {
        this.clientConfig = clientConfig;
        this.myInstanceInfo = myInstanceInfo;
    }
}

#getClusterEndpoints(...) 方法,实现代码如下:

// ... 省略代码,超过微信文章的长度
  • 第 3 至 8 行 :基于 DNS 获取 EndPoint 集群,调用 #getClusterEndpointsFromDns() 方法,实现代码如下:

    // ... 省略代码,超过微信文章的长度
    • 必须配置 eureka.shouldUseDns=true ,开启基于 DNS 获取 EndPoint 集群。

    • 必须配置 eureka.eurekaServer.domainName=${xxxxx} ,配置集群根地址。

    • 选填配 eureka.eurekaServer.port ,eureka.eurekaServer.context 。

    • 从代码中我们可以看出,使用 DnsTxtRecordClusterResolver 解析出 EndPoint 集群。

  • 第 9 至 13 行 :直接配置文件填写实际 EndPoint 集群,调用 #getClusterEndpointsFromConfig() 方法,实现代码如下:

// ... 省略代码,超过微信文章的长度
  • 第 3 行 :获得可用区数组。通过 eureka.${REGION}.availabilityZones 配置。

  • 第 5 行 :调用 InstanceInfo#getZone(…) 方法,获得应用实例自己所在的可用区zone )。非亚马逊 AWS 环境下,可用区数组的第一个元素就是应用实例自己所在的可用区

  • 第 7 行 :调用 EndpointUtils#getServiceUrlsMapFromConfig(...) 方法,获得可用区与 serviceUrls 的映射。实现代码如下:

    // ... 省略代码,超过微信文章的长度
    • 当方法参数 preferSameZone=true ,即 eureka.preferSameZone=true( 默认值 :true ) 时,开始位置为可用区数组( availZones )的第一个和应用实例所在的可用区( myZone )【相等】元素的位置。

    • 当方法参数 preferSameZone=false ,即 eureka.preferSameZone=false( 默认值 :true ) 时,开始位置为可用区数组( availZones )的第一个和应用实例所在的可用区( myZone )【不相等】元素的位置。

    • 第 13 行 :获得开始位置。实现代码如下:

      // ... 省略代码,超过微信文章的长度
    • 第 20 至 33 行 :从开始位置顺序将剩余的可用区的 serviceUrls 添加到结果。顺序理解如下图:

  • 第 9 至 18 行 :拼装 EndPoint 集群结果。

3.5 ZoneAffinityClusterResolver

com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver ,使用可用区亲和的集群解析器。类属性代码如下:

// ... 省略代码,超过微信文章的长度
  • 属性 delegate ,委托的解析器。目前代码里使用的是 ConfigClusterResolver 。

  • 属性 zoneAffinity ,是否可用区亲和。

    • `true` :EndPoint 可用区为本地的优先被放在前面。

    • `false` :EndPoint 可用区非本地的优先被放在前面。

#getClusterEndpoints(...) 方法,实现代码如下:

// ... 省略代码,超过微信文章的长度
  • 第 2 行 :调用 ClusterResolver#getClusterEndpoints() 方法,获得 EndPoint 集群。再调用 ResolverUtils#splitByZone(…) 方法,拆分成本地非本地的可用区的 EndPoint 集群,点击链接查看实现。

  • 第 8 行 :调用 #randomizeAndMerge(...) 方法,分别随机打乱每个 EndPoint 集群,并进行合并数组,实现代码如下:

    // ... 省略代码,超过微信文章的长度
    • 多个主机,实现对同一个 EndPoint 集群负载均衡的效果。

    • 单个主机,同一个 EndPoint 集群按照固定顺序访问。Eureka-Server 不是强一致性的注册中心,Eureka-Client 对同一个 Eureka-Server 拉取注册信息,保证两者之间增量同步的一致性。

    • 注意,`ResolverUtils#randomize(…)` 使用以本机IP为随机种子,有如下好处:

  • 第 10 至 12 行 :非可用区亲和,将非本地的可用区的 EndPoint 集群放在前面。

3.6 AsyncResolver

com.netflix.discovery.shared.resolver.AsyncResolver ,异步执行解析的集群解析器。AsyncResolver 属性较多,而且复杂的多,我们拆分到具体方法里分享。

3.6.1 定时任务

AsyncResolver 内置定时任务,定时刷新 EndPoint 集群解析结果。

为什么要刷新?例如,Eureka-Server 的 serviceUrls 基于 DNS 配置。

定时任务代码如下

// ... 省略代码,超过微信文章的长度
  • backgroundTask ,后台任务,定时解析 EndPoint 集群。

    • delegate ,委托的解析器,目前代码为 ZoneAffinityClusterResolver。

    • TimedSupervisorTask ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》「2.3 TimedSupervisorTask」 有详细解析。

    • updateTask 实现代码如下:

      // ... 省略代码,超过微信文章的长度
    • 后台任务的发起在 #getClusterEndpoints() 方法,在 「3.6.2 解析 EndPoint 集群」 详细解析。

3.6.2 解析 EndPoint 集群

调用 #getClusterEndpoints() 方法,解析 EndPoint 集群,实现代码如下:

// ... 省略代码,超过微信文章的长度
  • 第 5 至 9 行 :若未预热解析 EndPoint 集群结果,调用 #doWarmUp() 方法,进行预热。若预热失败,取消定时任务的第一次延迟。#doWarmUp() 方法实现代码如下:

    // ... 省略代码,超过微信文章的长度
    • 调用 updateTask ,解析 EndPoint 集群。

  • 第 10 至 13 行 : 若未调度定时任务,进行调度,调用 #scheduleTask() 方法,实现代码如下:

    // ... 省略代码,超过微信文章的长度
    • x

  • 第 15 行 :返回 EndPoint 集群。当第一次预热失败,会返回空,直到定时任务获得到结果

4. 初始化解析器

Eureka-Client 在初始化时,调用 DiscoveryClient#scheduleServerEndpointTask() 方法,初始化 AsyncResolver 解析器。实现代码如下:

// ... 省略代码,超过微信文章的长度                                            
  • 调用 EurekaHttpClients#newBootstrapResolver(...) 方法,创建 EndPoint 解析器,实现代码如下:

     // ... 省略代码,超过微信文章的长度
    • x

    • 第 10 至 23 行 :组合解析器,用于 Eureka 1.x 对 Eureka 2.x 的兼容配置,暂时不需要了解。TODO[0028]写入集群和读取集群

    • 第 26 行 :调用 #defaultBootstrapResolver() 方法,创建默认的解析器 AsyncResolver 。

    • 第 40 至 45 行 :创建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 构造方法的参数,我们看到创建 ConfigClusterResolver 作为 delegate 参数。

    • 第 48 行 :调用 ZoneAffinityClusterResolver#getClusterEndpoints() 方法,第一次 Eureka-Server EndPoint 集群解析

    • 第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群时,可以通过配置( eureka.experimental.clientTransportFailFastOnInit=true ),使 Eureka-Client 初始化失败。#failFastOnInitCheck(...) 方法,实现代码如下:

      // potential future feature, guarding with experimental flag for now
      // ... 省略代码,超过微信文章的长度
  • 第 58 至 64 行 :创建 AsyncResolver 。从代码上,我们可以看到,AsyncResolver.resultsRef 属性一开始已经用 initialValue 传递给 AsyncResolver 构造方法。实现代码如下:

    Java public AsyncResolver(String name, ClusterResolver<T> delegate, List<T> initialValues, int executorThreadPoolSize, int refreshIntervalMs) { this( name, delegate, initialValues, executorThreadPoolSize, refreshIntervalMs, 0 ); ¨K78K }

    • x

666. 彩蛋

T T 一开始看解析器,没反应过来是委托设计模式,一脸懵逼+一脸懵逼+一脸懵逼。后面理顺了,发现超级奈斯( Nice ) 啊 !!!!

胖友,你学会了么?

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?




如果你对 Dubbo 感兴趣,欢迎加入我的知识星球一起交流。


知识星球



目前在知识星球(https://t.zsxq.com/2VbiaEu)更新了如下 Dubbo 源码解析如下:

01. 调试环境搭建
02. 项目结构一览
03. 配置 Configuration
04. 核心流程一览

05. 拓展机制 SPI

06. 线程池

07. 服务暴露 Export

08. 服务引用 Refer

09. 注册中心 Registry

10. 动态编译 Compile

11. 动态代理 Proxy

12. 服务调用 Invoke

13. 调用特性 

14. 过滤器 Filter

15. NIO 服务器

16. P2P 服务器

17. HTTP 服务器

18. 序列化 Serialization

19. 集群容错 Cluster

20. 优雅停机

21. 日志适配

22. 状态检查

23. 监控中心 Monitor

24. 管理中心 Admin

25. 运维命令 QOS

26. 链路追踪 Tracing

...
一共 60 篇++


源码不易↓↓↓↓

点赞支持老艿艿↓↓

    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存