Overview of the Hadoop mapred JobTracker-side source code
http://jiwenke.iteye.com/blog/335093
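In the previous part we saw how the TaskTracker launches a new task. Here we continue with how the JobTracker responds and schedules work. Hadoop uses a pull model: the TaskTracker asks the JobTracker for tasks, rather than the JobTracker pushing tasks to it.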
```java
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                          justStarted, askForNewTask,
                                                          heartbeatResponseId);
```
This is where the TaskTracker sends its heartbeat to the JobTracker. The call goes over RPC, which takes us to the JobTracker side:
```java
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
    throws IOException {
  // .............

  // If the tracker is willing to accept new tasks, let the JobTracker schedule
  // work for it; this is where taskScheduler.assignTasks() gets called.
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      // This is where tasks are assigned: if there is no setup/cleanup work,
      // the configured scheduler decides what to hand out.
      if (tasks == null) {
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          expireLaunchingTasks.addNewTask(task.getTaskID());
          LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  // .............
}
```
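The control flow of this handler can be condensed into a small, self-contained model of the pull protocol. The tracker asks for work. The JobTracker answers with pending setup/cleanup work first, and otherwise with whatever the scheduler assigns. This is a sketch, not Hadoop code. `MiniJobTracker`, `SimpleScheduler` and the string actions `"LaunchTask:..."` are hypothetical stand-ins for `JobTracker`, `TaskScheduler` and `LaunchTaskAction`. As a simplification, the sketch hands out at most one queued setup/cleanup task per heartbeat.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TaskScheduler: returns task IDs for a tracker, or null.
interface SimpleScheduler {
  List<String> assignTasks(String trackerName);
}

// Hypothetical, heavily simplified model of JobTracker.heartbeat(); not Hadoop code.
class MiniJobTracker {
  private final SimpleScheduler taskScheduler;
  private final Set<String> knownTrackers = new HashSet<>();
  private final Deque<String> setupAndCleanupTasks = new ArrayDeque<>();

  MiniJobTracker(SimpleScheduler taskScheduler) {
    this.taskScheduler = taskScheduler;
  }

  synchronized void registerTracker(String trackerName) {
    knownTrackers.add(trackerName);
  }

  synchronized void addSetupOrCleanupTask(String taskId) {
    setupAndCleanupTasks.addLast(taskId);
  }

  // Simplification: hand out at most one queued setup/cleanup task per heartbeat.
  private List<String> getSetupAndCleanupTasks() {
    String next = setupAndCleanupTasks.pollFirst();
    return next == null ? null : Collections.singletonList(next);
  }

  // The TaskTracker "pulls": it calls this and receives the actions to perform.
  synchronized List<String> heartbeat(String trackerName, boolean acceptNewTasks) {
    List<String> actions = new ArrayList<>();
    if (!acceptNewTasks) {
      return actions;                 // tracker did not ask for new work
    }
    if (!knownTrackers.contains(trackerName)) {
      return actions;                 // Hadoop logs "Unknown task tracker polling" and ignores it
    }
    List<String> tasks = getSetupAndCleanupTasks();
    if (tasks == null) {
      tasks = taskScheduler.assignTasks(trackerName);  // defer to the pluggable scheduler
    }
    if (tasks != null) {
      for (String task : tasks) {
        actions.add("LaunchTask:" + task);  // stands in for new LaunchTaskAction(task)
      }
    }
    return actions;
  }
}
```

Keeping the scheduler behind an interface is what lets the real JobTracker swap scheduling policies without touching its heartbeat handling.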
Unless configured otherwise, the JobTracker's taskScheduler is the default implementation, instantiated via reflection:
```java
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
```
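Because the class is loaded by name, any scheduler implementation on the classpath can be swapped in through configuration. Below is a minimal JDK-only sketch of the same pattern. It assumes a `java.util.Properties` object in place of Hadoop's `Configuration`, and it uses plain reflection instead of `ReflectionUtils`. `Scheduler`, `FifoScheduler`, `AlternativeScheduler` and `SchedulerFactory` are hypothetical names. Only the configuration key string comes from this article.

```java
import java.util.Properties;

// Hypothetical stand-in for Hadoop's TaskScheduler base class.
interface Scheduler {
  String name();
}

// Hypothetical default, playing the role of JobQueueTaskScheduler.
class FifoScheduler implements Scheduler {
  public String name() { return "FIFO"; }
}

// Hypothetical alternative a cluster could plug in via configuration.
class AlternativeScheduler implements Scheduler {
  public String name() { return "ALT"; }
}

class SchedulerFactory {
  // Key name from the article; everything else here is illustrative.
  static final String KEY = "mapred.jobtracker.taskScheduler";

  static Scheduler create(Properties conf) {
    // Fall back to the default scheduler when the key is absent.
    String className = conf.getProperty(KEY, FifoScheduler.class.getName());
    try {
      // Load the class by name, check that it implements Scheduler,
      // then invoke its no-argument constructor.
      return Class.forName(className)
          .asSubclass(Scheduler.class)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate scheduler " + className, e);
    }
  }
}
```

In this sketch, switching from FIFO to another policy is a configuration change rather than a code change, because callers depend only on the `Scheduler` interface.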
The scheduler class is set in the configuration file under "mapred.jobtracker.taskScheduler". Usually it is JobQueueTaskScheduler, Hadoop's own FIFO scheduler. Let's look at how this scheduler implements assignTasks:
```java
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();

  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();

  //
  // Get map + reduce counts for the current tracker.
  //
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();

  //
  // Compute average map and reduce task numbers across pool
  //
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }

  // find out the maximum number of maps or reduces that we are willing
  // to run on any node.
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }

  int totalMaps = clusterStatus.getMapTasks();
  int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
  int totalReduces = clusterStatus.getReduceTasks();
  int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();

  //
  // In the below steps, we allocate first a map task (if appropriate),
  // and then a reduce task if appropriate.  We go through all jobs
  // in order of job arrival; jobs only get serviced if their
  // predecessors are serviced, too.
  //

  //
  // We hand a task to the current taskTracker if the given machine
  // has a workload that's less than the maximum load of that kind of
  // task.
  //

  if (numMaps < maxMapLoad) {

    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        // This is where a Task is actually obtained: it has to be taken from the job.
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }

        //
        // Beyond the highest-priority task, reserve a little
        // room for failures and speculative executions; don't
        // schedule tasks to the hilt.
        //
        totalNeededMaps += job.desiredMaps();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = Math.min(maxCurrentMapTasks,
                             (int)(totalNeededMaps * padFraction));
        }
        if (totalMaps + padding >= totalMapTaskCapacity) {
          break;
        }
      }
    }
  }

  // .............
  // (Excerpt truncated here; as the comment above says, a reduce task
  // is then allocated in the same way.)
}
```
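The two numeric rules in assignTasks() are easy to check by hand. The sketch below isolates them as pure functions:

- the per-tracker cap `min(slots, ceil(remaining / trackers))`;
- the padding check that stops the FIFO walk before the cluster is filled "to the hilt".

`FifoLoadMath` and its parameter names are hypothetical. `MIN_CLUSTER_SIZE_FOR_PADDING` and `padFraction` are passed in as parameters rather than assuming Hadoop's actual values.

```java
// Hypothetical helper mirroring the arithmetic in JobQueueTaskScheduler.assignTasks().
final class FifoLoadMath {
  private FifoLoadMath() {}

  // Per-tracker cap: no more than the tracker's own slots, and no more than an
  // even share (rounded up) of the remaining unfinished tasks across the cluster.
  static int maxLoad(int slotsOnTracker, int remainingLoad, int numTaskTrackers) {
    if (numTaskTrackers <= 0) {
      return 0;
    }
    return Math.min(slotsOnTracker,
                    (int) Math.ceil((double) remainingLoad / numTaskTrackers));
  }

  // Room reserved for failures and speculative execution; only applied
  // when the cluster is larger than the given minimum size.
  static int padding(int numTaskTrackers, int minClusterSizeForPadding,
                     int slotsOnTracker, int totalNeeded, double padFraction) {
    if (numTaskTrackers <= minClusterSizeForPadding) {
      return 0;
    }
    return Math.min(slotsOnTracker, (int) (totalNeeded * padFraction));
  }

  // True when the FIFO walk should stop looking at later jobs.
  static boolean clusterSaturated(int runningTasks, int padding, int capacity) {
    return runningTasks + padding >= capacity;
  }
}
```

For example, take 4 trackers with 2 map slots each and 5 map tasks still unfinished. Then `maxLoad(2, 5, 4)` is min(2, ceil(1.25)) = 2. Once only 3 remain, it drops to min(2, ceil(0.75)) = 1. A tracker already running one map then fails the `numMaps < maxMapLoad` test and gets no new map task from this call.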