
Hadoop Study Notes (1): A Source-Level Analysis of How Hadoop Runs a Job

2012-09-03 



Analyzing the Hadoop job execution flow at the source code level

Preface:

Recently I have been analyzing Hadoop's execution flow. I read a lot of material, and while that gave me an intuitive feel for the process, I still did not have a complete picture of how MapReduce actually runs, so I decided to analyze the MapReduce execution flow at the source code level.

Prelude:

Everything starts with job submission. When the Job class is used, the statement that triggers submission is

job.waitForCompletion(true); passing true makes the job print progress information while it runs.
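For context, a minimal driver might look like the sketch below. The identity Mapper/Reducer and the input/output paths are placeholders added here for illustration, not something from the original article; the last line is the call that kicks off everything discussed in this note.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  // Hypothetical driver: identity map/reduce, placeholder paths.
  public class MyJobDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = new Job(conf, "my-job");
      job.setJarByClass(MyJobDriver.class);
      job.setMapperClass(Mapper.class);        // identity map
      job.setReducerClass(Reducer.class);      // identity reduce
      job.setOutputKeyClass(LongWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      // the call that triggers submit() -> submitJobInternal(conf)
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }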

Pressing F3 on this method in Eclipse shows its implementation: it simply calls Job's submit() method, and submit() in turn calls submitJobInternal(conf) to submit the job. That method uploads the job's three files (job.jar, job.split and job.xml) to HDFS:

ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException,
      ClassNotFoundException,
      InterruptedException,
      IOException {
    JobConf jobCopy = job;
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);
    // get a new job ID from the JobTracker
    JobID jobId = jobSubmitClient.getNewJobId();
    // the directory the job will be uploaded to
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      copyAndConfigureFiles(jobCopy, submitJobDir);

      // obtain delegation tokens from the NameNode(s)
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path[] {submitJobDir},
                                          jobCopy);
      // path of the job configuration file (job.xml)
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      // number of reduce tasks configured for the job (default is 1)
      int reduces = jobCopy.getNumReduceTasks();
      // record the IP and hostname of the submitting host
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf)context.getConfiguration();

      // Check the output specification
      if (reduces == 0 ? jobCopy.getUseNewMapper() :
          jobCopy.getUseNewReducer()) {
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the splits for the job
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = jobCopy.getQueueName();
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs
      FSDataOutputStream out =
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      try {
        // write out job.xml
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, jobCopy.getCredentials());
      // submit the job to the JobTracker through the RPC proxy
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
});

Looking at this method we can see where the job's submission directory and job ID are obtained, and that the job is then handed to the JobTracker through the RPC proxy.

After the job has been submitted, the JobTracker creates a JobInProgress instance for it; this class describes the job's state and the actions the job needs to carry out. Once the JobTracker has received the submitted data, it assigns tasks to the TaskTrackers according to the job's configuration, and only after all TaskTrackers have finished does it notify the JobClient that the job is complete. The JobTracker first puts the job into a queue called jobInitQueue; a JobQueueTaskScheduler object inside the JobTracker polls this queue, and as soon as a new job is added it takes the job out and initializes it. Every task also gets a corresponding TaskInProgress object. Once a TaskTracker has started, it communicates with the JobTracker through a heartbeat mechanism.

Every three seconds the TaskTracker sends a heartbeat to the JobTracker; the heartbeat carries a great deal of information about the TaskTracker. When the JobTracker receives a heartbeat, it examines the information it contains and, if it finds a problem, starts the corresponding error handling. If the TaskTracker has asked for tasks in the heartbeat, the JobTracker adds the corresponding instructions to its reply. In the Hadoop source code, an instruction from the JobTracker to a TaskTracker is called an action, and the JobTracker's reply to a TaskTracker's heartbeat is called a HeartbeatResponse.

Inside the TaskTracker there is a queue called the TaskQueue, which holds all newly assigned tasks. Every time the TaskTracker receives a HeartbeatResponse it inspects it, and if the response contains new tasks it adds them to the TaskQueue. Two threads continuously poll this queue: MapLauncher and ReduceLauncher. When a newly added map task is found, MapLauncher takes it out and runs it; when it is a reduce task, ReduceLauncher takes it out and runs it.

Whether it is a map task or a reduce task, once it has been taken off the queue it must be localized. Localization means copying everything the task needs (the jar file to run, the configuration files, the input data description and so on) to the local file system, so that the task can execute independently on that machine. After localization, the TaskTracker creates a separate JVM for each task and runs the task in it. When the task finishes, the TaskTracker notifies the JobTracker so that the next step can be taken.
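A side note on the one-JVM-per-task behaviour: in Hadoop 1.x it can be relaxed through the mapred.job.reuse.jvm.num.tasks setting. A minimal sketch, assuming the standard 1.x property name and JobConf setter:

  import org.apache.hadoop.mapred.JobConf;

  public class JvmReuseExample {
    public static void main(String[] args) {
      // Sketch only: the default of 1 gives every task its own child JVM;
      // -1 lets a JVM be reused by any number of tasks of the same job.
      JobConf conf = new JobConf();
      conf.setNumTasksToExecutePerJvm(-1);
      // equivalently: conf.setInt("mapred.job.reuse.jvm.num.tasks", -1);
      System.out.println(conf.get("mapred.job.reuse.jvm.num.tasks"));
    }
  }

Reuse only ever happens between tasks of the same job, so isolation between jobs is preserved.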

When all of the tasks have completed, the job is done, and the JobTracker notifies the JobClient that the work has finished.

Code walkthrough:

When we run bin/start-all.sh, looking inside the script we find that it invokes three further scripts: hadoop-config.sh, start-dfs.sh and start-mapred.sh. Based on its configuration, Hadoop starts the JobTracker and the TaskTrackers: the master logs in to each slave machine over SSH and starts a tasktracker and a datanode there.

Below, we walk through this flow against the Hadoop source code.

First, the JobTracker and the TaskTracker.

Each JobTracker corresponds to an instance of org.apache.hadoop.mapred.JobTracker. This class is responsible for accepting jobs, scheduling them and monitoring the TaskTrackers, and every JobTracker runs in its own JVM. A JobTracker is started through the startTracker() method. Source code:

  /**
   * Start the JobTracker with given configuration.
   *
   * The conf will be modified to reflect the actual ports on which
   * the JobTracker is up and running if the user passes the port as
   * <code>zero</code>.
   *
   * @param conf configuration for the JobTracker.
   * @throws IOException
   */
  public static JobTracker startTracker(JobConf conf, String identifier)
  throws IOException, InterruptedException {

    DefaultMetricsSystem.initialize("JobTracker");
    JobTracker result = null;
    while (true) {
      try {
        // construct the JobTracker instance, named result
        result = new JobTracker(conf, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn("Error starting tracker: " +
                 StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
      MBeans.register("JobTracker", "JobTrackerInfo", result);
    }
    return result;
  }

startTracker creates a JobTracker object from the given conf and then performs a series of initialization steps, including starting the RPC server, starting the embedded Jetty server, and checking whether the JobTracker needs to restart (recovery).
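As an aside (a hedged sketch, not from the original article): the addresses those two servers bind to normally come from the mapred.job.tracker and mapred.job.tracker.http.address properties, usually set in mapred-site.xml; programmatically that corresponds to something like:

  import org.apache.hadoop.mapred.JobConf;

  public class JobTrackerConfExample {
    public static void main(String[] args) {
      JobConf conf = new JobConf();
      // client-facing RPC address of the JobTracker (host:port)
      conf.set("mapred.job.tracker", "master:9001");
      // embedded Jetty web UI address (default port 50030)
      conf.set("mapred.job.tracker.http.address", "0.0.0.0:50030");
      System.out.println(conf.get("mapred.job.tracker"));
    }
  }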

Another important method is offerService():

  /**
   * Run forever
   */
  public void offerService() throws InterruptedException, IOException {
    // Prepare for recovery. This is done irrespective of the status of restart
    // flag.
    while (true) {
      try {
        recoveryManager.updateRestartCount();
        break;
      } catch (IOException ioe) {
        LOG.warn("Failed to initialize recovery manager. ", ioe);
        // wait for some time
        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
        LOG.warn("Retrying...");
      }
    }

    taskScheduler.start();

    // Start the recovery after starting the scheduler
    try {
      // recover jobs from before the restart
      recoveryManager.recover();
    } catch (Throwable t) {
      LOG.warn("Recovery manager crashed! Ignoring.", t);
    }
    // refresh the node list as the recovery manager might have added
    // disallowed trackers
    refreshHosts();

    this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
    this.expireTrackersThread.start();
    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
    this.retireJobsThread.start();
    expireLaunchingTaskThread.start();

    if (completedJobStatusStore.isActive()) {
      completedJobsStoreThread = new Thread(completedJobStatusStore,
                                            "completedjobsStore-housekeeper");
      completedJobsStoreThread.start();
    }

    // start the inter-tracker server once the jt is ready
    this.interTrackerServer.start();

    synchronized (this) {
      state = State.RUNNING;
    }
    LOG.info("Starting RUNNING");

    this.interTrackerServer.join();
    LOG.info("Stopped interTrackerServer");
  }

This method runs for the lifetime of the JobTracker. The initial loop keeps retrying until the recovery manager has been initialized; the method then starts the scheduler, recovers any interrupted jobs, starts the housekeeping threads and the inter-tracker RPC server, and finally blocks on interTrackerServer.join(). This is how scheduling is set in motion.

offerService() calls the start() method of the taskScheduler member. taskScheduler is a data member of JobTracker whose type, TaskScheduler, exposes a set of interfaces through which the JobTracker can initialize and schedule every submitted job. TaskScheduler itself is an abstract class; its default concrete implementation is JobQueueTaskScheduler, so taskScheduler.start() actually executes JobQueueTaskScheduler's start() method.
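As an aside, which TaskScheduler implementation the JobTracker instantiates is controlled by the mapred.jobtracker.taskScheduler property (normally set in mapred-site.xml). A minimal sketch of swapping in the contrib Fair Scheduler, assuming only the standard 1.x property name:

  import org.apache.hadoop.mapred.JobConf;

  public class SchedulerConfExample {
    public static void main(String[] args) {
      JobConf conf = new JobConf();
      // the default is org.apache.hadoop.mapred.JobQueueTaskScheduler (FIFO);
      // the Fair Scheduler ships as a contrib module in Hadoop 1.x
      conf.set("mapred.jobtracker.taskScheduler",
               "org.apache.hadoop.mapred.FairScheduler");
      System.out.println(conf.get("mapred.jobtracker.taskScheduler"));
    }
  }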

TaskScheduler's own start() is a no-op:

  /**
   * Lifecycle method to allow the scheduler to start any work in separate
   * threads.
   * @throws IOException
   */
  public void start() throws IOException {
    // do nothing
  }

JobQueueTaskScheduler overrides it:

  public synchronized void start() throws IOException {
    super.start(); // no-op, see above
    // register the listener that maintains the FIFO job queue
    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
    // register and start the listener that eagerly initializes new jobs
    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    eagerTaskInitializationListener.start();
    taskTrackerManager.addJobInProgressListener(
        eagerTaskInitializationListener);
  }

JobQueueTaskScheduler's start() method mainly registers two very important listeners: jobQueueJobInProgressListener and eagerTaskInitializationListener. The former is an instance of JobQueueJobInProgressListener, which maintains a FIFO queue of JobInProgress objects and watches each JobInProgress instance as it changes through its life cycle. The latter is an instance of EagerTaskInitializationListener, which keeps watching the jobInitQueue; as soon as a new job is submitted (i.e. a new JobInProgress instance is added), it calls that instance's initTasks() method to initialize the job.
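Conceptually, the eager initializer is a small producer/consumer pair: the listener callback appends newly submitted jobs to a queue, and a background thread drains the queue and initializes each job. The following is a simplified, self-contained model of that idea; the class and method names are illustrative, not the actual Hadoop classes.

  import java.util.LinkedList;
  import java.util.Queue;

  // Simplified model: jobAdded() plays the listener callback, the background
  // thread plays the job-init manager and calls initTasks() on each queued job.
  class EagerInitModel {
    interface Job { void initTasks(); }

    private final Queue<Job> jobInitQueue = new LinkedList<Job>();

    // called when a new job is submitted
    public void jobAdded(Job job) {
      synchronized (jobInitQueue) {
        jobInitQueue.add(job);
        jobInitQueue.notifyAll();   // wake the init thread
      }
    }

    public void start() {
      Thread initThread = new Thread(new Runnable() {
        public void run() {
          while (true) {
            Job job;
            synchronized (jobInitQueue) {
              while (jobInitQueue.isEmpty()) {
                try { jobInitQueue.wait(); }
                catch (InterruptedException e) { return; }
              }
              job = jobInitQueue.poll();
            }
            job.initTasks();        // initialize outside the lock
          }
        }
      }, "jobInitManager");
      initThread.setDaemon(true);
      initThread.start();
    }
  }

In the real listener, initTasks() of the dequeued JobInProgress is exactly the method invoked, as described above.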

Now let's look at the initTasks() method of the JobInProgress class:

  /**
   * Construct the splits, etc.  This is invoked from an async
   * thread so that split-computation doesn't block anyone.
   */
  public synchronized void initTasks()
  throws IOException, KillInterruptedException {
    if (tasksInited || isComplete()) {
      return;
    }
    synchronized(jobInitKillStatus){
      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
        return; // initialization already started, or the job was killed
      }
      jobInitKillStatus.initStarted = true;
    }

    LOG.info("Initializing " + jobId);
    final long startTimeFinal = this.startTime;
    // log job info as the user running the job (done under the user's UGI)
    try {
    userUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
            startTimeFinal, hasRestarted());
        return null;
      }
    });
    } catch(InterruptedException ie) {
      throw new IOException(ie);
    }

    // log the job priority
    setPriority(this.priority);

    //
    // generate security keys needed by Tasks
    //
    generateAndStoreTokens();

    //
    // read input splits and create a map per split
    //
    TaskSplitMetaInfo[] splits = createSplits(jobId);
    if (numMapTasks != splits.length) {
      throw new IOException("Number of maps in JobConf doesn't match number of " +
          "recieved splits for job " + jobId + "! " +
          "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
    }
    numMapTasks = splits.length;

    // map and reduce tasks wait here until they are given slots
    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);

    // initialize the map tasks
    maps = new TaskInProgress[numMapTasks];
    for(int i=0; i < numMapTasks; ++i) {
      inputLength += splits[i].getInputDataLength();
      // create a TaskInProgress for each map task
      maps[i] = new TaskInProgress(jobId, jobFile,
                                   splits[i],
                                   jobtracker, conf, this, i, numSlotsPerMap);
    }
    LOG.info("Input size for job " + jobId + " = " + inputLength
        + ". Number of splits = " + splits.length);

    // Set localityWaitFactor before creating cache
    localityWaitFactor =
      conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
    if (numMapTasks > 0) {
      // build the cache of non-running map tasks, keyed by locality level
      nonRunningMapCache = createCache(splits, maxLevel);
    }

    // set the launch time
    this.launchTime = jobtracker.getClock().getTime();

    //
    // Create reduce tasks
    //
    this.reduces = new TaskInProgress[numReduceTasks];
    for (int i = 0; i < numReduceTasks; i++) {
      reduces[i] = new TaskInProgress(jobId, jobFile,
                                      numMapTasks, i,
                                      jobtracker, conf, this, numSlotsPerReduce);
      nonRunningReduces.add(reduces[i]);
    }

    // the minimum number of completed maps before reduce tasks may be launched
    completedMapsForReduceSlowstart =
      (int)Math.ceil(
          (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
           numMapTasks));

    // used by the resource estimator to estimate total map output size
    resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);

    // create two cleanup tips, one map and one reduce.
    cleanup = new TaskInProgress[2];

    // cleanup map tip. This map doesn't use any splits. Just assign an empty
    // split.
    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
            jobtracker, conf, this, numMapTasks, 1);
    cleanup[0].setJobCleanupTask();

    // cleanup reduce tip.
    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks, jobtracker, conf, this, 1);
    cleanup[1].setJobCleanupTask();

    // create two setup tips, one map and one reduce.
    setup = new TaskInProgress[2];

    // setup map tip. This map doesn't use any split. Just assign an empty
    // split.
    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
            jobtracker, conf, this, numMapTasks + 1, 1);
    setup[0].setJobSetupTask();

    // setup reduce tip.
    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                       numReduceTasks + 1, jobtracker, conf, this, 1);
    setup[1].setJobSetupTask();

    synchronized(jobInitKillStatus){
      jobInitKillStatus.initDone = true;
      if(jobInitKillStatus.killed) {
        throw new KillInterruptedException("Job " + jobId + " killed in init");
      }
    }

    tasksInited = true;
    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
                                 numMapTasks, numReduceTasks);

    // Log the number of map and reduce tasks
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
             + " map tasks and " + numReduceTasks + " reduce tasks.");
  }

From this code we can see that both the map tasks and the reduce tasks are created as TaskInProgress instances, and that the number of map TaskInProgress objects is kept equal to the number of input splits.
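As a quick worked example of the slow-start threshold computed in initTasks() above (the job size here is invented for illustration): with the Hadoop 1.x default of 0.05 for mapred.reduce.slowstart.completed.maps and a job of 200 map tasks, reduce tasks become eligible for scheduling once ceil(0.05 * 200) = 10 maps have finished.

  public class SlowstartExample {
    public static void main(String[] args) {
      int numMapTasks = 200;     // hypothetical job size
      float slowstart = 0.05f;   // mapred.reduce.slowstart.completed.maps (1.x default)
      int completedMapsForReduceSlowstart =
          (int) Math.ceil(slowstart * numMapTasks);
      System.out.println(completedMapsForReduceSlowstart);  // prints 10
    }
  }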

Next we turn to the TaskTracker, which runs on the worker (datanode) machines.

Each TaskTracker corresponds to an instance of org.apache.hadoop.mapred.TaskTracker, the class that implements all of the TaskTracker's functionality. Every TaskTracker also runs as a separate JVM; in the Hadoop scripts it is started with bin/hadoop-daemon.sh start tasktracker.

Let's start with TaskTracker's main function:

  /**
   * Start the TaskTracker, point toward the indicated JobTracker
   */
  public static void main(String argv[]) throws Exception {
    StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
    if (argv.length != 0) {
      System.out.println("usage: TaskTracker");
      System.exit(-1);
    }
    try {
      JobConf conf = new JobConf();
      // enable the server to track time spent waiting on locks
      ReflectionUtils.setContentionTracing
        (conf.getBoolean("tasktracker.contention.tracking", false));
      DefaultMetricsSystem.initialize("TaskTracker");
      TaskTracker tt = new TaskTracker(conf);
      MBeans.register("TaskTracker", "TaskTrackerInfo", tt);
      // the most important statement: start the TaskTracker's main loop
      tt.run();
    } catch (Throwable e) {
      LOG.error("Can not start task tracker because "+
                StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

Inside the run() method, offerService() is invoked:

  public void run() {
    try {
      getUserLogManager().start();
      startCleanupThreads();
      boolean denied = false;
      while (running && !shuttingDown && !denied) {
        boolean staleState = false;
        try {
          // This while-loop attempts reconnects if we get network errors
          while (running && !staleState && !shuttingDown && !denied) {
            try {
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
            } catch (Exception ex) {
              if (!shuttingDown) {
                LOG.info("Lost connection to JobTracker [" +
                         jobTrackAddr + "].  Retrying...", ex);
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
              }
            }
          }
        } finally {
          close();
        }
        if (shuttingDown) { return; }
        LOG.warn("Reinitializing local state");
        initialize();
      }
      if (denied) {
        shutdown();
      }
    } catch (IOException iex) {
      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
                StringUtils.stringifyException(iex));
      return;
    }
    catch (InterruptedException i) {
      LOG.error("Got interrupted while reinitializing TaskTracker: " +
          i.getMessage());
      return;
    }
  }

Now let's look at offerService() itself:

  State offerService() throws Exception {
    // time of the last heartbeat we sent
    long lastHeartbeat = System.currentTimeMillis();

    while (running && !shuttingDown) {
      try {
        long now = System.currentTimeMillis();

        // accelerate to account for multiple finished tasks up-front
        synchronized (finishedCount) {
          long remaining =
            (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
          while (remaining > 0) {
            // sleeps for the wait time or
            // until there are *enough* empty slots to schedule tasks
            finishedCount.wait(remaining);

            // Recompute
            now = System.currentTimeMillis();
            remaining =
              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
          }
          // Reset count
          finishedCount.set(0);
        }

        // If the TaskTracker is just starting up:
        // 1. Verify the buildVersion
        // 2. Get the system directory & filesystem
        if(justInited) {
          String jobTrackerBV = jobClient.getBuildVersion();
          if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
            String msg = "Shutting down. Incompatible buildVersion." +
            "\nJobTracker's: " + jobTrackerBV +
            "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
            LOG.error(msg);
            try {
              jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
            } catch(Exception e ) {
              LOG.info("Problem reporting to jobtracker: " + e);
            }
            return State.DENIED;
          }

          String dir = jobClient.getSystemDir();
          if (dir == null) {
            throw new IOException("Failed to get system directory");
          }
          systemDirectory = new Path(dir);
          systemFS = systemDirectory.getFileSystem(fConf);
        }

        // Send the heartbeat and process the jobtracker's directives
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

        // Note the time when the heartbeat returned, use this to decide when to send the
        // next heartbeat
        lastHeartbeat = System.currentTimeMillis();

        // Check if the map-event list needs purging
        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
        if (jobs.size() > 0) {
          synchronized (this) {
            // purge the local map events list
            for (JobID job : jobs) {
              RunningJob rjob;
              synchronized (runningJobs) {
                rjob = runningJobs.get(job);
                if (rjob != null) {
                  synchronized (rjob) {
                    FetchStatus f = rjob.getFetchStatus();
                    if (f != null) {
                      f.reset();
                    }
                  }
                }
              }
            }

            // Mark the reducers in shuffle for rollback
            synchronized (shouldReset) {
              for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                   : runningTasks.entrySet()) {
                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                  this.shouldReset.add(entry.getKey());
                }
              }
            }
          }
        }

        // the returned heartbeatResponse carries the JobTracker's directives (actions)
        TaskTrackerAction[] actions = heartbeatResponse.getActions();
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                    heartbeatResponse.getResponseId() + " and " +
                    ((actions != null) ? actions.length : 0) + " actions");
        }
        if (reinitTaskTracker(actions)) {
          return State.STALE;
        }

        // resetting heartbeat interval from the response.
        heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
        justStarted = false;
        justInited = false;
        // now carry out each action
        if (actions != null){
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              // add the new task to the task queue
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " +
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }
            } else {
              tasksToCleanup.put(action);
            }
          }
        }
        markUnresponsiveTasks();
        killOverflowingTasks();

        //we've cleaned up, resume normal operation
        if (!acceptNewTasks && isIdle()) {
          acceptNewTasks=true;
        }
        //The check below may not be required every iteration but we are
        //erring on the side of caution here. We have seen many cases where
        //the call to jetty's getLocalPort() returns different values at
        //different times. Being a real paranoid here.
        checkJettyPort(server.getPort());
      } catch (InterruptedException ie) {
        LOG.info("Interrupted. Closing down.");
        return State.INTERRUPTED;
      } catch (DiskErrorException de) {
        String msg = "Exiting task tracker for disk error:\n" +
          StringUtils.stringifyException(de);
        LOG.error(msg);
        synchronized (this) {
          // report the error to the JobTracker
          jobClient.reportTaskTrackerError(taskTrackerName,
                                           "DiskErrorException", msg);
        }
        return State.STALE;
      } catch (RemoteException re) {
        String reClass = re.getClassName();
        if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
          LOG.info("Tasktracker disallowed by JobTracker.");
          return State.DENIED;
        }
      } catch (Exception except) {
        String msg = "Caught exception: " +
          StringUtils.stringifyException(except);
        LOG.error(msg);
      }
    }

    return State.NORMAL;
  }

The TaskTracker sends a heartbeat to the JobTracker every three seconds (by default). The heartbeat is carried over Hadoop's RPC proxy mechanism, which is essentially a remote method invocation mechanism; refer to other material for the details of RPC itself.
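To make the pacing logic in offerService() above more concrete, here is a simplified, self-contained model of the loop (illustrative only, not the Hadoop code): it waits until the heartbeat interval has elapsed, "sends" a heartbeat, and repeats.

  // Simplified model of the TaskTracker heartbeat pacing; sendHeartbeat()
  // stands in for transmitHeartBeat(), 3000 ms mirrors the default interval.
  public class HeartbeatLoopModel {
    private static final long HEARTBEAT_INTERVAL_MS = 3000;

    private static void sendHeartbeat(long now) {
      System.out.println("heartbeat at " + now);   // placeholder for the RPC call
    }

    public static void main(String[] args) throws InterruptedException {
      long lastHeartbeat = System.currentTimeMillis();
      while (true) {
        long now = System.currentTimeMillis();
        long remaining = (lastHeartbeat + HEARTBEAT_INTERVAL_MS) - now;
        while (remaining > 0) {                    // sleep until the interval has elapsed
          Thread.sleep(remaining);
          now = System.currentTimeMillis();
          remaining = (lastHeartbeat + HEARTBEAT_INTERVAL_MS) - now;
        }
        sendHeartbeat(now);                        // transmit and process the response here
        lastHeartbeat = System.currentTimeMillis();
      }
    }
  }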


Next, the JobTracker receives the heartbeat and hands tasks out to the TaskTracker. When a heartbeat arrives, the JobTracker calls heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId), whose return value is a HeartbeatResponse object:

  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                  boolean restarted,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks,
                                                  short responseId)
    throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got heartbeat from: " + status.getTrackerName() +
                " (restarted: " + restarted +
                " initialContact: " + initialContact +
                " acceptNewTasks: " + acceptNewTasks + ")" +
                " with responseId: " + responseId);
    }

    // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
    if (!acceptTaskTracker(status)) {
      throw new DisallowedTaskTrackerException(status);
    }

    // First check if the last heartbeat response got through
    String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

    HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);
    boolean addRestartInfo = false;

    if (initialContact != true) {
      // If this isn't the 'initial contact' from the tasktracker,
      // there is something seriously wrong if the JobTracker has
      // no record of the 'previous heartbeat'; if so, ask the
      // tasktracker to re-initialize itself.
      if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn("Serious problem, cannot find record of 'previous' " +
                   "heartbeat for '" + trackerName +
                   "'; reinitializing the tasktracker");
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }

      } else {

        // It is completely safe to not process a 'duplicate' heartbeat from a
        // {@link TaskTracker} since it resends the heartbeat when rpcs are
        // lost see {@link TaskTracker.transmitHeartbeat()};
        // acknowledge it by re-sending the previous response to let the
        // {@link TaskTracker} go forward.
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info("Ignoring 'duplicate' heartbeat from '" +
              trackerName + "'; resending the previous 'lost' response");
          return prevHeartbeatResponse;
        }
      }
    }

    // Process this heartbeat
    short newResponseId = (short)(responseId + 1);
    status.setLastSeen(now);
    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId,
                 new TaskTrackerAction[] {new ReinitTrackerAction()});
    }

    // Initialize the response to be sent for the heartbeat
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
    boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
    // Check for new tasks to be executed on the tasktracker
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }

    // Check for tasks to be killed
    List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
    if (killTasksList != null) {
      actions.addAll(killTasksList);
    }

    // Check for jobs to be killed/cleanedup
    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
    if (killJobsList != null) {
      actions.addAll(killJobsList);
    }

    // Check for tasks whose outputs can be saved
    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
    if (commitTasksList != null) {
      actions.addAll(commitTasksList);
    }

    // calculate next heartbeat interval and put in heartbeat response
    int nextInterval = getNextHeartbeatInterval();
    response.setHeartbeatInterval(nextInterval);
    response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));

    // check if the restart info is req
    if (addRestartInfo) {
      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
    }

    // Update the trackerToHeartbeatResponseMap
    trackerToHeartbeatResponseMap.put(trackerName, response);

    // Done processing the hearbeat, now remove 'marked' tasks
    removeMarkedTasks(trackerName);

    return response;
  }

The task scheduler is then consulted. The default scheduler is JobQueueTaskScheduler, whose assignTasks() method is shown below:

  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();

    // get the status of the whole cluster
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    //
    // Get map + reduce counts for the current tracker.
    //
    // status of this particular TaskTracker
    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

    // the tasks assigned in this call
    List<Task> assignedTasks = new ArrayList<Task>();

    //
    // Compute (running + pending) map and reduce task numbers across pool
    //
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad +=
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }

    final int trackerCurrentMapCapacity =
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding =
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }

    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;

          // Try to schedule a node-local or rack-local Map task
          t =
            job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                      taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;

            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }

            // Try all jobs again for the next Map task
            break;
          }

          // Otherwise, try to schedule a non-local Map task
          t =
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());

          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;

            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }
    int assignedMaps = assignedTasks.size();

    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    //
    final int trackerCurrentReduceCapacity =
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
               trackerReduceCapacity);
    final int availableReduceSlots =
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
    boolean exceededReducePadding = false;
    if (availableReduceSlots > 0) {
      exceededReducePadding = exceededPadding(false, clusterStatus,
                                              trackerReduceCapacity);
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t =
            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          if (t != null) {
            assignedTasks.add(t);
            break;
          }

          // Don't assign reduce tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededReducePadding) {
            break;
          }
        }
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
                trackerCurrentReduceCapacity + "," + trackerRunningReduces +
                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
                ", " + (assignedTasks.size()-assignedMaps) + "]");
    }

    return assignedTasks;
  }
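To make the load-factor arithmetic above concrete, here is a small worked example with invented numbers: if the running jobs still need 50 map tasks and the cluster has 100 map slots in total, mapLoadFactor is 0.5; a TaskTracker with 4 map slots is therefore capped at ceil(0.5 * 4) = 2 of them, and if 1 map is already running there, exactly 1 new map task can be handed out on this heartbeat.

  public class LoadFactorExample {
    public static void main(String[] args) {
      int remainingMapLoad   = 50;    // maps still needed across running jobs
      int clusterMapCapacity = 100;   // total map slots in the cluster
      int trackerMapCapacity = 4;     // map slots on this TaskTracker
      int trackerRunningMaps = 1;     // maps already running on it

      double mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;    // 0.5
      int trackerCurrentMapCapacity =
          Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
                   trackerMapCapacity);                                         // 2
      int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;   // 1
      System.out.println(availableMapSlots);
    }
  }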

When the heartbeat comes back and the JobTracker's response contains assigned tasks (LaunchTaskActions), the TaskTracker calls its addToTaskQueue method, which places each task on the tasksToLaunch queue of either the MapLauncher or the ReduceLauncher object inside the TaskTracker class. Both are instances of TaskLauncher, an inner class of TaskTracker whose main data member is a queue of TaskTracker.TaskInProgress objects: if the task carried in the action is a map task it goes to mapLauncher's queue, and if it is a reduce task it goes to reduceLauncher's taskToLaunch queue:

  private void addToTaskQueue(LaunchTaskAction action) {
    if (action.getTask().isMapTask()) {
      mapLauncher.addToTaskQueue(action);
    } else {
      reduceLauncher.addToTaskQueue(action);
    }
  }
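Conceptually, each TaskLauncher is a thread that blocks until both a queued task and enough free slots are available. The sketch below is a simplified, self-contained model of that idea; the names and types are illustrative, not the actual Hadoop inner class.

  import java.util.LinkedList;
  import java.util.Queue;

  // Simplified model of a TaskLauncher: a worker thread that takes tasks off
  // a queue as long as free slots are available.
  class LauncherModel extends Thread {
    interface QueuedTask { void launch(); int slotsNeeded(); }

    private final Queue<QueuedTask> tasksToLaunch = new LinkedList<QueuedTask>();
    private int freeSlots;

    LauncherModel(String name, int numSlots) {
      super(name);
      this.freeSlots = numSlots;
      setDaemon(true);
    }

    // called when a LaunchTaskAction-like request arrives
    public synchronized void addToTaskQueue(QueuedTask task) {
      tasksToLaunch.add(task);
      notifyAll();
    }

    // called when a running task finishes and returns its slots
    public synchronized void releaseSlots(int n) {
      freeSlots += n;
      notifyAll();
    }

    @Override
    public void run() {
      while (true) {
        QueuedTask task;
        synchronized (this) {
          // wait until there is a task AND enough free slots for it
          while (tasksToLaunch.isEmpty()
              || freeSlots < tasksToLaunch.peek().slotsNeeded()) {
            try { wait(); } catch (InterruptedException e) { return; }
          }
          task = tasksToLaunch.poll();
          freeSlots -= task.slotsNeeded();
        }
        task.launch();   // localize and start the task outside the lock
      }
    }
  }

In the real TaskTracker there are two such launchers, mapLauncher and reduceLauncher, each bound to its own pool of map or reduce slots.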

The task is then registered:

  private TaskInProgress registerTask(LaunchTaskAction action,
      TaskLauncher launcher) {
    Task t = action.getTask();
    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
             " task's state:" + t.getState());
    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
    synchronized (this) {
      tasks.put(t.getTaskID(), tip);
      runningTasks.put(t.getTaskID(), tip);
      boolean isMap = t.isMapTask();
      if (isMap) {
        mapTotal++;
      } else {
        reduceTotal++;
      }
    }
    return tip;
  }

Part of the localization is handled by localizeJobConfFile, which downloads the job configuration to local disk:

  private Path localizeJobConfFile(Path jobFile, String user,
      FileSystem userFs, JobID jobId)
  throws IOException {
    // Get sizes of JobFile and JarFile
    // sizes are -1 if they are not present.
    FileStatus status = null;
    long jobFileSize = -1;
    try {
      status = userFs.getFileStatus(jobFile);
      jobFileSize = status.getLen();
    } catch(FileNotFoundException fe) {
      jobFileSize = -1;
    }
    Path localJobFile =
      lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
          jobId.toString()), jobFileSize, fConf);

    // Download job.xml
    userFs.copyToLocalFile(jobFile, localJobFile);
    return localJobFile;
  }

This method, together with its siblings, carries out the localization steps: the jar, the split file and the xml configuration are copied to the local machine. Once all the resources the task needs have been copied locally, TaskTracker's launchTaskForJob method is called, which in turn calls the launchTask method of TaskTracker.TaskInProgress:

    /**
     * Kick off the task execution
     */
    public synchronized void launchTask(RunningJob rjob) throws IOException {
      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
        localizeTask(task);
        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
        }
        setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
        this.runner.start();
        long now = System.currentTimeMillis();
        this.taskStatus.setStartTime(now);
        this.lastProgressReport = now;
      } else {
        LOG.info("Not launching task: " + task.getTaskID() +
            " since it's state is " + this.taskStatus.getRunState());
      }
    }

A TaskRunner object is then created and the task is run. Its run() method:

  public final void run() {
    String errorInfo = "Child Error";
    try {

      //before preparing the job localize
      //all the archives
      TaskAttemptID taskid = t.getTaskID();
      final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
      //simply get the location of the workDir and pass it to the child. The
      //child will do the actual dir creation
      final File workDir =
      new File(new Path(localdirs[rand.nextInt(localdirs.length)],
          TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
          taskid.toString(),
          t.isTaskCleanupTask())).toString());

      String user = tip.getUGI().getUserName();

      if (!prepare()) {
        return;
      }

      // Accumulates class paths for child.
      List<String> classPaths = getClassPaths(conf, workDir,
                                              taskDistributedCacheManager);

      long logSize = TaskLog.getTaskLogLength(conf);

      //  Build exec child JVM args.
      Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);

      tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);

      // set memory limit using ulimit if feasible and necessary ...
      String setup = getVMSetupCmd();
      // Set up the redirection of the task's stdout and stderr streams
      File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
      File stdout = logFiles[0];
      File stderr = logFiles[1];
      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
                 stderr);

      Map<String, String> env = new HashMap<String, String>();
      errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
                                   logSize);

      // flatten the env as a set of export commands
      List <String> setupCmds = new ArrayList<String>();
      for(Entry<String, String> entry : env.entrySet()) {
        StringBuffer sb = new StringBuffer();
        sb.append("export ");
        sb.append(entry.getKey());
        sb.append("=\"");
        sb.append(entry.getValue());
        sb.append("\"");
        setupCmds.add(sb.toString());
      }
      setupCmds.add(setup);

      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
      if (exitCodeSet) {
        if (!killed && exitCode != 0) {
          if (exitCode == 65) {
            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
          }
          throw new IOException("Task process exit with nonzero status of " +
              exitCode + ".");
        }
      }
    } catch (FSError e) {
      LOG.fatal("FSError", e);
      try {
        tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
      } catch (IOException ie) {
        LOG.fatal(t.getTaskID()+" reporting FSError", ie);
      }
    } catch (Throwable throwable) {
      LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
      Throwable causeThrowable = new Throwable(errorInfo, throwable);
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      causeThrowable.printStackTrace(new PrintStream(baos));
      try {
        tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
      } catch (IOException e) {
        LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
      }
    } finally {

      // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
      // *false* since the task has either
      // a) SUCCEEDED - which means commit has been done
      // b) FAILED - which means we do not need to commit
      tip.reportTaskFinished(false);
    }
  }