Hadoop Study Notes (1): Analyzing How Hadoop Runs at the Source Level
A source-code-level look at the Hadoop MapReduce execution flow
Preface:
I have been analyzing how Hadoop runs for a while and have consulted a lot of material. Although that gave me an intuitive feel for the process, I still did not have a complete picture of how MapReduce actually executes, so I decided to trace the MapReduce execution flow through the source code itself.
Getting started:
Everything begins with job submission. When the Job class is used, the statement that triggers submission is job.waitForCompletion(true); passing true makes the job print progress information while it runs.
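Before diving into the internals, here is a minimal driver sketch showing where job.waitForCompletion(true) usually sits. It uses the identity Mapper/Reducer purely so the sketch is self-contained; the class name and the input/output paths are illustrative, not taken from any real project.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PassThroughDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "pass-through");     // 1.x-style constructor
    job.setJarByClass(PassThroughDriver.class);
    job.setMapperClass(Mapper.class);            // identity mapper, keeps the sketch self-contained
    job.setReducerClass(Reducer.class);          // identity reducer
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // true => print progress; this is the call that leads to submit() ->
    // JobClient.submitJobInternal(conf), which we trace below.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}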
Pressing F3 on this method in Eclipse takes us to its source. waitForCompletion() simply calls Job's submit() method, which in turn calls submitJobInternal(conf) to submit the job; along the way the client uploads the job's three files, job.jar, job.split, and job.xml, to HDFS:
ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
????? public RunningJob run() throws FileNotFoundException,
????? ClassNotFoundException,
????? InterruptedException,
????? IOException{
??????? JobConf jobCopy = job;
??????? Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
??????????? jobCopy);
        // get a new ID for the job
??????? JobID jobId = jobSubmitClient.getNewJobId();
        // the directory the job will be uploaded (submitted) to
??????? Path submitJobDir = new Path(jobStagingArea, jobId.toString());
??????? jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
??????? JobStatus status = null;
??????? try {
????????? populateTokenCache(jobCopy, jobCopy.getCredentials());
?
????????? copyAndConfigureFiles(jobCopy, submitJobDir);
??? ???
          // obtain delegation tokens from the NameNode for the submit directory
????????? // get delegation token for the dir
????????? TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
????? ????????????????????????????????????????new Path [] {submitJobDir},
????????????????????????????????????????????? jobCopy);
          // path of the job configuration file within the submit directory
????????? Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          // number of reduce tasks for the job (defaults to 1)
????????? int reduces = jobCopy.getNumReduceTasks();
          // the local host address of the submitting client
????????? InetAddress ip = InetAddress.getLocalHost();
????????? if (ip != null) {
??????????? job.setJobSubmitHostAddress(ip.getHostAddress());
??????????? job.setJobSubmitHostName(ip.getHostName());
????????? }
????????? JobContext context = new JobContext(jobCopy, jobId);
?
????????? jobCopy = (JobConf)context.getConfiguration();
?
????????? // Check the output specification
????????? if (reduces == 0 ? jobCopy.getUseNewMapper() :
??????????? jobCopy.getUseNewReducer()) {
??????????? org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
????????????? ReflectionUtils.newInstance(context.getOutputFormatClass(),
????????????????? jobCopy);
??????????? output.checkOutputSpecs(context);
????????? } else {
??????????? jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
????????? }
?
          // create the input splits for the job
????????? FileSystem fs = submitJobDir.getFileSystem(jobCopy);
????????? LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
????????? int maps = writeSplits(context, submitJobDir);
????????? jobCopy.setNumMapTasks(maps);
?
????????? // write "queue admins of the queue to which job is being submitted"
????????? // to job file.
????????? String queue = jobCopy.getQueueName();
????????? AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
????????? jobCopy.set(QueueManager.toFullPropertyName(queue,
????????????? QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
?
????????? // Write job file to JobTracker's fs???????
????????? FSDataOutputStream out =
??????????? FileSystem.create(fs, submitJobFile,
??????????????? new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
?
????????? try {
            // write out the job configuration (job.xml)
??????????? jobCopy.writeXml(out);
????????? } finally {
??????????? out.close();
????????? }
????????? //
????????? // Now, actually submit the job (using the submit name)
????????? //
????????? printTokens(jobId, jobCopy.getCredentials());
          // submit the job through the JobTracker RPC proxy
????????? status = jobSubmitClient.submitJob(
????????????? jobId, submitJobDir.toString(), jobCopy.getCredentials());
????????? if (status != null) {
??????????? return new NetworkedJob(status);
????????? } else {
??????????? throw new IOException("Could not launch job");
????????? }
??????? } finally {
????????? if (status == null) {
??????????? LOG.info("Cleaning up the staging area " + submitJobDir);
??????????? if (fs != null && submitJobDir != null)
????????????? fs.delete(submitJobDir, true);
????????? }
??????? }
????? }
??? });
Looking at this method, we can see where the job's staging directory and job ID come from; the job is then submitted through the JobTracker proxy (jobSubmitClient).
Once the job is submitted, the JobTracker creates a JobInProgress instance for it. This class holds all of the job's information as well as the actions the job needs to carry out. After receiving the submitted data, the JobTracker assigns tasks to TaskTrackers according to the job configuration, and only when all TaskTrackers have finished does it notify the JobClient that the job is complete. The JobTracker adds the job to a queue named jobInitQueue; a JobQueueTaskScheduler object inside the JobTracker keeps watching that queue, and as soon as a new job is added it is taken out and initialized. Each task also gets a TaskInProgress object of its own. Once a TaskTracker has started, it communicates with the JobTracker through a heartbeat mechanism.
Every three seconds the TaskTracker sends a heartbeat to the JobTracker. The heartbeat carries a great deal of information about the TaskTracker. When the JobTracker receives a heartbeat it checks the information it contains and, if it finds a problem, starts the corresponding error handling. If the TaskTracker requested new tasks in its heartbeat, the JobTracker adds the corresponding directives to its reply. In the Hadoop source, a directive from the JobTracker to a TaskTracker is called an action, and the JobTracker's reply to a heartbeat is called a HeartbeatResponse.
Inside the TaskTracker there is a queue called TaskQueue that holds all newly assigned tasks. Whenever the TaskTracker receives a HeartbeatResponse it inspects it, and if the response contains new tasks it adds them to the TaskQueue. Two threads keep polling this queue: MapLauncher and ReduceLauncher. When a new map task appears, MapLauncher takes it out and runs it; a reduce task is taken out and run by ReduceLauncher.
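The pattern behind MapLauncher and ReduceLauncher is essentially a blocking producer/consumer queue per launcher. The sketch below only illustrates that idea and is not the real TaskLauncher code, which additionally checks free map/reduce slots and shutdown flags; the class and method names here are made up.

import java.util.LinkedList;
import java.util.List;

// Simplified illustration of the launcher pattern described above (not the real TaskLauncher).
class SimpleTaskLauncher extends Thread {
  private final List<Runnable> tasksToLaunch = new LinkedList<Runnable>();

  // Called when a HeartbeatResponse contains a new task for this launcher.
  public void addToTaskQueue(Runnable task) {
    synchronized (tasksToLaunch) {
      tasksToLaunch.add(task);
      tasksToLaunch.notifyAll();        // wake the polling thread
    }
  }

  @Override
  public void run() {
    while (true) {
      Runnable next;
      synchronized (tasksToLaunch) {
        while (tasksToLaunch.isEmpty()) {
          try {
            tasksToLaunch.wait();       // block until a task arrives
          } catch (InterruptedException ie) {
            return;
          }
        }
        next = tasksToLaunch.remove(0);
      }
      next.run();                       // the real launcher localizes the task and starts a child JVM here
    }
  }
}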
Whether it is a map task or a reduce task, once it has been taken off the queue it must first be localized. Localization means copying everything the task needs to run, such as the jar file, the configuration file, and the input data, to the local file system, so that the task can run independently on that machine. After localization, the TaskTracker creates a separate JVM for each task and runs the task in it. When the task finishes, the TaskTracker notifies the JobTracker so that the next step can proceed.
Once all of the tasks have completed, the job is done, and the JobTracker notifies the JobClient that the work is finished.
Code walkthrough:
When we run bin/start-all.sh, a look at the script shows that it invokes three other scripts: hadoop-config.sh, start-dfs.sh, and start-mapred.sh. Hadoop then starts the JobTracker and the TaskTrackers according to its configuration; the master logs in to the slave machines over SSH and starts the TaskTracker and DataNode daemons there.
The following walks through this flow in the Hadoop source code.
First, the JobTracker and the TaskTracker.
Each JobTracker corresponds to the class org.apache.hadoop.mapred.JobTracker, which is mainly responsible for accepting jobs, scheduling them, and monitoring the TaskTrackers. Each JobTracker runs as its own JVM. A JobTracker is started through the startTracker() method. The source:
?
? /**
?? * Start the JobTracker with given configuration.
?? *
?? * The conf will be modified to reflect the actual ports on which
? ?* the JobTracker is up and running if the user passes the port as
?? * <code>zero</code>.
?? *??
?? * @param conf configuration for the JobTracker.
?? * @throws IOException
?? */
?public static JobTracker startTracker(JobConf conf, String identifier)
? throws IOException, InterruptedException {
???
??? DefaultMetricsSystem.initialize("JobTracker");
??? JobTracker result = null;
??? while (true) {
????? try {
        // instantiate the JobTracker (named result)
??????? result = new JobTracker(conf, identifier);
??????? result.taskScheduler.setTaskTrackerManager(result);
??????? break;
????? } catch (VersionMismatch e) {
??????? throw e;
????? } catch (BindException e) {
??????? throw e;
?? ???} catch (UnknownHostException e) {
??????? throw e;
????? } catch (AccessControlException ace) {
??????? // in case of jobtracker not having right access
??????? // bail out
??????? throw ace;
????? } catch (IOException e) {
??????? LOG.warn("Error starting tracker: " +
???????????????? StringUtils.stringifyException(e));
????? }
????? Thread.sleep(1000);
??? }
??? if (result != null) {
????? JobEndNotifier.startNotifier();
????? MBeans.register("JobTracker", "JobTrackerInfo", result);
??? }
??? return result;
? }
startTracker creates a JobTracker object from the given conf and then performs a series of initialization steps, including starting the RPC server, starting the embedded Jetty server, and checking whether the JobTracker needs to be restarted.
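As a rough sketch of how this method is used, the following mirrors what JobTracker's main() does, with error handling and shutdown hooks omitted; the "jobtracker" identifier string is only illustrative.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTracker;

public class StartJobTrackerSketch {
  public static void main(String[] args) throws Exception {
    JobTracker tracker = JobTracker.startTracker(new JobConf(), "jobtracker");
    if (tracker != null) {
      tracker.offerService();   // blocks "forever", serving heartbeats and scheduling jobs
    }
  }
}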
Another important method is offerService():
? /**
?? * Run forever
?? */
? public void offerService() throws InterruptedException, IOException {
??? // Prepare for recovery. This is done irrespective of the status of restart
??? // flag.
??? while (true) {
????? try {
??????? recoveryManager.updateRestartCount();
??????? break;
????? } catch (IOException ioe) {
??????? LOG.warn("Failed to initialize recovery manager. ", ioe);
??????? // wait for some time
??????? Thread.sleep(FS_ACCESS_RETRY_PERIOD);
??????? LOG.warn("Retrying...");
????? }
??? }
???
??? taskScheduler.start();
???
??? //? Start the recovery after starting the scheduler
??? try {
    // recover jobs from before the restart
????? recoveryManager.recover();
??? } catch (Throwable t) {
????? LOG.warn("Recovery manager crashed! Ignoring.", t);
??? }
??? // refresh the node list as the recovery manager might have added
??? // disallowed trackers
???
??? refreshHosts();
???
?? ?this.expireTrackersThread = new Thread(this.expireTrackers,
????????????????????????????????????????? "expireTrackers");
??? this.expireTrackersThread.start();
??? this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
??? this.retireJobsThread.start();
??? expireLaunchingTaskThread.start();
?
??? if (completedJobStatusStore.isActive()) {
????? completedJobsStoreThread = new Thread(completedJobStatusStore,
??????????????????????????????????????????? "completedjobsStore-housekeeper");
????? completedJobsStoreThread.start();
??? }
?
??? // start the inter-tracker server once the jt is ready
??? this.interTrackerServer.start();
???
??? synchronized (this) {
????? state = State.RUNNING;
??? }
??? LOG.info("Starting RUNNING");
???
??? this.interTrackerServer.join();
??? LOG.info("Stopped interTrackerServer");
? }
This method keeps running; although it contains what looks like an endless loop, that loop is what lets the JobTracker keep recovering and launching jobs, and this is how scheduling is driven.
offerService() calls the start() method of the taskScheduler member. This field belongs to the JobTracker, and its type provides a set of interfaces that let the JobTracker initialize and schedule every submitted job. The type itself, however, is abstract; its concrete implementation is the JobQueueTaskScheduler class. So taskScheduler.start() actually runs JobQueueTaskScheduler's start() method:
? /**
?? * Lifecycle method to allow the scheduler to start any work in separate
?? * threads.
?? * @throws IOException
?? */
? public void start() throws IOException {
??? // do nothing
? }
?
  // Now look at JobQueueTaskScheduler's start() method:
  public synchronized void start() throws IOException {
    super.start();  // does nothing (see above)
    // register the JobInProgressListener
    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
    // register and start the eager task-initialization listener
    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    eagerTaskInitializationListener.start();
    taskTrackerManager.addJobInProgressListener(
        eagerTaskInitializationListener);
  }
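As an aside, the scheduler is pluggable. The following is a hedged reconstruction, from memory, of the relevant lines in the 1.x JobTracker constructor: the implementation class is read from the mapred.jobtracker.taskScheduler property, with JobQueueTaskScheduler as the default, which is why taskScheduler.start() ends up in the method shown above.

    // Hedged sketch: how the JobTracker picks its scheduler implementation (1.x).
    Class<? extends TaskScheduler> schedulerClass =
      conf.getClass("mapred.jobtracker.taskScheduler",
                    JobQueueTaskScheduler.class, TaskScheduler.class);
    taskScheduler = ReflectionUtils.newInstance(schedulerClass, conf);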
JobQueueTaskScheduler's start() method mainly registers two very important listeners: jobQueueJobInProgressListener and eagerTaskInitializationListener. The former is an instance of JobQueueJobInProgressListener, which maintains a FIFO queue of JobInProgress objects and watches each JobInProgress for changes over its lifetime. The latter is an instance of EagerTaskInitializationListener, which keeps watching jobInitQueue; as soon as a new job is submitted (that is, a new JobInProgress is added), it immediately calls that instance's initTasks() method to initialize the job.
Now take a look at JobInProgress's initTasks() method:
? /**
?? * Construct the splits, etc.? This is invoked from an async
?? * thread so that split-computation doesn't block anyone.
?? */
? public synchronized void initTasks()
? throws IOException, KillInterruptedException {
??? if (tasksInited || isComplete()) {
????? return;
??? }
??? synchronized(jobInitKillStatus){
????? if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
        return; // already started, or the job has been killed; nothing more to do
????? }
????? jobInitKillStatus.initStarted = true;
??? }
?
??? LOG.info("Initializing " + jobId);
??? final long startTimeFinal = this.startTime;
??? // log job info as the user running the job
try {
    // the logging below runs as the user who submitted the job
??? userUGI.doAs(new PrivilegedExceptionAction<Object>() {
????? @Override
????? public Object run() throws Exception {
??????? JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile,
??????????? startTimeFinal, hasRestarted());
??????? return null;
????? }
??? });
??? } catch(InterruptedException ie) {
????? throw new IOException(ie);
??? }
???
??? // log the job priority
??? setPriority(this.priority);
???
??? //
??? // generate security keys needed by Tasks
??? //
??? generateAndStoreTokens();
???
??? //
??? // read input splits and create a map per a split
??? //
    // read each split (one map task is created per split)
??? TaskSplitMetaInfo[] splits = createSplits(jobId);
??? if (numMapTasks != splits.length) {
????? throw new IOException("Number of maps in JobConf doesn't match number of " +
????? ???? "recieved splits for job " + jobId + "! " +
????? ???? "numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
??? }
??? numMapTasks = splits.length;
?
    // map and reduce tasks wait until slots are available before they run
??? jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
?
?
    // initialize the map tasks below
??? maps = new TaskInProgress[numMapTasks];
??? for(int i=0; i < numMapTasks; ++i) {
????? inputLength += splits[i].getInputDataLength();
      // create a TaskInProgress for this map task
????? maps[i] = new TaskInProgress(jobId, jobFile,
?????????????????????????????????? splits[i],
?????????????????????????????????? jobtracker, conf, this, i, numSlotsPerMap);
??? }
??? LOG.info("Input size for job " + jobId + " = " + inputLength
??????? + ". Number of splits = " + splits.length);
?
??? // Set localityWaitFactor before creating cache
??? localityWaitFactor =
????? conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
??? if (numMapTasks > 0) {
      // build the cache of non-running map tasks
????? nonRunningMapCache = createCache(splits, maxLevel);
??? }
???????
??? // set the launch time
??? this.launchTime = jobtracker.getClock().getTime();
?
    //
    // Create the reduce tasks here
    //
??? this.reduces = new TaskInProgress[numReduceTasks];
??? for (int i = 0; i < numReduceTasks; i++) {
????? reduces[i] = new TaskInProgress(jobId, jobFile,
????????????????????????????????????? numMapTasks, i,
????????????????????????????????????? jobtracker, conf, this, numSlotsPerReduce);
?? ???nonRunningReduces.add(reduces[i]);
??? }
?
??? ?
    // compute the minimum number of completed maps before reduces may be scheduled
??? completedMapsForReduceSlowstart =
????? (int)Math.ceil(
????????? (conf.getFloat("mapred.reduce.slowstart.completed.maps",
???????????????????????? DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
?????????? numMapTasks));
???
    // the resource estimator uses this threshold when estimating total map output size
??? resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
???
??? // create cleanup two cleanup tips, one map and one reduce.
??? cleanup = new TaskInProgress[2];
?
??? // cleanup map tip. This map doesn't use any splits. Just assign an empty
    // split.
??? TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
??? cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
??????????? jobtracker, conf, this, numMapTasks, 1);
??? cleanup[0].setJobCleanupTask();
?
??? // cleanup reduce tip.
??? cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
?????????????????????? numReduceTasks, jobtracker, conf, this, 1);
??? cleanup[1].setJobCleanupTask();
?
??? // create two setup tips, one map and one reduce.
??? setup = new TaskInProgress[2];
?
??? // setup map tip. This map doesn't use any split. Just assign an empty
??? // split.
??? setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
??????????? jobtracker, conf, this, numMapTasks + 1, 1);
??? setup[0].setJobSetupTask();
?
??? // setup reduce tip.
??? setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
?????????????????????? numReduceTasks + 1, jobtracker, conf, this, 1);
??? setup[1].setJobSetupTask();
???
??? synchronized(jobInitKillStatus){
????? jobInitKillStatus.initDone = true;
????? if(jobInitKillStatus.killed) {
??????? throw new KillInterruptedException("Job " + jobId + " killed in init");
????? }
??? }
???
??? tasksInited = true;
??? JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
???????????????????????????????? numMapTasks, numReduceTasks);
???
?? // Log the number of map and reduce tasks
?? LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
??????????? + " map tasks and " + numReduceTasks + " reduce tasks.");
? }
From this code we can see that both the map tasks and the reduce tasks are TaskInProgress instances, and that the number of map tasks matches the number of splits.
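To make the slow-start calculation above concrete: assuming the 1.x default of 0.05 for mapred.reduce.slowstart.completed.maps, a job with 200 map tasks gets completedMapsForReduceSlowstart = ceil(0.05 * 200) = 10, so its reduce tasks only become schedulable once 10 maps have completed.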
?
?
Next comes the TaskTracker that runs alongside the DataNode.
In Hadoop, each TaskTracker corresponds to the class org.apache.hadoop.mapred.TaskTracker, which implements all of the TaskTracker's functionality. Each TaskTracker likewise runs as its own JVM; in the Hadoop scripts it is started by bin/hadoop-daemon.sh start tasktracker.
First, a look at the TaskTracker's main function:
? /**
?? * Start the TaskTracker, point toward the indicated JobTracker
?? */
? public static void main(String argv[]) throws Exception {
??? StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
??? if (argv.length != 0) {
????? System.out.println("usage: TaskTracker");
????? System.exit(-1);
??? }
??? try {
????? JobConf conf=new JobConf();
????? // enable the server to track time spent waiting on locks
      // (enables lock-contention tracing when tasktracker.contention.tracking is set)
????? ReflectionUtils.setContentionTracing
??????? (conf.getBoolean("tasktracker.contention.tracking", false));
????? DefaultMetricsSystem.initialize("TaskTracker");
?????? TaskTracker tt = new TaskTracker(conf);
????? MBeans.register("TaskTracker", "TaskTrackerInfo", tt);
      // the key call: start the TaskTracker's main service loop
????? tt.run();
??? } catch (Throwable e) {
????? LOG.error("Can not start task tracker because "+
??????????????? StringUtils.stringifyException(e));
????? System.exit(-1);
??? }
? }
Inside run(), the offerService() method is started:
??
? public void run() {
??? try {
????? getUserLogManager().start();
????? startCleanupThreads();
????? boolean denied = false;
????? while (running && !shuttingDown && !denied) {
??????? boolean staleState = false;
??????? try {
????????? // This while-loop attempts reconnects if we get network errors
????????? while (running && !staleState && !shuttingDown && !denied) {
??????????? try {
????????????? State osState = offerService();
????????????? if (osState == State.STALE) {
??????????????? staleState = true;
????????????? } else if (osState == State.DENIED) {
??????????????? denied = true;
????????????? }
??????????? } catch (Exception ex) {
????????????? if (!shuttingDown) {
??????????????? LOG.info("Lost connection to JobTracker [" +
???????????????????????? jobTrackAddr + "].? Retrying...", ex);
??????????????? try {
????????????????? Thread.sleep(5000);
??????????????? } catch (InterruptedException ie) {
??????????????? }
????????????? }
??????????? }
????????? }
??????? } finally {
????????? close();
??????? }
??????? if (shuttingDown) { return; }
??????? LOG.warn("Reinitializing local state");
??????? initialize();
????? }
????? if (denied) {
??????? shutdown();
????? }
??? } catch (IOException iex) {
????? LOG.error("Got fatal exception while reinitializing TaskTracker: " +
??????? ????????StringUtils.stringifyException(iex));
????? return;
??? }
??? catch (InterruptedException i) {
????? LOG.error("Got interrupted while reinitializing TaskTracker: " +
????????? i.getMessage());
????? return;
??? }
? }
?
Now look at offerService():
? State offerService() throws Exception {
    // time the last heartbeat was sent (initialized to now)
??? long lastHeartbeat = System.currentTimeMillis();
?
??? while (running && !shuttingDown) {
????? try {
??????? long now = System.currentTimeMillis();
???????
??????? // accelerate to account for multiple finished tasks up-front
??????? synchronized (finishedCount) {
????????? long remaining =
??????????? (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
????????? while (remaining > 0) {
??????????? // sleeps for the wait time or
??????????? // until there are *enough* empty slots to schedule tasks
??????????? finishedCount.wait(remaining);
??????????? // Recompute
??????????? now = System.currentTimeMillis();
??????????? remaining =
????????????? (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
????????? }
????????? // Reset count
????????? finishedCount.set(0);
??????? }
?
??????? // If the TaskTracker is just starting up:
??????? // 1. Verify the buildVersion
??????? // 2. Get the system directory & filesystem
??????? if(justInited) {
????????? String jobTrackerBV = jobClient.getBuildVersion();
????????? if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
??????????? String msg = "Shutting down. Incompatible buildVersion." +
?????? ?????"\nJobTracker's: " + jobTrackerBV +
??????????? "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
??????????? LOG.error(msg);
??????????? try {
????????????? jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
??????????? } catch(Exception e ) {
????????????? LOG.info("Problem reporting to jobtracker: " + e);
??????????? }
??????????? return State.DENIED;
????????? }
?????????
????????? String dir = jobClient.getSystemDir();
????????? if (dir == null) {
??????????? throw new IOException("Failed to get system directory");
????????? }
????????? systemDirectory = new Path(dir);
????????? systemFS = systemDirectory.getFileSystem(fConf);
??????? }
???????
??????? // Send the heartbeat and process the jobtracker's directives
??????? HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
?
??????? // Note the time when the heartbeat returned, use this to decide when to send the
??????? // next heartbeat??
??????? lastHeartbeat = System.currentTimeMillis();
???????
???????
??????? // Check if the map-event list needs purging
??????? Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
??????? if (jobs.size() > 0) {
????????? synchronized (this) {
??????????? // purge the local map events list
??????????? for (JobID job : jobs) {
?????? ???????RunningJob rjob;
????????????? synchronized (runningJobs) {
??????????????? rjob = runningJobs.get(job);?????????
??????????????? if (rjob != null) {
????????????????? synchronized (rjob) {
??????????????????? FetchStatus f = rjob.getFetchStatus();
??????????????????? if (f != null) {
????????????????????? f.reset();
??????????????????? }
????????????????? }
??????????????? }
????????????? }
??????????? }
?
??????????? // Mark the reducers in shuffle for rollback
??????????? synchronized (shouldReset) {
????????????? for (Map.Entry<TaskAttemptID, TaskInProgress> entry
?????????????????? : runningTasks.entrySet()) {
??????????????? if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
????????????????? this.shouldReset.add(entry.getKey());
??????????????? }
????????????? }
??????????? }
????????? }
??????? }
??????? ?
        // The heartbeatResponse that comes back carries the JobTracker's directives (actions):
??????? TaskTrackerAction[] actions = heartbeatResponse.getActions();
??????? if(LOG.isDebugEnabled()) {
????????? LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
??????????????????? heartbeatResponse.getResponseId() + " and " +
??????????????????? ((actions != null) ? actions.length : 0) + " actions");
??????? }
??????? if (reinitTaskTracker(actions)) {
????????? return State.STALE;
??????? }
???????????
??????? // resetting heartbeat interval from the response.
??????? heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
??????? justStarted = false;
??????? justInited = false;
        // now act on each of those actions
??????? if (actions != null){
????????? for(TaskTrackerAction action: actions) {
??????????? if (action instanceof LaunchTaskAction) {
              // add the new task to the launch queue
????????????? addToTaskQueue((LaunchTaskAction)action);
??????????? } else if (action instanceof CommitTaskAction) {
????????????? CommitTaskAction commitAction = (CommitTaskAction)action;
????????????? if (!commitResponses.contains(commitAction.getTaskID())) {
??????????????? LOG.info("Received commit task action for " +
????????????????????????? commitAction.getTaskID());
??????????????? commitResponses.add(commitAction.getTaskID());
????????????? }
??????????? } else {
????????????? tasksToCleanup.put(action);
??????????? }
????????? }
??????? }
??????? markUnresponsiveTasks();
??????? killOverflowingTasks();
???????????
??????? //we've cleaned up, resume normal operation
??????? if (!acceptNewTasks && isIdle()) {
????????? acceptNewTasks=true;
??????? }
??????? //The check below may not be required every iteration but we are
??????? //erring on the side of caution here. We have seen many cases where
??????? //the call to jetty's getLocalPort() returns different values at
??????? //different times. Being a real paranoid here.
??????? checkJettyPort(server.getPort());
????? } catch (InterruptedException ie) {
??????? LOG.info("Interrupted. Closing down.");
??????? return State.INTERRUPTED;
????? } catch (DiskErrorException de) {
??????? String msg = "Exiting task tracker for disk error:\n" +
????????? StringUtils.stringifyException(de);
??????? LOG.error(msg);
??????? synchronized (this) {
          // report the disk error to the JobTracker
????????? jobClient.reportTaskTrackerError(taskTrackerName,
?????????????????????????????????????????? "DiskErrorException", msg);
??????? }
??????? return State.STALE;
????? } catch (RemoteException re) {
??????? String reClass = re.getClassName();
??????? if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
????????? LOG.info("Tasktracker disallowed by JobTracker.");
????????? return State.DENIED;
??????? }
????? } catch (Exception except) {
??????? String msg = "Caught exception: " +
????????? StringUtils.stringifyException(except);
??????? LOG.error(msg);
????? }
??? }
?
??? return State.NORMAL;
? }
The TaskTracker sends a heartbeat to the JobTracker every three seconds. The heartbeat is implemented on top of Hadoop's RPC proxy mechanism, which is essentially remote method invocation; see other references for details of the RPC layer.
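To illustrate that proxy mechanism, here is a hedged toy example. PingProtocol and RpcProxySketch are made-up names standing in for InterTrackerProtocol (which is package-private), and the host and port are placeholders; RPC.waitForProxy is the real 1.x entry point the TaskTracker uses to obtain its proxy, and heartbeat() calls then travel over it the same way ping() would here.

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;

// A toy protocol, standing in for InterTrackerProtocol, to show the RPC proxy idea.
interface PingProtocol extends VersionedProtocol {
  long versionID = 1L;
  String ping(String msg);
}

public class RpcProxySketch {
  public static void main(String[] args) throws Exception {
    // The client only needs the interface; RPC.waitForProxy returns a dynamic proxy
    // whose method calls are shipped to the remote server, just as the TaskTracker
    // ships heartbeat() calls to the JobTracker.
    PingProtocol proxy = (PingProtocol) RPC.waitForProxy(
        PingProtocol.class, PingProtocol.versionID,
        new InetSocketAddress("jobtracker.example.com", 9001),  // placeholder address
        new Configuration());
    System.out.println(proxy.ping("hello"));
  }
}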
?
?
Next, the JobTracker receives the heartbeat and assigns tasks to the TaskTracker. When a heartbeat arrives, the JobTracker calls heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId), which returns a HeartbeatResponse object:
?
? ?
? public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
????????????????????????????????????????????????? boolean restarted,
????????????????????????????????????????????????? boolean initialContact,
??????????????????? ??????????????????????????????boolean acceptNewTasks,
????????????????????????????????????????????????? short responseId)
??? throws IOException {
??? if (LOG.isDebugEnabled()) {
????? LOG.debug("Got heartbeat from: " + status.getTrackerName() +
?????? ?????????" (restarted: " + restarted +
??????????????? " initialContact: " + initialContact +
??????????????? " acceptNewTasks: " + acceptNewTasks + ")" +
??????????????? " with responseId: " + responseId);
??? }
?
??? // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
??? if (!acceptTaskTracker(status)) {
????? throw new DisallowedTaskTrackerException(status);
??? }
?
??? // First check if the last heartbeat response got through
??? String trackerName = status.getTrackerName();
??? long now = clock.getTime();
??? if (restarted) {
????? faultyTrackers.markTrackerHealthy(status.getHost());
??? } else {
????? faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
??? }
???
??? HeartbeatResponse prevHeartbeatResponse =
????? trackerToHeartbeatResponseMap.get(trackerName);
??? boolean addRestartInfo = false;
?
??? if (initialContact != true) {
????? // If this isn't the 'initial contact' from the tasktracker,
????? // there is something seriously wrong if the JobTracker has
????? // no record of the 'previous heartbeat'; if so, ask the
????? // tasktracker to re-initialize itself.
????? if (prevHeartbeatResponse == null) {
??????? // This is the first heartbeat from the old tracker to the newly
??????? // started JobTracker
??????? if (hasRestarted()) {
????????? addRestartInfo = true;
????????? // inform the recovery manager about this tracker joining back
????????? recoveryManager.unMarkTracker(trackerName);
??????? } else {
????????? // Jobtracker might have restarted but no recovery is needed
????????? // otherwise this code should not be reached
????????? LOG.warn("Serious problem, cannot find record of 'previous' " +
?????????????????? "heartbeat for '" + trackerName +
?????????????????? "'; reinitializing the tasktracker");
? ????????return new HeartbeatResponse(responseId,
????????????? new TaskTrackerAction[] {new ReinitTrackerAction()});
??????? }
?
????? } else {
???????????????
??????? // It is completely safe to not process a 'duplicate' heartbeat from a
??????? // {@link TaskTracker} since it resends the heartbeat when rpcs are
??????? // lost see {@link TaskTracker.transmitHeartbeat()};
??????? // acknowledge it by re-sending the previous response to let the
??????? // {@link TaskTracker} go forward.
??????? if (prevHeartbeatResponse.getResponseId() != responseId) {
????????? LOG.info("Ignoring 'duplicate' heartbeat from '" +
????????????? trackerName + "'; resending the previous 'lost' response");
????????? return prevHeartbeatResponse;
??????? }
????? }
??? }
??? ??
??? // Process this heartbeat
??? short newResponseId = (short)(responseId + 1);
??? status.setLastSeen(now);
??? if (!processHeartbeat(status, initialContact, now)) {
????? if (prevHeartbeatResponse != null) {
??????? trackerToHeartbeatResponseMap.remove(trackerName);
????? }
????? return new HeartbeatResponse(newResponseId,
?????????????????? new TaskTrackerAction[] {new ReinitTrackerAction()});
??? }
?????
??? // Initialize the response to be sent for the heartbeat
??? HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
??? List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
??? boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
??? // Check for new tasks to be executed on the tasktracker
??? if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
????? TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
????? if (taskTrackerStatus == null) {
??????? LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
????? } else {
??????? List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
??????? if (tasks == null ) {
????????? tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
??????? }
??????? if (tasks != null) {
??????? ??for (Task task : tasks) {
??????????? expireLaunchingTasks.addNewTask(task.getTaskID());
??????????? if(LOG.isDebugEnabled()) {
????????????? LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
??????????? }
??????????? actions.add(new LaunchTaskAction(task));
????????? }
??????? }
????? }
??? }
?????
    // check for tasks to be killed
??? List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
??? if (killTasksList != null) {
????? actions.addAll(killTasksList);
??? }
????
??? // Check for jobs to be killed/cleanedup
??? List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
??? if (killJobsList != null) {
????? actions.addAll(killJobsList);
??? }
?
??? // Check for tasks whose outputs can be saved
??? List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
??? if (commitTasksList != null) {
????? actions.addAll(commitTasksList);
??? }
?
??? // calculate next heartbeat interval and put in heartbeat response
??? int nextInterval = getNextHeartbeatInterval();
??? response.setHeartbeatInterval(nextInterval);
??? response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));
???
??? // check if the restart info is req
??? if (addRestartInfo) {
????? response.setRecoveredJobs(recoveryManager.getJobsToRecover());
??? }
???????
??? // Update the trackerToHeartbeatResponseMap
??? trackerToHeartbeatResponseMap.put(trackerName, response);
?
??? // Done processing the hearbeat, now remove 'marked' tasks
??? removeMarkedTasks(trackerName);
???????
??? return response;
? }
?
After that the scheduler takes over. The default scheduler is JobQueueTaskScheduler, and its assignTasks method looks like this:
public synchronized List<Task> assignTasks(TaskTracker taskTracker)
????? throws IOException {
??? TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
?
    // get the cluster status
??? ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
??? final int numTaskTrackers = clusterStatus.getTaskTrackers();
??? final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
??? final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
?
? ??Collection<JobInProgress> jobQueue =
????? jobQueueJobInProgressListener.getJobQueue();
?
??? //
??? // Get map + reduce counts for the current tracker.
??? //
    // get this TaskTracker's capacity and current load
??? final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
??? final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
??? final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
??? final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();
?
    // the tasks assigned during this heartbeat
??? List<Task> assignedTasks = new ArrayList<Task>();
?
??? //
??? // Compute (running + pending) map and reduce task numbers across pool
??? //
??? int remainingReduceLoad = 0;
??? int remainingMapLoad = 0;
??? synchronized (jobQueue) {
????? for (JobInProgress job : jobQueue) {
??????? if (job.getStatus().getRunState() == JobStatus.RUNNING) {
????????? remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
????????? if (job.scheduleReduces()) {
??????????? remainingReduceLoad +=
????????????? (job.desiredReduces() - job.finishedReduces());
????????? }
??????? }
????? }
??? }
?
??? // Compute the 'load factor' for maps and reduces
??? double mapLoadFactor = 0.0;
??? if (clusterMapCapacity > 0) {
????? mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
??? }
? ??double reduceLoadFactor = 0.0;
??? if (clusterReduceCapacity > 0) {
????? reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
??? }
???????
??
??? final int trackerCurrentMapCapacity =
????? Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
????????????????????????????? trackerMapCapacity);
??? int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
??? boolean exceededMapPadding = false;
??? if (availableMapSlots > 0) {
????? exceededMapPadding =
????? ??exceededPadding(true, clusterStatus, trackerMapCapacity);
??? }
???
??? int numLocalMaps = 0;
??? int numNonLocalMaps = 0;
??? scheduleMaps:
??? for (int i=0; i < availableMapSlots; ++i) {
????? synchronized (jobQueue) {
??????? for (JobInProgress job : jobQueue) {
????????? if (job.getStatus().getRunState() != JobStatus.RUNNING) {
??????????? continue;
????????? }
?
????????? Task t = null;
?????????
????????? // Try to schedule a node-local or rack-local Map task
????????? t =
??????????? job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
????????????????????????????????????? taskTrackerManager.getNumberOfUniqueHosts());
????????? if (t != null) {
??????????? assignedTasks.add(t);
??????????? ++numLocalMaps;
???????????
??????????? // Don't assign map tasks to the hilt!
??????????? // Leave some free slots in the cluster for future task-failures,
??????????? // speculative tasks etc. beyond the highest priority job
??????????? if (exceededMapPadding) {
????????????? break scheduleMaps;
???? ???????}
??????????
??????????? // Try all jobs again for the next Map task
??????????? break;
????????? }
?????????
????????? // Try to schedule a node-local or rack-local Map task
????????? t =
??????????? job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
?????????????????????????????????? taskTrackerManager.getNumberOfUniqueHosts());
?????????
????????? if (t != null) {
??????????? assignedTasks.add(t);
??????????? ++numNonLocalMaps;
???????????
??????????? // We assign at most 1 off-switch or speculative task
??????????? // This is to prevent TaskTrackers from stealing local-tasks
??????????? // from other TaskTrackers.
??????????? break scheduleMaps;
????????? }
??????? }
????? }
??? }
??? int assignedMaps = assignedTasks.size();
?
??? //
??? // Same thing, but for reduce tasks
??? // However we _never_ assign more than 1 reduce task per heartbeat
??? //
??? final int trackerCurrentReduceCapacity =
????? Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
??????????? ???trackerReduceCapacity);
??? final int availableReduceSlots =
????? Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
??? boolean exceededReducePadding = false;
??? if (availableReduceSlots > 0) {
????? exceededReducePadding = exceededPadding(false, clusterStatus,
????????????????????????????????????????????? trackerReduceCapacity);
????? synchronized (jobQueue) {
??????? for (JobInProgress job : jobQueue) {
????????? if (job.getStatus().getRunState() != JobStatus.RUNNING ||
???????? ?????job.numReduceTasks == 0) {
??????????? continue;
????????? }
?
????????? Task t =
??????????? job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
??????????????????????????????????? taskTrackerManager.getNumberOfUniqueHosts()
??????????????? ????????????????????);
????????? if (t != null) {
??????????? assignedTasks.add(t);
??????????? break;
????????? }
?????????
????????? // Don't assign reduce tasks to the hilt!
????????? // Leave some free slots in the cluster for future task-failures,
? ????????// speculative tasks etc. beyond the highest priority job
????????? if (exceededReducePadding) {
??????????? break;
????????? }
??????? }
????? }
??? }
???
??? if (LOG.isDebugEnabled()) {
????? LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
??????????????? "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
??????????????? trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
??????????????? (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
??????????????? assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
??????????????? ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
??????????????? trackerCurrentReduceCapacity + "," + trackerRunningReduces +
????????? ??????"] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
??????????????? ", " + (assignedTasks.size()-assignedMaps) + "]");
??? }
?
??? return assignedTasks;
? }
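To make the capacity arithmetic above concrete, here is a made-up example: suppose the cluster has 10 TaskTrackers with 2 map slots each (clusterMapCapacity = 20), the running jobs still need 15 map tasks (remainingMapLoad = 15), and the tracker sending the heartbeat has trackerMapCapacity = 2 with 1 map already running. Then mapLoadFactor = 15 / 20 = 0.75, trackerCurrentMapCapacity = min(ceil(0.75 * 2), 2) = 2, and availableMapSlots = 2 - 1 = 1, so at most one new map task can be handed to this tracker in this heartbeat.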
When the JobTracker's response to the heartbeat contains LaunchTaskActions for newly assigned tasks, the TaskTracker calls addToTaskQueue and puts each of them into the taskToLaunch queue of either its MapLauncher or its ReduceLauncher. Both launchers are instances of TaskLauncher, an inner class of TaskTracker whose main data member is a queue of TaskTracker.TaskInProgress. If the task contained in the response is a map task it goes into mapLauncher's queue; if it is a reduce task it goes into reduceLauncher's taskToLaunch queue:
private void addToTaskQueue(LaunchTaskAction action) {
??? if (action.getTask().isMapTask()) {
????? mapLauncher.addToTaskQueue(action);
??? } else {
????? reduceLauncher.addToTaskQueue(action);
??? }
? }
?
Registering the task:
private TaskInProgress registerTask(LaunchTaskAction action,
????? TaskLauncher launcher) {
??? Task t = action.getTask();
??? LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
???????????? " task's state:" + t.getState());
??? TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
??? synchronized (this) {
????? tasks.put(t.getTaskID(), tip);
????? runningTasks.put(t.getTaskID(), tip);
????? boolean isMap = t.isMapTask();
????? if (isMap) {
??????? mapTotal++;
????? } else {
??????? reduceTotal++;
? ????}
??? }
??? return tip;
? }
?
?
?private Path localizeJobConfFile(Path jobFile, String user,
????? FileSystem userFs, JobID jobId)
? throws IOException {
??? // Get sizes of JobFile and JarFile
??? // sizes are -1 if they are not present.
??? FileStatus status = null;
??? long jobFileSize = -1;
??? try {
????? status = userFs.getFileStatus(jobFile);
????? jobFileSize = status.getLen();
??? } catch(FileNotFoundException fe) {
????? jobFileSize = -1;
??? }
??? Path localJobFile =
????? lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
????????? jobId.toString()), jobFileSize, fConf);
?
??? // Download job.xml
??? userFs.copyToLocalFile(jobFile, localJobFile);
??? return localJobFile;
? }
This method is part of a series of localization steps that copy the jar, the split information, and the xml file to the local machine. Once all the resources a task needs have been copied locally, the TaskTracker calls its launchTaskForJob method, which in turn calls the launchTask function of TaskTracker.TaskInProgress:
/**
???? * Kick off the task execution
???? */
??? public synchronized void launchTask(RunningJob rjob) throws IOException {
????? if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
????????? this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
????????? this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
??????? localizeTask(task);
??????? if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
????????? this.taskStatus.setRunState(TaskStatus.State.RUNNING);
??????? }
??????? setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
??????? this.runner.start();
??????? long now = System.currentTimeMillis();
??????? this.taskStatus.setStartTime(now);
??????? this.lastProgressReport = now;
????? } else {
??????? LOG.info("Not launching task: " + task.getTaskID() +
??????????? " since it's state is " + this.taskStatus.getRunState());
????? }
}
After that a TaskRunner object is created and the task is run:
public final void run() {
??? String errorInfo = "Child Error";
??? try {
?????
????? //before preparing the job localize
????? //all the archives
????? TaskAttemptID taskid = t.getTaskID();
????? final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
????? //simply get the location of the workDir and pass it to the child. The
????? //child will do the actual dir creation
????? final File workDir =
????? new File(new Path(localdirs[rand.nextInt(localdirs.length)],
???? ?????TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
????????? taskid.toString(),
????????? t.isTaskCleanupTask())).toString());
?????
????? String user = tip.getUGI().getUserName();
?????
???
?
????? if (!prepare()) {
??????? return;
????? }
?????
????? // Accumulates class paths for child.
????? List<String> classPaths = getClassPaths(conf, workDir,
????????????????????????????????????????????? taskDistributedCacheManager);
?
????? long logSize = TaskLog.getTaskLogLength(conf);
? ????
????? //? Build exec child JVM args.
????? Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
?????
????? tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
?
????? // set memory limit using ulimit if feasible and necessary ...
????? String setup = getVMSetupCmd();
????? // Set up the redirection of the task's stdout and stderr streams
????? File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
????? File stdout = logFiles[0];
????? File stderr = logFiles[1];
????? tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
???????????????? stderr);
?????
????? Map<String, String> env = new HashMap<String, String>();
????? errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
?????????????????????????????????? logSize);
?????
????? // flatten the env as a set of export commands
????? List <String> setupCmds = new ArrayList<String>();
????? for(Entry<String, String> entry : env.entrySet()) {
??????? StringBuffer sb = new StringBuffer();
??????? sb.append("export ");
??????? sb.append(entry.getKey());
        sb.append("=\"");
        sb.append(entry.getValue());
        sb.append("\"");
??????? setupCmds.add(sb.toString());
????? }
????? setupCmds.add(setup);
?????
????? launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
????? tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
????? if (exitCodeSet) {
??????? if (!killed && exitCode != 0) {
????????? if (exitCode == 65) {
??????????? tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
????????? }
????????? throw new IOException("Task process exit with nonzero status of " +
????????????? exitCode + ".");
??????? }
????? }
??? } catch (FSError e) {
????? LOG.fatal("FSError", e);
????? try {
??????? tracker.fsErrorInternal(t.getTaskID(), e.getMessage());
????? } catch (IOException ie) {
??????? LOG.fatal(t.getTaskID()+" reporting FSError", ie);
????? }
??? } catch (Throwable throwable) {
????? LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
????? Throwable causeThrowable = new Throwable(errorInfo, throwable);
????? ByteArrayOutputStream baos = new ByteArrayOutputStream();
????? causeThrowable.printStackTrace(new PrintStream(baos));
????? try {
??????? tracker.reportDiagnosticInfoInternal(t.getTaskID(), baos.toString());
????? } catch (IOException e) {
??????? LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
????? }
??? } finally {
?????
????? // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
????? // *false* since the task has either
????? // a) SUCCEEDED - which means commit has been done
????? // b) FAILED - which means we do not need to commit
????? tip.reportTaskFinished(false);
??? }
  }