用 Hadoop 进行分布式并行编程, 第 1 部分
?
?
图一说明了用 MapReduce 来处理大数据集的过程, 这个 MapReduce 的计算过程简而言之,就是将大数据集分解为成百上千的小数据集,每个(或若干个)数据集分别由集群中的一个结点(一般就是一台普通的计算机)进行处理并生成中间结果,然后这些中间结果又由大量的结点进行合并, 形成最终结果。
计算模型的核心是 Map 和 Reduce 两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的 <key, value> 对转换成另一个或一批 <key, value> 对输出。
把原始大数据集切割成小数据集时,通常让小数据集小于或等于 HDFS 中一个 Block 的大小(缺省是 64M),这样能够保证一个小数据集位于一台计算机上,便于本地计算。有 M 个小数据集待处理,就启动 M 个 Map 任务,注意这 M 个 Map 任务分布于 N 台计算机上并行运行,Reduce 任务的数量 R 则可由用户指定。
把 Map 任务输出的中间结果按 key 的范围划分成 R 份( R 是预先定义的 Reduce 任务的个数),划分时通常使用 hash 函数如: hash(key) mod R,这样可以保证某一段范围内的 key,一定是由一个 Reduce 任务来处理,可以简化 Reduce 的过程。
在 partition 之前,还可以对中间结果先做 combine,即将中间结果中有相同 key的 <key, value> 对合并成一对。combine 的过程与 Reduce 的过程类似,很多情况下就可以直接使用 Reduce 函数,但 combine 是作为 Map 任务的一部分,在执行完 Map 函数后紧接着执行的。Combine 能够减少中间结果中 <key, value> 对的数目,从而减少网络流量。
Map 任务的中间结果在做完 Combine 和 Partition 之后,以文件形式存于本地磁盘。中间结果文件的位置会通知主控 JobTracker, JobTracker 再通知 Reduce 任务到哪一个 DataNode 上去取中间结果。注意所有的 Map 任务产生中间结果均按其 Key 用同一个 Hash 函数划分成了 R 份,R 个 Reduce 任务各自负责一段 Key 区间。每个 Reduce 需要向许多个 Map 任务结点取得落在其负责的 Key 区间内的中间结果,然后执行 Reduce 函数,形成一个最终的结果文件。
有 R 个 Reduce 任务,就会有 R 个最终结果,很多情况下这 R 个最终结果并不需要合并成一个最终结果。因为这 R 个最终结果又可以做为另一个计算任务的输入,开始另一个并行计算任务。
$ echo "hello hadoop goodbye hadoop" >file2.txt
$ cd ..
$ bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out
#执行完毕,下面查看执行结果:
$ cd test-out
$ cat part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
这种模式也是在一台单机上运行,但用不同的 Java 进程模仿分布式运行中的各类结点 ( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),请注意分布式运行中的这几个结点的区别:
从分布式存储的角度来说,集群中的结点由一个 NameNode 和若干个 DataNode 组成, 另有一个 Secondary NameNode 作为 NameNode 的备份。 从分布式应用的角度来说,集群中的结点由一个 JobTracker 和若干个 TaskTracker 组成,JobTracker 负责任务的调度,TaskTracker 负责并行执行任务。TaskTracker 必须运行在 DataNode 上,这样便于数据的本地计算。JobTracker 和 NameNode 则无须在同一台机器上。
(1) 按代码清单2修改 conf/hadoop-site.xml。注意 conf/hadoop-default.xml 中是 Hadoop 缺省的参数,你可以通过读此文件了解 Hadoop 中有哪些参数可供配置,但不要修改此文件。可通过修改 conf/hadoop-site.xml 改变缺省参数值,此文件中设置的参数值会覆盖 conf/hadoop-default.xml 的同名参数。
?
<configuration>
<property>
<name>fs.default.name</name>
<value>localhost:9000</value>
</property>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
参数 mapred.job.tracker 指定 JobTracker 的 IP 地址和端口号。缺省值是 local, 表示在本地同一 Java 进程内执行 JobTracker 和 TaskTracker, 用于单机非分布式模式。此处我们指定使用运行于本机 localhost 上的 JobTracker ( 用一个单独的 Java 进程做 JobTracker )。
参数 dfs.replication 指定 HDFS 中每个 Block 被复制的次数,起数据冗余备份的作用。 在典型的生产系统中,这个数常常设置为3。
(2)配置 SSH,如代码清单3所示:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
(3)格式化一个新的分布式文件系统, 如代码清单4所示:
(1) 执行 $ bin/start-all.sh 启动 Hadoop 进程后,会启动5个 java 进程, 同时会在 /tmp 目录下创建五个 pid 文件记录这些进程 ID 号。通过这五个文件,可以得知 namenode, datanode, secondary namenode, jobtracker, tasktracker 分别对应于哪一个 Java 进程。当你觉得 Hadoop 工作不正常时,可以首先查看这5个 java 进程是否在正常运行。
(2) 使用 web 接口。访问 http://localhost:50030 可以查看 JobTracker 的运行状态。访问 http://localhost:50060 可以查看 TaskTracker 的运行状态。访问 http://localhost:50070 可以查看 NameNode 以及整个分布式文件系统的状态,浏览分布式文件系统中的文件以及 log 等。
(3) 查看 ${HADOOP_HOME}/logs 目录下的 log 文件,namenode, datanode, secondary namenode, jobtracker, tasktracker 各有一个对应的 log 文件,每一次运行的计算任务也有对应用 log 文件。分析这些 log 文件有助于找到故障原因。
回页首
现在,你已经了解了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且有了一个可以运行的 Hadoop 环境,运行了一个基于 Hadoop 的并行程序。在下一篇文章中,你将了解到如何针对一个具体的计算任务,基于 Hadoop 编写自己的分布式并行程序并将其部署运行等内容。
声明:本文仅代表作者个人之观点,不代表 IBM 公司之观点。
学习
访问Hadoop 官方网站,了解 Hadoop 及其子项目 HBase 的信息。Hadoop wiki上, 有许多 Hadoop 的用户文档,开发文档,示例程序等。阅读 Google Mapreduce 论文:MapReduce: Simplified Data Processing on Large Clusters, 深入了解 Mapreduce 计算模型。学习 Hadoop 分布式文件系统 HDFS:The Hadoop Distributed File System:Architecture and Design学习 Google 文件系统 GFS:The Google File System, Hadoop HDFS 实现了与 GFS 类似的功能。讨论
加入Hadoop 开发者邮件列表,了解 Hadoop 项目开发的最新进展。曹羽中,在北京航空航天大学获得计算机软件与理论专业的硕士学位,具有数年的 unix 环境下的 C 语言,Java,数据库以及电信计费软件的开发经验,他的技术兴趣还包括 OSGi 和搜索技术。他目前在IBM中国系统与科技实验室从事系统管理软件的开发工作,可以通过caoyuz@cn.ibm.com与他联系。
?