hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程
taskTracker 生成map reduce 任务详解
1.启动 TaskTracker ,执行main方法 new TaskTracker(conf) 启动taskTracker
2.taskTrack 构造方法初始化变量
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大map数 默认是2
mapred.tasktracker.map.tasks.maximum taskTracker 可launch 的最大reduce数 默认是2
mapred.disk.healthChecker.interval 磁盘健康度检查时间间隔 mills 默认是60*1000
构造rpc 链接jobTACKER
tasktracker.http.threads taskTracker 工作线程数,默认是40 ,copy数据的线程数
initialize(); 构造方法,该方法是一个独立于taskTracker的方法,可循环调用,在taskTracker 处于关闭状态时 仍可用
该方法主要用户构造一些 运行时目录 和心跳RPC 信息初始化分布式缓存开发map任务监听线程 初始化new TaskLauncher() 启动map 和reduce 任务抓取线程
synchronized void initialize() throws IOException, InterruptedException { this.fConf = new JobConf(originalConf); // 绑定地址.taskTracker 的地址 绑定到rpc中 为传送心跳信息做准备 String address = NetUtils.getServerAddress(fConf, "mapred.task.tracker.report.bindAddress", "mapred.task.tracker.report.port", "mapred.task.tracker.report.address"); InetSocketAddress socAddr = NetUtils.createSocketAddr(address); String bindAddress = socAddr.getHostName(); int tmpPort = socAddr.getPort(); //初始化jvm 管理类 this.jvmManager = new JvmManager(this); // RPC 初始化 int max = maxMapSlots > maxReduceSlots ? maxMapSlots : maxReduceSlots; //set the num handlers to max*2 since canCommit may wait for the duration//of a heartbeat RPC//此处taskTracker 汇报的 处理任务的slot 在实际的基础上*2,因为在心跳汇报的阶段传输这段时间会空出来一部分slot.在新的heartbeat 过来的时候 有2倍的slot处理能力 this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager); this.taskReportServer.start(); // 初始化分布式缓存,在写mr代码的时候 讲一个文件写入DistributedCache 的时候, DistributedCache 在这个位置进行初始化 this.distributedCacheManager = new TrackerDistributedCacheManager( this.fConf, taskController); // start the thread that will fetch map task completion events//在该位置启动 map和reduce任务的处理线程 this.mapEventsFetcher = new MapEventsFetcherThread(); mapEventsFetcher.setDaemon(true); mapEventsFetcher.setName( "Map-events fetcher for all reduce tasks " + "on " + taskTrackerName); mapEventsFetcher.start(); //这里 初始化了两个类TaskLauncher reduce 和map 这两个类是具体的 生成map和reduce 的任务类. mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots); reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots); mapLauncher.start(); reduceLauncher.start();