
Hadoop Study Notes: An Analysis of the Map-Reduce Process

2012-08-30 

1. Client

A Map-Reduce job starts when the client submits it.

Submission is handled mainly by the static function JobClient.runJob(JobConf):

public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ...
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ...
  while (true) {
    // Inside the loop, keep polling the job's status and print it to the client console
  }
  return running;
}

The submitJob function of JobClient is implemented as follows:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Get an id for this job from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare to write everything the job needs into HDFS:
  //   the jar containing the job's program, packaged as job.jar
  //   the input split information the job will process, written to job.split
  //   the job's configuration settings, collected into job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars given with the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ...
  // Get the input splits from the input format; the default type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Create an output stream and write the input split information to job.split
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: the split file header, the split file version number,
    // and the number of splits, followed by the information for each input split.
    // For each input split it records: the split class name (FileSplit by default),
    // the split size, the split contents (for a FileSplit, the file name and the
    // split's start offset within the file), and the split's location information
    // (that is, which DataNode it is on).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // Set the number of map tasks to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job's configuration to job.xml
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Actually submit the job by calling the JobTracker
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ...
}
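To make the job.split layout described in the comments above more concrete, here is a minimal sketch that writes and reads back a file with the same kinds of fields: a header, a version number, the split count, and then for each split its class name, size, file name, start offset, and locations. The classes SplitMeta and SplitFileSketch, the header string, and the exact byte layout are illustrative assumptions, not Hadoop's actual on-disk format.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one input split's metadata (not a Hadoop class).
class SplitMeta {
    final String className; final long length; final String file;
    final long start; final String[] hosts;
    SplitMeta(String className, long length, String file, long start, String[] hosts) {
        this.className = className; this.length = length; this.file = file;
        this.start = start; this.hosts = hosts;
    }
}

class SplitFileSketch {
    static final String HEADER = "SPL"; // assumed header value
    static final int VERSION = 1;       // assumed version number

    // Serializes header, version, split count, then each split's fields in order.
    static byte[] write(List<SplitMeta> splits) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(HEADER);
            out.writeInt(VERSION);
            out.writeInt(splits.size());
            for (SplitMeta s : splits) {
                out.writeUTF(s.className);
                out.writeLong(s.length);
                out.writeUTF(s.file);
                out.writeLong(s.start);
                out.writeInt(s.hosts.length);
                for (String h : s.hosts) out.writeUTF(h);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static List<SplitMeta> read(byte[] data) {
        List<SplitMeta> splits = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (!HEADER.equals(in.readUTF()) || in.readInt() != VERSION) {
                throw new IOException("unrecognized split file");
            }
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String cls = in.readUTF();
                long len = in.readLong();
                String file = in.readUTF();
                long start = in.readLong();
                String[] hosts = new String[in.readInt()];
                for (int j = 0; j < hosts.length; j++) hosts[j] = in.readUTF();
                splits.add(new SplitMeta(cls, len, file, start, hosts));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return splits;
    }
}
```

The point of the sketch is only that the reader must consume fields in the order the writer produced them, which is why the file carries a header and a version number up front.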

2. JobTracker

The JobTracker runs as a separate JVM. Its main function consists mainly of the following two calls:

    the static function startTracker(new JobConf()), which creates a JobTracker object
    JobTracker.offerService(), which starts providing service

The JobTracker constructor creates a taskScheduler member variable to schedule jobs. The default is JobQueueTaskScheduler, which schedules jobs in FIFO order.

offerService calls taskScheduler.start(), which registers two listeners with the JobTracker (that is, with the taskScheduler's taskTrackerManager):

    JobQueueJobInProgressListener jobQueueJobInProgressListener, which monitors the running state of jobs
    EagerTaskInitializationListener eagerTaskInitializationListener, which initializes jobs

EagerTaskInitializationListener has a thread, JobInitThread, that keeps taking JobInProgress objects from jobInitQueue and calls each object's initTasks function to initialize its tasks.

In the previous section the client called JobTracker.submitJob. That function first creates a JobInProgress object and then calls addJob, which contains the following logic:

synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on every listener registered with the JobTracker
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}
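The hand-off between a listener's jobAdded call and a background initialization thread can be sketched with a blocking queue. This is a simplified illustration under stated assumptions: SketchJob and EagerInitSketch are hypothetical stand-ins for JobInProgress and EagerTaskInitializationListener, and the real classes do considerably more.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-in for JobInProgress.
class SketchJob {
    final String id;
    volatile boolean tasksInited = false;
    SketchJob(String id) { this.id = id; }
    void initTasks() { tasksInited = true; } // the real initTasks builds the task lists
}

// Hypothetical, simplified stand-in for EagerTaskInitializationListener.
class EagerInitSketch {
    private final BlockingQueue<SketchJob> jobInitQueue = new LinkedBlockingQueue<>();

    EagerInitSketch() {
        Thread initThread = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until a job has been queued
                    jobInitQueue.take().initTasks();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        initThread.setDaemon(true);
        initThread.start();
    }

    // Invoked for each newly added job; only enqueues, so the caller never blocks on init
    void jobAdded(SketchJob job) {
        jobInitQueue.add(job);
    }
}
```

Keeping jobAdded cheap matters because, in the code above, listeners are called while locks on jobs and taskScheduler are held.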

The jobAdded function of EagerTaskInitializationListener simply adds the JobInProgress object to jobInitQueue, which triggers initialization of the job. The work is done by the initTasks function of JobInProgress:

public synchronized void initTasks() throws IOException {
  ...
  // Read job.split from HDFS to reconstruct the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task, each handling one input split
  maps = new TaskInProgress[numMapTasks];
  for(int i=0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Map tasks are placed in nonRunningMapCache, a Map<Node, List<TaskInProgress>>,
  // so that a map task is assigned, where possible, to a Node that holds its input split.
  // nonRunningMapCache is used when the JobTracker assigns map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Reduce tasks are placed in nonRunningReduces, which is used when the
    // JobTracker assigns reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks, one to clean up the maps and one to clean up the reduces.
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one to set up the maps and one to set up the reduces.
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks + 1 );
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true); // Initialization is complete
  ...
}
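The idea behind nonRunningMapCache, a map from a node to the map tasks whose input is stored there, can be sketched as follows. This is a minimal illustration with assumed names: LocalityCacheSketch is hypothetical, hosts are plain strings, tasks are identified by split index, and the maxLevel parameter of the real createCache (which is passed in the code above) is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class LocalityCacheSketch {
    // Maps each host to the indices of the splits stored on it.
    static Map<String, List<Integer>> createCache(String[][] splitLocations) {
        Map<String, List<Integer>> cache = new HashMap<>();
        for (int i = 0; i < splitLocations.length; i++) {
            for (String host : splitLocations[i]) {
                cache.computeIfAbsent(host, h -> new ArrayList<>()).add(i);
            }
        }
        return cache;
    }

    // Returns a split local to the host that is not yet running, or -1 if none exists.
    static int findLocalSplit(Map<String, List<Integer>> cache, String host,
                              Set<Integer> running) {
        for (int split : cache.getOrDefault(host, Collections.emptyList())) {
            if (!running.contains(split)) {
                return split;
            }
        }
        return -1;
    }
}
```

A split replicated on several hosts appears under each of them, so whichever of those hosts asks for work first can run it locally.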

3. TaskTracker

The TaskTracker also runs as a separate JVM. Its main function mainly calls new TaskTracker(conf).run(), and run in turn mainly calls:

State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker process runs continuously
  while (running && !shuttingDown) {
    ...
    long now = System.currentTimeMillis();
    // Send a heartbeat to the JobTracker at regular intervals
    long waitTime = heartbeatInterval - (now - lastHeartbeat);
    if (waitTime > 0) {
      synchronized(finishedCount) {
        if (finishedCount[0] == 0) {
          finishedCount.wait(waitTime);
        }
        finishedCount[0] = 0;
      }
    }
    ...
    // Send the heartbeat to the JobTracker and get the response
    HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
    ...
    // Extract from the response what this TaskTracker needs to do
    TaskTrackerAction[] actions = heartbeatResponse.getActions();
    ...
    if (actions != null){
      for(TaskTrackerAction action: actions) {
        if (action instanceof LaunchTaskAction) {
          // To run a new task, add the action to the task queue
          addToTaskQueue((LaunchTaskAction)action);
        } else if (action instanceof CommitTaskAction) {
          CommitTaskAction commitAction = (CommitTaskAction)action;
          if (!commitResponses.contains(commitAction.getTaskID())) {
            commitResponses.add(commitAction.getTaskID());
          }
        } else {
          tasksToCleanup.put(action);
        }
      }
    }
  }
  return State.NORMAL;
}

The main logic of transmitHeartBeat is as follows:

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // At regular intervals, the heartbeat also returns some statistics to the JobTracker
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  }
  else {
    sendCounters = false;
  }
  ...
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ...
  // The TaskTracker asks the JobTracker for a new task when either condition holds:
  //   the number of map tasks it is running is below its maximum number of map tasks
  //   the number of reduce tasks it is running is below its maximum number of reduce tasks
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ...
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  ...
  return heartbeatResponse;
}
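Two small calculations drive the heartbeat loop shown in this section: how long to wait before the next heartbeat, and whether to ask for a new task. The sketch below restates both as pure functions so they can be tried with concrete numbers. The class HeartbeatSketch and its parameter names are illustrative, not part of Hadoop.

```java
class HeartbeatSketch {
    // Milliseconds left before the next heartbeat is due; a value <= 0 means "send now".
    static long waitTime(long heartbeatInterval, long now, long lastHeartbeat) {
        return heartbeatInterval - (now - lastHeartbeat);
    }

    // Ask for work when at least one map or reduce slot is free
    // and the tracker is currently accepting new tasks.
    static boolean askForNewTask(int runningMaps, int maxMaps,
                                 int runningReduces, int maxReduces,
                                 boolean acceptNewTasks) {
        return (runningMaps < maxMaps || runningReduces < maxReduces) && acceptNewTasks;
    }
}
```

For example, with a 3000 ms interval and a last heartbeat 1200 ms ago, waitTime gives 1800 ms. A tracker with both map slots busy but one reduce slot free still asks for a task, because the two slot checks are joined with "or".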

4. JobTracker

When a TaskTracker sends a heartbeat to the JobTracker over RPC, the JobTracker's heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) function is invoked:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
  throws IOException {
  ...
  String trackerName = status.getTrackerName();
  ...
  short newResponseId = (short)(responseId + 1);
  ...
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker is asking the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null ) {
        // Let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Put the task in the actions list, which is returned to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ...
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
                      actions.toArray(new TaskTrackerAction[actions.size()]));
  ...
  return response;
}

The default task scheduler is JobQueueTaskScheduler. Its assignTasks function is as follows:

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce workload: remaining
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute the average workload each TaskTracker should carry:
  // remaining/numTaskTrackers is the remaining workload divided by the number of TaskTrackers.
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  ...

  // Maps take priority over reduces: if the number of map tasks running on the
  // TaskTracker is below the average workload, assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ...
      }
    }
  }
  // After map tasks have been considered, assign reduce tasks
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ...
      }
    }
  }
  return null;
}

As the code above shows, JobInProgress's obtainNewMapTask assigns map tasks. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache according to the Node on which the TaskTracker runs. JobInProgress's obtainNewReduceTask assigns reduce tasks. It mainly calls findNewReduceTask, which looks up a TaskInProgress in nonRunningReduces.
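The per-tracker cap computed in assignTasks is easier to follow with numbers. The following sketch isolates that formula. The class LoadSketch and the method name maxLoad are illustrative names chosen here, not Hadoop identifiers.

```java
class LoadSketch {
    // Cap for one tracker: the smaller of its slot count and its
    // share of the remaining work, rounded up.
    static int maxLoad(int maxSlots, int remainingLoad, int numTaskTrackers) {
        if (numTaskTrackers <= 0) {
            return 0; // mirrors the guard in assignTasks: no trackers, no load
        }
        return Math.min(maxSlots,
                        (int) Math.ceil((double) remainingLoad / numTaskTrackers));
    }
}
```

With 10 remaining map tasks, 4 trackers, and 2 map slots per tracker, the share is ceil(10 / 4) = 3, so the cap is min(2, 3) = 2 and the slot count is the limit. With only 3 remaining tasks the share is ceil(3 / 4) = 1, so a tracker that already runs one map task receives no more, which spreads the tail of a job across the cluster.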

5. TaskTracker

After the heartbeat has been sent to the JobTracker, the returned response contains the assigned tasks as LaunchTaskAction objects. Each is queued by calling addToTaskQueue: a map task goes into mapLauncher (of type TaskLauncher) and a reduce task goes into reduceLauncher (also of type TaskLauncher):

private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}

TaskLauncher is a thread. Its run function takes a TaskInProgress from the queue filled above and calls startNewTask(TaskInProgress tip) to start a task, which in turn mainly calls localizeJob(TaskInProgress tip):

private void localizeJob(TaskInProgress tip) throws IOException {
  // The first step is to copy the task's files from HDFS to the TaskTracker's
  // local file system: job.split, job.xml, and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ...
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
                                  getLocalJobDir(jobId.toString())
                                  + Path.SEPARATOR + "job.xml",
                                  jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ...
      // Copy the job file to the local file system
      // (the local target path ends in job.xml)
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
                       (getLocalJobDir(jobId.toString())
                       + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                    + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                                   getLocalJobDir(jobId.toString())
                                   + Path.SEPARATOR + "jars",
                                   5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration out as job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}

Once all the resources the task needs have been copied to the local file system, launchTaskForJob is called, which in turn calls the launchTask function of TaskInProgress:

public synchronized void launchTask() throws IOException {
    ...
    // Create the directory the task runs in
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
    // a ReduceTaskRunner for a ReduceTask
    this.runner = task.createRunner(TaskTracker.this, this);
    this.runner.start();
    this.taskStatus.setStartTime(System.currentTimeMillis());
}

TaskRunner is a thread. Its run function is as follows:

public final void run() {
    ...
    TaskAttemptID taskid = t.getTaskID();
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    File jobCacheDir = null;
    if (conf.getJar() != null) {
      jobCacheDir = new File(
                        new Path(conf.getJar()).getParent().toString());
    }
    File workDir = new File(lDirAlloc.getLocalPathToRead(
                              TaskTracker.getLocalTaskDir(
                                t.getJobID().toString(),
                                t.getTaskID().toString(),
                                t.isTaskCleanupTask())
                              + Path.SEPARATOR + MRConstants.WORKDIR,
                              conf). toString());
    FileSystem fileSystem;
    Path localPath;
    ...
    // Build the classpath
    String baseDir;
    String sep = System.getProperty("path.separator");
    StringBuffer classPath = new StringBuffer();
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(sep);
    if (!workDir.mkdirs()) {
      if (!workDir.isDirectory()) {
        LOG.fatal("Mkdirs failed to create " + workDir.toString());
      }
    }
    String jar = conf.getJar();
    if (jar != null) {
      // if jar exists, it into workDir
      File[] libs = new File(jobCacheDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          classPath.append(sep);            // add libs from jar to classpath
          classPath.append(libs[i]);
        }
      }
      classPath.append(sep);
      classPath.append(new File(jobCacheDir, "classes"));
      classPath.append(sep);
      classPath.append(jobCacheDir);
    }
    ...
    classPath.append(sep);
    classPath.append(workDir);
    // Build the java command line and its arguments
    Vector<String> vargs = new Vector<String>(8);
    File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
    vargs.add(jvm.toString());
    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
    String [] javaOptsSplit = javaOpts.split(" ");
    String libraryPath = System.getProperty("java.library.path");
    if (libraryPath == null) {
      libraryPath = workDir.getAbsolutePath();
    } else {
      libraryPath += sep + workDir;
    }
    boolean hasUserLDPath = false;
    for(int i=0; i<javaOptsSplit.length ;i++) {
      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
        javaOptsSplit[i] += sep + libraryPath;
        hasUserLDPath = true;
        break;
      }
    }
    if(!hasUserLDPath) {
      vargs.add("-Djava.library.path=" + libraryPath);
    }
    for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit[i]);
    }
    // Add the temporary directory for the Child process
    String tmp = conf.get("mapred.child.tmp", "./tmp");
    Path tmpDir = new Path(tmp);
    if (!tmpDir.isAbsolute()) {
      tmpDir = new Path(workDir.toString(), tmp);
    }
    FileSystem localFs = FileSystem.getLocal(conf);
    if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
      throw new IOException("Mkdirs failed to create " + tmpDir.toString());
    }
    vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
    // Add classpath.
    vargs.add("-classpath");
    vargs.add(classPath.toString());
    // Log directory
    long logSize = TaskLog.getTaskLogLength(conf);
    vargs.add("-Dhadoop.log.dir=" +
        new File(System.getProperty("hadoop.log.dir")
        ).getAbsolutePath());
    vargs.add("-Dhadoop.root.logger=INFO,TLA");
    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
    // The main class of the child process that runs map and reduce tasks is Child
    vargs.add(Child.class.getName());  // main of Child
    ...
    // Launch the child process
    jvmManager.launchJvm(this,
        jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
            workDir, env, pidFile, conf));
}
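The way run() assembles the child JVM's command line, and in particular how it merges the task's library path with a -Djava.library.path option the user may already have supplied, can be sketched in isolation. ChildArgsSketch and buildArgs are hypothetical names, and the sketch leaves out the temporary directory and logging options.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChildArgsSketch {
    static List<String> buildArgs(String javaBin, String javaOpts, String libraryPath,
                                  String classPath, String mainClass, String sep) {
        List<String> vargs = new ArrayList<>();
        vargs.add(javaBin);
        String[] opts = javaOpts.split(" ");
        boolean hasUserLDPath = false;
        for (int i = 0; i < opts.length; i++) {
            if (opts[i].startsWith("-Djava.library.path=")) {
                // The user set a library path: append ours to it
                opts[i] += sep + libraryPath;
                hasUserLDPath = true;
                break;
            }
        }
        if (!hasUserLDPath) {
            // No user-supplied path: add the option ourselves
            vargs.add("-Djava.library.path=" + libraryPath);
        }
        vargs.addAll(Arrays.asList(opts));
        vargs.add("-classpath");
        vargs.add(classPath);
        vargs.add(mainClass); // the main class always comes last
        return vargs;
    }
}
```

The resulting list could be handed to something like java.lang.ProcessBuilder; in the article's code it goes to jvmManager.launchJvm instead.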

6. Child

The actual map tasks and reduce tasks run inside the Child process. The main logic of Child's main function is as follows:

while (true) {
  // Get a JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ...
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ...
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}
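The exit condition at the end of this loop decides how many tasks one child JVM runs before it terminates. The sketch below reproduces just that condition. JvmReuseSketch is a hypothetical class, and availableTasks stands in for the tasks that umbilical.getTask would hand out.

```java
class JvmReuseSketch {
    // Returns how many tasks a single child JVM runs before leaving the loop.
    static int runTasks(int availableTasks, int numTasksToExecute) {
        int numTasksExecuted = 0;
        int ran = 0;
        for (int i = 0; i < availableTasks; i++) {
            ran++; // task.run(job, umbilical) would happen here
            // Same test as in Child: only a positive limit can end the loop early
            if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
                break;
            }
        }
        return ran;
    }
}
```

Because of the numTasksToExecute > 0 guard, a non-positive limit never triggers the break, so in this sketch the JVM keeps going until no tasks remain.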

6.1 MapTask

If the task is a MapTask, its run function is as follows:

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  // Used to communicate with the TaskTracker and report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ...
  // Output of the map task
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Read the input split and, based on its information, create a RecordReader to read the data
  instantiatedSplit = (InputSplit)
      ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner's run function reads the records from the RecordReader one by one
    // and calls the Mapper's map function on each of them.
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                               // close input
    collector.close();
  }
  done(umbilical);
}

MapRunner's run function reads the records from the RecordReader one by one and calls the Mapper's map function on each of them:

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if(incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
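The read-then-map loop above can be tried without Hadoop using a small stand-in. In this sketch, MapRunnerSketch and SimpleMapper are hypothetical simplifications: records are (offset, line) pairs taken from a list, the Reporter is omitted, and the output collector is a BiConsumer that sums counts in memory purely for demonstration (a real collector buffers the pairs instead).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

class MapRunnerSketch {
    // Hypothetical simplified mapper interface (Hadoop's map also takes a Reporter).
    interface SimpleMapper<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> output);
    }

    // Feeds every (offset, line) record to the mapper, one at a time.
    static <K2, V2> void run(List<String> lines,
                             SimpleMapper<Long, String, K2, V2> mapper,
                             BiConsumer<K2, V2> output) {
        long offset = 0;
        for (String line : lines) {
            mapper.map(offset, line, output);
            offset += line.length() + 1; // assumes a one-character line terminator
        }
    }

    // Word count: the mapper emits (word, 1); the demo collector adds the ones up.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        SimpleMapper<Long, String, String, Integer> mapper = (key, value, out) -> {
            for (String w : value.split("\\s+")) {
                if (!w.isEmpty()) {
                    out.accept(w, 1);
                }
            }
        };
        run(lines, mapper, (word, one) -> counts.merge(word, one, Integer::sum));
        return counts;
    }
}
```

The runner knows nothing about what the mapper does; it only drives the iteration, which is why the map logic can be swapped without touching the loop.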

All results are collected in MapOutputBuffer, whose collect function is as follows:

public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ...
  // As this line shows, the buffer is a ring data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // In the ring, if the next free slot meets the start position, the buffer is full
      kvfull = kvnext == kvstart;
      // Check whether the ring has reached the threshold at which the buffer must be written to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // If the threshold is reached, start writing the buffer to disk as a spill file.
      // startSpill mainly notifies the run() function of the background thread SpillThread,
      // which calls sortAndSpill() to sort, combine, and write to disk
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is full, all we can do is wait for the spill to finish
      if (kvfull) {
          while (kvstart != kvend) {
            reporter.progress();
            spillDone.await();
          }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // If the buffer is not full, write the key and value into it
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Call the configured partitioner to get the partition id from the key and value
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Write the partition id and the offsets of the key and value in the buffer into the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
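The ring-index arithmetic in collect can be explored with a single-threaded sketch. RingIndexSketch is a hypothetical class, and two assumptions are made that the excerpt above does not show: that starting a spill sets kvend to kvindex, and that finishing a spill sets kvstart to kvend. Here the spill completes immediately instead of running in a separate SpillThread.

```java
class RingIndexSketch {
    final int capacity;        // plays the role of kvoffsets.length
    final int softRecordLimit; // number of records at which a spill should start
    int kvstart = 0, kvend = 0, kvindex = 0;

    RingIndexSketch(int capacity, int softRecordLimit) {
        this.capacity = capacity;
        this.softRecordLimit = softRecordLimit;
    }

    // Index of the next slot, wrapping around at the end of the array.
    int next() {
        return (kvindex + 1) % capacity;
    }

    // Full when advancing would run into the oldest record not yet spilled.
    boolean isFull() {
        return next() == kvstart;
    }

    // Same expression as kvsoftlimit in collect().
    boolean overSoftLimit() {
        int kvnext = next();
        return (kvnext > kvend)
            ? kvnext - kvend > softRecordLimit
            : kvend - kvnext <= capacity - softRecordLimit;
    }

    // Records one entry; returns true if this call started a spill.
    boolean add() {
        boolean spill = (kvstart == kvend) && overSoftLimit();
        if (spill) {
            kvend = kvindex;  // assumption: the spill covers [kvstart, kvend)
            kvstart = kvend;  // assumption: the spill finishes at once in this sketch
        }
        kvindex = next();
        return spill;
    }
}
```

With a capacity of 10 and a soft limit of 8, the first eight calls to add() return false and the ninth starts a spill, leaving kvstart at 8 and kvindex at 9. Starting the spill before the ring is completely full is what lets collection continue while a background thread writes to disk.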

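The in-memory buffer is laid out as follows:

(See the analyses by several Hadoop experts at http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.javaeye.com/)

[Figure: layout of the in-memory buffer; image not preserved]

kvoffsets is used to sort the records before they are written to disk.

As shown above, the function that writes the in-memory buffer to a spill file on disk is sortAndSpill: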

      ?

      ?

      private void sortAndSpill() throws IOException {
        ……
        FSDataOutputStream out = null;
        FSDataOutputStream indexOut = null;
        IFileOutputStream indexChecksumOut = null;
        // Create the spill file on disk
        Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                          numSpills, size);
        out = rfs.create(filename);
        ……
        final int endPosition = (kvend > kvstart)
          ? kvend
          : kvoffsets.length + kvend;
        // Sort the records in the buffer by partition (and by key within each partition)
        sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
        int spindex = kvstart;
        InMemValBytes value = new InMemValBytes();
        // Write the partitions to the file one after another
        for (int i = 0; i < partitions; ++i) {
          IFile.Writer<K, V> writer = null;
          long segmentStart = out.getPos();
          writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
          // If no combiner is configured, write the records directly to the file
          if (null == combinerClass) {
            ……
            writer.append(key, value);
            ++spindex;
          }
          else {
            ……
            // If a combiner is configured, combine first: combiner.reduce(…) is called
            // and its output is then written to the file
            combineAndSpill(kvIter, combineInputCounter);
          }
        }
        ……
      }
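The spill loop above can be sketched in a self-contained form. This is a simplified illustration under stated assumptions, not the Hadoop code: SpillSketch, Rec and IndexEntry are hypothetical names, values are plain ints, the combiner is modeled as a BinaryOperator, and the "file" is an in-memory byte stream. It shows the two ideas of the listing: records that are already sorted by partition and key are written one partition at a time, and the start offset and length of every partition segment are remembered so that a later merge can locate it.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Simplified sketch of writing one spill; this is not the Hadoop code.
class SpillSketch {
    // Hypothetical record type: partition, key, integer value.
    static final class Rec {
        final int partition; final String key; final int value;
        Rec(int partition, String key, int value) {
            this.partition = partition; this.key = key; this.value = value;
        }
    }

    // Hypothetical stand-in for the per-partition index information.
    static final class IndexEntry {
        final long startOffset; final long partLength;
        IndexEntry(long startOffset, long partLength) {
            this.startOffset = startOffset; this.partLength = partLength;
        }
    }

    // 'sorted' must already be ordered by partition, then by key.
    // 'combiner' may be null, in which case records are written unchanged.
    static List<IndexEntry> spill(List<Rec> sorted, int partitions,
                                  BinaryOperator<Integer> combiner,
                                  ByteArrayOutputStream sink) {
        DataOutputStream out = new DataOutputStream(sink);
        List<IndexEntry> index = new ArrayList<>();
        int spindex = 0;
        try {
            for (int p = 0; p < partitions; p++) {
                long segmentStart = out.size();
                while (spindex < sorted.size() && sorted.get(spindex).partition == p) {
                    Rec r = sorted.get(spindex++);
                    int value = r.value;
                    if (combiner != null) {
                        // Fold all following records that share this key.
                        while (spindex < sorted.size()
                                && sorted.get(spindex).partition == p
                                && sorted.get(spindex).key.equals(r.key)) {
                            value = combiner.apply(value, sorted.get(spindex++).value);
                        }
                    }
                    out.writeUTF(r.key);
                    out.writeInt(value);
                }
                // An empty partition still gets an entry, with length 0.
                index.add(new IndexEntry(segmentStart, out.size() - segmentStart));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return index;
    }

    public static void main(String[] args) {
        List<Rec> sorted = new ArrayList<>();
        sorted.add(new Rec(0, "a", 1));
        sorted.add(new Rec(0, "a", 2));
        sorted.add(new Rec(0, "b", 3));
        sorted.add(new Rec(1, "c", 4));
        for (IndexEntry e : spill(sorted, 3, Integer::sum, new ByteArrayOutputStream())) {
            System.out.println(e.startOffset + " +" + e.partLength);
        }
    }
}
```

Running the combiner at spill time shrinks the segment for partition 0 from three records to two in this example, which reduces both the disk write and the amount of data merged later.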

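      When the map phase ends, the flush function of MapOutputBuffer is called. It also calls sortAndSpill to write the data remaining in the buffer to a file, and then calls mergeParts to merge the multiple spill files that have been written to disk: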


      private void mergeParts() throws IOException {
          ……
          // For each partition
          for (int parts = 0; parts < partitions; parts++){
            // create the segments to be merged
            List<Segment<K, V>> segmentList =
              new ArrayList<Segment<K, V>>(numSpills);
            TaskAttemptID mapId = getTaskID();
            // Collect the segment belonging to the current partition from each spill file in turn
            for(int i = 0; i < numSpills; i++) {
              final IndexRecord indexRecord =
                getIndexInformation(mapId, i, parts);
              long segmentOffset = indexRecord.startOffset;
              long segmentLength = indexRecord.partLength;
              Segment<K, V> s =
                new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                                  segmentLength, codec, true);
              segmentList.add(i, s);
            }
            // Merge the segments that belong to the same partition
            RawKeyValueIterator kvIter =
              Merger.merge(job, rfs,
                           keyClass, valClass,
                           segmentList, job.getInt("io.sort.factor", 100),
                           new Path(getTaskID().toString()),
                           job.getOutputKeyComparator(), reporter);
            // Write the merged segment to the final output file
            long segmentStart = finalOut.getPos();
            Writer<K, V> writer =
                new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
            if (null == combinerClass || numSpills < minSpillsForCombine) {
              Merger.writeFile(kvIter, writer, reporter, job);
            } else {
              combineCollector.setWriter(writer);
              combineAndSpill(kvIter, combineInputCounter);
            }
            ……
          }
      }
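The per-partition merge performed by mergeParts can be sketched as a k-way merge over sorted segments. The code below is an illustration only: MergeSketch and Cursor are hypothetical names, keys are plain strings held in arrays, and all segments of a partition are merged in a single pass, whereas the listing passes io.sort.factor to Merger.merge to bound the merge width. The input array spills[i][p] plays the role of the segment of partition p inside spill file i.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified sketch of merging spill segments per partition; not the Hadoop code.
class MergeSketch {
    // Hypothetical stand-in for a Segment: a sorted run of keys plus a read position.
    static final class Cursor implements Comparable<Cursor> {
        final String[] run;
        int pos = 0;
        Cursor(String[] run) { this.run = run; }
        boolean exhausted() { return pos >= run.length; }
        String head() { return run[pos]; }
        @Override public int compareTo(Cursor other) { return head().compareTo(other.head()); }
    }

    // spills[i][p] is the sorted segment of partition p in spill i.
    static List<List<String>> mergeParts(String[][][] spills, int partitions) {
        List<List<String>> finalOut = new ArrayList<>();
        for (int parts = 0; parts < partitions; parts++) {
            // Collect the segment of this partition from every spill.
            PriorityQueue<Cursor> heap = new PriorityQueue<>();
            for (String[][] spill : spills) {
                Cursor c = new Cursor(spill[parts]);
                if (!c.exhausted()) {
                    heap.add(c);
                }
            }
            // Repeatedly take the smallest head among all segments.
            List<String> merged = new ArrayList<>();
            while (!heap.isEmpty()) {
                Cursor c = heap.poll();
                merged.add(c.head());
                c.pos++;
                if (!c.exhausted()) {
                    heap.add(c); // re-insert so it is ordered by its new head
                }
            }
            finalOut.add(merged);
        }
        return finalOut;
    }

    public static void main(String[] args) {
        String[][][] spills = {
            { {"a", "d"}, {"x"} },
            { {"b", "c"}, {} },
            { {},         {"w", "z"} },
        };
        System.out.println(mergeParts(spills, 2)); // [[a, b, c, d], [w, x, z]]
    }
}
```

Because every segment is already sorted, the merge never has to re-sort: each output record costs one heap operation over the number of spills.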

      6.2. ReduceTask

      The run function of ReduceTask is as follows:

      public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
        throws IOException {
        job.setBoolean("mapred.skip.on", isSkipping());
        // A reduce task consists of three phases: copy, sort and reduce
        if (isMapOrReduce()) {
          copyPhase = getProgress().addPhase("copy");
          sortPhase  = getProgress().addPhase("sort");
          reducePhase = getProgress().addPhase("reduce");
        }
        startCommunicationThread(umbilical);
        final Reporter reporter = getReporter(umbilical);
        initialize(job, reporter);
        // Copy phase: the fetchOutputs function of ReduceCopier obtains the map outputs.
        // It creates several MapOutputCopier threads, whose copyOutput function does the copying.
        boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
        if (!isLocal) {
          reduceCopier = new ReduceCopier(umbilical, job);
          if (!reduceCopier.fetchOutputs()) {
              ……
          }
        }
        copyPhase.complete();
        // Sort phase: merge the fetched map outputs until the number of files drops below
        // io.sort.factor, then return an Iterator for accessing the key-value pairs
        setPhase(TaskStatus.Phase.SORT);
        statusUpdate(umbilical);
        final FileSystem rfs = FileSystem.getLocal(job).getRaw();
        RawKeyValueIterator rIter = isLocal
          ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
              job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
              !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
              new Path(getTaskID().toString()), job.getOutputKeyComparator(),
              reporter)
          : reduceCopier.createKVIterator(job, rfs, reporter);
        mapOutputFilesOnDisk.clear();
        sortPhase.complete();
        // Reduce phase
        setPhase(TaskStatus.Phase.REDUCE);
        ……
        Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
        Class keyClass = job.getMapOutputKeyClass();
        Class valClass = job.getMapOutputValueClass();
        ReduceValuesIterator values = isSkipping() ?
           new SkippingReduceValuesIterator(rIter,
                job.getOutputValueGroupingComparator(), keyClass, valClass,
                job, reporter, umbilical) :
            new ReduceValuesIterator(rIter,
            job.getOutputValueGroupingComparator(), keyClass, valClass,
            job, reporter);
        // Read each key together with its list of values in turn and call the reduce function of the Reducer
        while (values.more()) {
          reduceInputKeyCounter.increment(1);
          reducer.reduce(values.getKey(), values, collector, reporter);
          values.nextKey();
          values.informReduceProgress();
        }
        reducer.close();
        out.close(reporter);
        done(umbilical);
      }
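The final loop of the listing, which hands one key and its values to the Reducer at a time, relies on the input being sorted so that equal keys are adjacent. A minimal sketch of that grouping step follows. ReduceLoopSketch, its Reducer interface and the kv helper are hypothetical names made up for the illustration, and unlike the ReduceValuesIterator in the listing, which is iterated lazily, the sketch collects the values of each group into a list before calling reduce.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the reduce loop over sorted input; not the Hadoop code.
class ReduceLoopSketch {
    // Hypothetical, minimal reducer interface for this sketch.
    interface Reducer {
        void reduce(String key, List<Integer> values, Map<String, Integer> collector);
    }

    // Convenience factory for a key-value pair.
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    // 'sorted' must be ordered so that keys equal under 'grouping' are adjacent.
    static Map<String, Integer> run(List<Map.Entry<String, Integer>> sorted,
                                    Comparator<String> grouping,
                                    Reducer reducer) {
        Map<String, Integer> collector = new LinkedHashMap<>();
        int i = 0;
        while (i < sorted.size()) {                 // corresponds to values.more()
            String key = sorted.get(i).getKey();    // corresponds to values.getKey()
            List<Integer> values = new ArrayList<>();
            // Consume every adjacent record whose key belongs to the same group.
            while (i < sorted.size()
                    && grouping.compare(key, sorted.get(i).getKey()) == 0) {
                values.add(sorted.get(i++).getValue());
            }
            reducer.reduce(key, values, collector);
        }
        return collector;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted =
            Arrays.asList(kv("a", 1), kv("a", 2), kv("b", 5), kv("c", 1), kv("c", 1));
        Map<String, Integer> result = run(sorted, Comparator.naturalOrder(),
            (key, values, out) -> {
                int sum = 0;
                for (int v : values) {
                    sum += v;
                }
                out.put(key, sum);
            });
        System.out.println(result); // {a=3, b=5, c=2}
    }
}
```

The grouping comparator is passed separately from the sort order because, as in the listing, the comparator that decides which records share one reduce call (getOutputValueGroupingComparator) need not be the same as the one used for sorting.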


      7. Summary

      The Map-Reduce process is summarized in the figure below:

      [Figure: overview of the Map-Reduce process (image not preserved)]

      from http://www.cnblogs.com/end/archive/2011/04/26/2029496.html
