查看原文
其他

消息队列中间件 RocketMQ 源码分析 —— Message 发送与接收

2017-06-16 王文斌(芋艿) 芋道源码

原文地址:http://www.yunai.me/RocketMQ/message-send-and-receive/?mp 

(建议使用原文地址阅读:1、阅读体验;2、代码排版混乱因而省略。)
RocketMQ 带注释源码地址 :https://github.com/YunaiV/incubator-rocketmq
😈本系列每 1-2 周更新一篇,欢迎订阅、关注、收藏 公众号



  • 1、概述

  • 2、Producer 发送消息

    • DefaultMQProducerImpl#tryToFindTopicPublishInfo()

    • MQFaultStrategy

    • DefaultMQProducerImpl#sendKernelImpl()

    • MQFaultStrategy

    • LatencyFaultTolerance

    • LatencyFaultToleranceImpl

    • FaultItem

    • DefaultMQProducer#send(Message)

    • DefaultMQProducerImpl#sendDefaultImpl()

  • 3、Broker 接收消息

    • AbstractSendMessageProcessor#msgCheck

    • SendMessageProcessor#sendMessage

    • DefaultMessageStore#putMessage

  • 4、某种结尾


1、概述

  1. Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。

  2. Broker 接收消息。(存储消息在《RocketMQ 源码分析 —— Message 存储》解析)

2、Producer 发送消息

DefaultMQProducer#send(Message)

 // .... 省略代码


  • 说明:发送同步消息,DefaultMQProducer#send(Message) 对 DefaultMQProducerImpl#send(Message)进行封装。

DefaultMQProducerImpl#sendDefaultImpl()

// .... 省略代码


  • 说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。

  • 第 1 至 7 行:对sendsendDefaultImpl(...)进行封装。

  • 第 20 行 :invokeID仅仅用于打印日志,无实际的业务用途。

  • 第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()

  • 第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。同步或异步发送消息会调用多次,默认配置为3次。

  • 第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy

  • 第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()

  • 第 46 行 :更新Broker可用性信息。在选择发送到的消息队列时,会参考Broker发送消息的延迟,详细解析见:MQFaultStrategy

  • 第 62 至 68 行:当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

// .... 省略代码


  • 说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable,其次从Namesrv中获得。

  • 第 3 行 :从缓存topicPublishInfoTable中获得 Topic发布信息。

  • 第 5 至 9 行 :从 Namesrv 中获得 Topic发布信息。

  • 第 13 至 17 行 :当从 Namesrv 无法获取时,使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。

MQFaultStrategy

MQFaultStrategy

// .... 省略代码


  • 说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false

  • 第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。

  • 第 64 行 :未开启容错策略选择消息队列逻辑。

  • 第 74 至 79 行 :更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMaxnotAvailableDuration的配置,对应如下:

    Producer发送消息消耗时长Broker不可用时长
    >= 15000 ms600 * 1000 ms
    >= 3000 ms180 * 1000 ms
    >= 2000 ms120 * 1000 ms
    >= 1000 ms60 * 1000 ms
    >= 550 ms30 * 1000 ms
    >= 100 ms0 ms
    >= 50 ms0 ms

LatencyFaultTolerance

 // .... 省略代码


  • 说明 :延迟故障容错接口

LatencyFaultToleranceImpl

 // .... 省略代码


  • 说明 :延迟故障容错实现。维护每个对象的信息。

FaultItem

// .... 省略代码


  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

DefaultMQProducerImpl#sendKernelImpl()

// .... 省略代码


  • 说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker

  • 第 21 行 :生产消息编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。

  • 第 64 至 121 行 :构建发送消息请求SendMessageRequestHeader

  • 第 107 至 117 行 :执行 MQClientInstance#sendMessage(...) 发起网络请求。

3、Broker 接收消息

SendMessageProcessor#sendMessage

// .... 省略代码


  • #processRequest() 说明 :处理消息请求。

  • #sendMessage() 说明 :发送消息,并返回发送消息结果。

  • 第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。

  • 第 60 至 64 行 :消息队列编号小于0时,Broker 可以设置随机选择一个消息队列。

  • 第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名, 即加 死信队 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。

  • 第 105 至 118 行 :创建MessageExtBrokerInner

  • 第 132 :存储消息,详细解析见:DefaultMessageStore#putMessage()。

  • 第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。

  • 第 186 至 214 行 :发送成功,响应。这里doResponse(ctx, request, response)进行响应,最后return null,原因是:响应给 Producer 可能发生异常,#doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查 Broker 接收消息成功后响应是否存在异常会方便很多。

AbstractSendMessageProcessor#msgCheck

// .... 省略代码


  • 说明:校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。

  • 第 11 至 18 行 :检查Topic是否可以被发送。目前是 {@link MixAll.DEFAULT_TOPIC} 不被允许发送。

  • 第 20 至 51 行 :当找不到Topic配置,则进行创建。当然,创建会存在不成功的情况,例如说:defaultTopic 的Topic配置不存在,又或者是 存在但是不允许继承,详细解析见《RocketMQ 源码分析 —— Topic》。

DefaultMessageStore#putMessage

 // .... 省略代码
  • 说明:存储消息封装,最终存储需要 CommitLog 实现。

  • 第 7 至 27 行 :校验 Broker 是否可以写入。

  • 第 29 至 39 行 :消息格式与大小校验。

  • 第 47 行 :调用 CommitLong 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》

4、某种结尾

感谢阅读、收藏、点赞本文的工程师同学。

阅读源码是件令自己很愉悦的事情,编写源码解析是让自己脑细胞死伤无数的过程,痛并快乐着。

如果有内容写的存在错误,或是不清晰的地方,见笑了,🙂。欢迎加 QQ:7685413 我们一起探讨,共进步。

再次感谢阅读、收藏、点赞本文的工程师同学。





    您可能也对以下帖子感兴趣

    文章有问题?点此查看未经处理的缓存