
An Overview of the JobTracker-Side Source Code in Hadoop mapred

2012-11-20 

http://jiwenke.iteye.com/blog/335093

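In the previous section we saw how the TaskTracker launches a new task. Here we continue by looking at how the JobTracker responds and schedules work. Hadoop uses a pull model for this: the TaskTracker fetches tasks by asking the JobTracker for them in its heartbeat.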
```java
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                          justStarted, askForNewTask,
                                                          heartbeatResponseId);
```
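To make the pull model concrete, here is a minimal, self-contained Java sketch under simplifying assumptions: the names `PullModelSketch`, `Master`, `heartbeat` and `runWorker` are hypothetical (not Hadoop APIs), tasks are plain strings, and the interval a real TaskTracker waits between heartbeats is left out. The only point it illustrates is that the worker starts every exchange while the master merely answers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of pull-based task dispatch; none of these types are Hadoop classes.
final class PullModelSketch {

  /** The "master" only answers requests; it never contacts a worker on its own. */
  static final class Master {
    private final Deque<String> pending = new ArrayDeque<>();

    Master(List<String> taskIds) {
      pending.addAll(taskIds);
    }

    /** Hands out at most freeSlots tasks, but only if the worker asked for new work. */
    synchronized List<String> heartbeat(String workerName, boolean askForNewTask, int freeSlots) {
      List<String> assigned = new ArrayList<>();
      if (askForNewTask) {
        while (assigned.size() < freeSlots && !pending.isEmpty()) {
          assigned.add(pending.poll());
        }
      }
      return assigned;
    }

    synchronized boolean hasPending() {
      return !pending.isEmpty();
    }
  }

  /** Worker loop: keeps sending heartbeats and collects whatever tasks come back. */
  static List<String> runWorker(Master master, String workerName, int slots) {
    if (slots <= 0) {
      throw new IllegalArgumentException("a worker needs at least one slot");
    }
    List<String> executed = new ArrayList<>();
    while (master.hasPending()) {
      // The worker initiates each exchange: this is the "pull".
      List<String> tasks = master.heartbeat(workerName, true, slots);
      executed.addAll(tasks); // a real worker would launch each task here
    }
    return executed;
  }

  public static void main(String[] args) {
    Master master = new Master(List.of("task_0", "task_1", "task_2"));
    System.out.println(runWorker(master, "tracker_1", 2));
  }
}
```

With two free slots, the first heartbeat in `main` returns `task_0` and `task_1` and the second returns `task_2`; a worker that sets `askForNewTask` to false gets nothing, mirroring the flag passed in the real call.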

This is where the TaskTracker sends its heartbeat to the JobTracker. The call goes over RPC, which brings us into the JobTracker:
```java
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
    throws IOException {
  // ...
  // If the tracker can accept new tasks, let the JobTracker schedule them;
  // this is where taskScheduler.assignTasks gets called
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      // This is where tasks get assigned; the configured scheduler decides how
      if (tasks == null) {
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          expireLaunchingTasks.addNewTask(task.getTaskID());
          LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
```
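The control flow of this branch can be modeled in isolation. The sketch below is a hypothetical simplification, not Hadoop code: task IDs are strings, the two task sources are passed in as functions, `knownTrackers` replaces the `getTaskTracker` lookup, and the `launching` list stands in for `expireLaunchingTasks`. It shows the precedence visible in the excerpt: setup/cleanup tasks win, and the scheduler is consulted only when `getSetupAndCleanupTasks` returns `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of the acceptNewTasks branch of JobTracker.heartbeat;
// none of these types are Hadoop classes.
final class HeartbeatDispatchSketch {

  /** Stand-in for LaunchTaskAction: tells a tracker to start one task. */
  static final class LaunchAction {
    final String taskId;

    LaunchAction(String taskId) {
      this.taskId = taskId;
    }

    @Override
    public String toString() {
      return "LaunchTask:" + taskId;
    }
  }

  static List<LaunchAction> onHeartbeat(String trackerName,
                                        boolean acceptNewTasks,
                                        Set<String> knownTrackers,
                                        Function<String, List<String>> setupAndCleanupTasks,
                                        Function<String, List<String>> scheduler,
                                        List<String> launching) {
    List<LaunchAction> actions = new ArrayList<>();
    if (!acceptNewTasks) {
      return actions;
    }
    if (!knownTrackers.contains(trackerName)) {
      // The excerpt logs a warning and ignores an unknown tracker.
      return actions;
    }
    // Setup/cleanup tasks take precedence; null means "none pending".
    List<String> tasks = setupAndCleanupTasks.apply(trackerName);
    if (tasks == null) {
      // Only then is the pluggable scheduler asked for work.
      tasks = scheduler.apply(trackerName);
    }
    if (tasks != null) {
      for (String taskId : tasks) {
        launching.add(taskId); // plays the role of expireLaunchingTasks.addNewTask
        actions.add(new LaunchAction(taskId));
      }
    }
    return actions;
  }

  public static void main(String[] args) {
    List<String> launching = new ArrayList<>();
    List<LaunchAction> actions = onHeartbeat(
        "tracker_1", true, Set.of("tracker_1"),
        name -> null,                         // no setup/cleanup work pending
        name -> List.of("attempt_m_000000"),  // the scheduler hands out one map task
        launching);
    System.out.println(actions + " " + launching);
  }
}
```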

The `taskScheduler` used here is the default one, created via reflection:
```java
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
```
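Choosing an implementation class by name from configuration and instantiating it reflectively is a general pattern. The following is a minimal plain-Java sketch, assuming a `Map<String, String>` in place of Hadoop's `Configuration` and hypothetical `TaskSchedulerLike`, `FifoScheduler` and `OtherScheduler` types; only the key name `mapred.jobtracker.taskScheduler` comes from this article. It uses the JDK's `Class.forName`, `Class.asSubclass` and `getDeclaredConstructor().newInstance()` rather than Hadoop's `ReflectionUtils`.

```java
import java.util.Map;

// Hypothetical sketch: select a scheduler class by name, fall back to a default,
// and instantiate it reflectively. These types are stand-ins, not Hadoop classes.
final class SchedulerFactorySketch {

  static final String KEY = "mapred.jobtracker.taskScheduler";

  interface TaskSchedulerLike {
    String name();
  }

  /** Default implementation, used when the key is absent. */
  static final class FifoScheduler implements TaskSchedulerLike {
    public FifoScheduler() {}

    @Override
    public String name() {
      return "fifo";
    }
  }

  /** An alternative a deployment could select through the config key. */
  static final class OtherScheduler implements TaskSchedulerLike {
    public OtherScheduler() {}

    @Override
    public String name() {
      return "other";
    }
  }

  static TaskSchedulerLike create(Map<String, String> conf) {
    String className = conf.getOrDefault(KEY, FifoScheduler.class.getName());
    try {
      // asSubclass rejects classes that do not implement the expected interface.
      Class<? extends TaskSchedulerLike> cls =
          Class.forName(className).asSubclass(TaskSchedulerLike.class);
      // Requires an accessible no-argument constructor.
      return cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("Cannot instantiate scheduler " + className, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(create(Map.of()).name());
    System.out.println(create(Map.of(KEY, OtherScheduler.class.getName())).name());
  }
}
```

Checking the type before instantiating means a misconfigured class name fails at startup with a clear message instead of a cast error later.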

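`schedulerClass` is specified in the configuration file through the `mapred.jobtracker.taskScheduler` property. Typically it is `JobQueueTaskScheduler`, the FIFO scheduler that ships with Hadoop. Let's see how this scheduler implements `assignTasks`: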
```java
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();

  Collection<JobInProgress> jobQueue =
    jobQueueJobInProgressListener.getJobQueue();

  //
  // Get map + reduce counts for the current tracker.
  //
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();

  //
  // Compute average map and reduce task numbers across pool
  //
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }

  // find out the maximum number of maps or reduces that we are willing
  // to run on any node.
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }

  int totalMaps = clusterStatus.getMapTasks();
  int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();
  int totalReduces = clusterStatus.getReduceTasks();
  int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();

  //
  // In the below steps, we allocate first a map task (if appropriate),
  // and then a reduce task if appropriate.  We go through all jobs
  // in order of job arrival; jobs only get serviced if their
  // predecessors are serviced, too.
  //

  //
  // We hand a task to the current taskTracker if the given machine
  // has a workload that's less than the maximum load of that kind of
  // task.
  //

  if (numMaps < maxMapLoad) {

    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        // This is where a Task is obtained: it has to be fetched from the job
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }

        //
        // Beyond the highest-priority task, reserve a little
        // room for failures and speculative executions; don't
        // schedule tasks to the hilt.
        //
        totalNeededMaps += job.desiredMaps();
        int padding = 0;
        if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
          padding = Math.min(maxCurrentMapTasks,
                             (int)(totalNeededMaps * padFraction));
        }
        if (totalMaps + padding >= totalMapTaskCapacity) {
          break;
        }
      }
    }
  }

  // ...
```
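The two pieces of arithmetic in this excerpt, the per-tracker load cap and the padding check, can be pulled out into pure functions and tried with concrete numbers. This is a hypothetical restatement, not Hadoop code: the method and parameter names are invented, and because the excerpt does not show the values of `MIN_CLUSTER_SIZE_FOR_PADDING` or `padFraction`, the numbers in `main` are purely illustrative. Here `remainingLoad` corresponds to the sum of `desiredMaps() - finishedMaps()` over running jobs.

```java
// Hypothetical restatement of the arithmetic in JobQueueTaskScheduler.assignTasks;
// method and parameter names are invented for illustration.
final class LoadCapSketch {

  /**
   * Per-tracker cap: min(slots on this tracker, ceil(remaining tasks / number of trackers)).
   * Returns 0 when there are no trackers, as the excerpt does.
   */
  static int maxLoad(int maxSlotsOnTracker, int remainingLoad, int numTaskTrackers) {
    if (numTaskTrackers <= 0) {
      return 0;
    }
    return Math.min(maxSlotsOnTracker,
                    (int) Math.ceil((double) remainingLoad / numTaskTrackers));
  }

  /**
   * Padding check from the map loop: once running maps plus a small reserve reach
   * cluster capacity, the scheduler stops scanning later jobs in the queue.
   * No reserve is kept on clusters with minClusterSizeForPadding trackers or fewer.
   */
  static boolean stopScanning(int totalMaps, int totalNeededMaps, int maxSlotsOnTracker,
                              int numTaskTrackers, int minClusterSizeForPadding,
                              float padFraction, int totalMapTaskCapacity) {
    int padding = 0;
    if (numTaskTrackers > minClusterSizeForPadding) {
      padding = Math.min(maxSlotsOnTracker, (int) (totalNeededMaps * padFraction));
    }
    return totalMaps + padding >= totalMapTaskCapacity;
  }

  public static void main(String[] args) {
    // 4 trackers with 2 map slots each, 5 maps still to run: cap is ceil(5/4) = 2
    System.out.println(maxLoad(2, 5, 4)); // prints 2
    // Only 3 maps left: the cap drops to ceil(3/4) = 1
    System.out.println(maxLoad(2, 3, 4)); // prints 1
    // Illustrative values only: 10 trackers, threshold 3, padFraction 0.1,
    // 50 maps needed -> padding min(2, 5) = 2; 18 running + 2 >= 20 capacity
    System.out.println(stopScanning(18, 50, 2, 10, 3, 0.1f, 20)); // prints true
  }
}
```

With only 3 maps left on 4 trackers, a tracker that is already running one map gets no more, so the remaining work is spread across the cluster instead of being packed onto whichever tracker happens to heartbeat first.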
