首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

YARN/MRv2 ResourceManager端 源码分析一

2012-06-27 
YARN/MRv2 ResourceManager端 源码分析12. ResourceManager端Client端通过YarnRunner.submitJob()将Applic

YARN/MRv2 ResourceManager端 源码分析1

2. ResourceManager端

Client端通过YarnRunner.submitJob()将Application提交给了ResourceManager。

连接Client与ResourceManager的协议为ClientRMProtocol,该协议的实现类为ClientRMService。

1) ClientRMService.java

Client端与ResourceManager交互的所有操作最终都是由ClientRMService中的操作实现的。以submitApplication()为例。

?

?

?

? 2) RMAppmanager.java

RMAppManager实现了EventHandler接口,代表该类是用于处理某种事件的

?

?

?

3) EventHandler

自此AsyncDispatcher将接管之后所有的事件分发,所有事件都将由AsyncDispatcher分发给对应的EventDispatcher。EventDispatcher会初始化处理该事件的类,并将事件交给创建的类来进行处理。以RMAppEventType.START事件为例,该类将分发给ApplicationEventDispatcher,然后由ApplicationEventDispatcher初始化RMApp的实现类RMAppImpl来处理。

?

?

?

?RMAppImpl.java

?

?

?

状态机的工作方式如下,以RMAppImpl中的状态机为例

?

/* 泛型参数从左到右依次为执行状态变换的类、封装状态的类、封装事件类型的类、封装事件的类   构造函数参数为该状态机的起始状态 */  private static final StateMachineFactory<RMAppImpl,   RMAppState,   RMAppEventType,   RMAppEvent> stateMachineFactory   = new StateMachineFactory<RMAppImpl,   RMAppState,   RMAppEventType,   RMAppEvent>(RMAppState.NEW)/* 添加状态之间的变换以及变换时的需要进行的操作的封装类   以下即表示状态从RMAppState.NEW -> RMAppState.SUBMITTED   的触发事件类型为RMAppEventType.START事件   需要执行的方法被封装在StartAppAttemptTransition类中 */.addTransition(RMAppState.NEW, RMAppState.SUBMITTED,RMAppEventType.START, new StartAppAttemptTransition())......// 构建状态机.installTopology();// 封装执行状态变化所需的方法的类需要实现SingleArcTransition接口,以StartAppAttemptTransition为例// 每个状态机都会对应一个实现了SingleArcTransition接口的类,在这里为RMAppTransition// StartAppAttemptTransition通过继承RMAppTransition并实现transition方法,在该方法中实现状态变化的处理逻辑  private static final class StartAppAttemptTransition extends RMAppTransition {public void transition(RMAppImpl app, RMAppEvent event) {  app.createNewAttempt();};  }
?

?

至此,一个典型的由状态机分发事件并进行处理的相关类介绍完毕。总结如下:

(1) AsyncDispatcher根据接收到的事件按它的类分发给相应的EventDispatcher

(2) EventDispatcher初始化处理该类事件的类A,并将事件传递给A

(3) A调用状态机的doTransition方法确定该事件类型对应的状态变化和封装了需要执行的方法的类

(4) 实现了SingleArcTransition接口的类,将调用transition方法完成状态变换

由于状态机处理代码大都相同,以下将以事件为标题来描述状态变换和涉及的类和操作

4) RMAppEventType.START

EventDispatcher:ApplicationEventDispatcher

事件处理类:RMAppImpl

状态更新:RMAppState.NEW -> RMAppState.SUBMITTED

所需操作:创建RMAppAttemptImpl对象,初始化其状态为RMAppAttemptState.NEW

触发RMAppAttemptEventType.START事件

5) RMAppAttemptEventType.START

EventDispatcher:ApplicationAttemptEventDispatcher

事件处理类:RMAppAttemptImpl

状态更新:RMAppAttemptState.NEW -> RMAppAttemptState.SUBMITTED

所需操作:向ApplicationMasterService注册该AppAttempt

触发AppAddedSchedulerEvent事件

6) AppAddedSchedulerEvent

EventDispatcher:SchedulerEventDispatcher

事件处理类:FifoScheduler/CapacityScheduler

所需操作:创建SchedulerApp对象

触发RMAppAttemptEventType.APP_ACCEPTED事件

7) RMAppAttemptEventType.APP_ACCEPTED

EventDispatcher:ApplicationAttemptEventDispatcher

事件处理类:RMAppAttemptImpl

状态更新:RMAppAttemptState.SUBMITTED -> RMAppAttemptState.SCHEDULED

所需操作:调用ResourceScheduler的allocate函数,向ResourceManager申请运行 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ApplicationMaster需要的Container

触发RMAppEventType.APP_ACCEPTED事件

8) RMAppEventType.APP_ACCEPTED

EventDispatcher:ApplicationEventDispatcher

事件处理类:RMAppImpl

状态更新:RMAppState.SUBMITTED -> RMAppState.ACCEPTED

9) 某个NodeManager向ResourceManager发送心跳

? ? ? 10) ResourceManager的ResourceTrackerService收到心跳信息后触发封装了 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?RMNodeEventType.STATUS_UPDATE的RMNodeStatusEvent事件

? ? ? 11) RMNodeEventType.STATUS_UPDATE

EventDispatcher:ApplicationEventDispatcher

事件处理类:RMNodeImpl

状态更新:RMNodeState.RUNNING -> RMNodeState.RUNNING

所需操作:更新节点的健康状态

触发NodeUpdateSchedulerEvent事件

? ? ? 12) NodeUpdateSchedulerEvent

EventDispatcher:SchedulerEventDispatcher

事件处理类:FifoScheduler/CapacityScheduler

所需操作:创建SchedulerApp对象,调用assginContainers为该application分配一个 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? container,此时还未真正分配

触发RMContainerEventType.START事件

? ? ? 13) RMContainerEventType.START

EventDispatcher:NodeEventDispatcher

事件处理类:RMContainerImpl

状态更新:RMContainerState.NEW -> RMContainerState.ALLOCATED

所需操作:触发RMAppAttemptContainerAllocatedEvent

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? (RMAppAttemptEventType.CONTAINER_ALLOCATED)事件

? ? ? 14) RMAppAttemptEventType.CONTAINER_ALLOCATED

EventDispatcher:ApplicationAttemptEventDispatcher

事件处理类:RMAppAttemptImpl

状态更新:RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED

所需操作:调用Scheduler的allocate函数申请一个container

触发AMLauncherEventType.LAUNCH事件

? ? ? 15) AMLauncherEventType.LAUNCH

事件处理类:ApplicationMasterLauncher

状态更新:RMAppAttemptState.SCHEDULED -> RMAppAttemptState.ALLOCATED

所需操作:创建AMLauncher对象,并将其添加到队列masterEvents中;

LauncherThread不断从masterEvents取出,进行处理,并调用

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? AMLauncher.launch()函数;

AMLancher.launch()调用ContainerManager.startContainer()函数创建

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? container;

同时触发RMAppAttemptEventType.LAUNCHED事件。

? ? ? 16) RMAppAttemptEventType.LAUNCHED

EventDispatcher:ApplicationAttemptEventDispatcher

事件处理类:RMAppAttemptImpl

状态更新:RMAppAttemptState.ALLOCATED -> RMAppAttemptState.LAUNCHED

所需操作:向AMLivelinessMonitor注册,用于实时监控该Application的状态

? ? ? 17) 第9)步中向ResourceManager发送心跳的NodeManager,调用

? ? ? ? ? ? ?AMRMProtocol.registerApplicationMaster()向ApplicationMasterService进行注册

? ? ? 18) ApplicationMasterService.registerApplicationMaster()

从request中获取ApplicationAttempt的相关信息

触发RMAppAttemptEventType.REGISTERED事件

返回给调用端response

? ? ? 19) RMAppAttemptEventType.REGISTERED

EventDispatcher:ApplicationAttemptEventDispatcher

事件处理类:RMAppAttemptImpl

状态更新:RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING

所需操作:设置Application注册后的信息,如运行的host、端口、TrackingURL等

触发RMAppEventType.ATTEMPT_REGISTERED事件

? ? ? 20) 触发RMAppEventType.REGISTERED

EventDispatcher:ApplicationAttemptEventDispatcher

事件处理类:RMAppAttemptImpl

状态更新:RMAppAttemptState.LAUNCHED -> RMAppAttemptState.RUNNING

所需操作:设置Application注册后的信息,如运行的host、端口、TrackingURL等

触发RMAppEventType.ATTEMPT_REGISTERED事件

? ? ? 21) RMAppEventType.ATTEMPT_REGISTERED

EventDispatcher:ApplicationEventDispatcher

事件处理类:RMAppImpl

状态更新:RMAppState.ACCEPTED -> RMAppState.RUNNING ??

至此,ApplicationMaster(MRAppMaster)创建完毕,之后Application的运行将由ApplicationMaster(MRAppMaster)接管,它将负责向ResourceManager申请运行子任务所需的资源,监控子任务的运行状态,并向ResourceManager汇报Application的运行状态


?

热点排行