用 Hadoop 进行分布式并行编程, 第 1 部分
图一说明了用 MapReduce 来处理大数据集的过程, 这个 MapReduce 的计算过程简而言之,就是将大数据集分解为成百上千的小数据集,每个(或若干个)数据集分别由集群中的一个结点(一般就是一台普通的计算机)进行处理并生成中间结果,然后这些中间结果又由大量的结点进行合并, 形成最终结果。
计算模型的核心是 Map 和 Reduce 两个函数,这两个函数由用户负责实现,功能是按一定的映射规则将输入的 <key, value> 对转换成另一个或一批 <key, value> 对输出。
表一 Map 和 Reduce 函数
?
注意事项:运行 bin/hadoop jar hadoop-0.16.0-examples.jar wordcount test-in test-out 时,务必注意第一个参数是 jar, 不是 -jar, 当你用 -jar 时,不会告诉你是参数错了,报告出来的错误信息是:Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/util/ProgramDriver, 笔者当时以为是 classpath 的设置问题,浪费了不少时间。通过分析 bin/hadoop 脚本可知,-jar 并不是 bin/hadoop 脚本定义的参数,此脚本会把 -jar 作为 Java 的参数,Java 的-jar 参数表示执行一个 Jar 文件(这个 Jar 文件必须是一个可执行的 Jar,即在 MANIFEST 中定义了主类), 此时外部定义的 classpath 是不起作用的,因而会抛出 java.lang.NoClassDefFoundError 异常。而 jar 是 bin/hadoop 脚本定义的参数,会调用 Hadoop 自己的一个工具类 RunJar,这个工具类也能够执行一个 Jar 文件,并且外部定义的 classpath 有效。
伪分布式运行模式
这种模式也是在一台单机上运行,但用不同的 Java 进程模仿分布式运行中的各类结点 ( NameNode, DataNode, JobTracker, TaskTracker, Secondary NameNode ),请注意分布式运行中的这几个结点的区别:
从分布式存储的角度来说,集群中的结点由一个 NameNode 和若干个 DataNode 组成, 另有一个 Secondary NameNode 作为 NameNode 的备份。从分布式应用的角度来说,集群中的结点由一个 JobTracker 和若干个 TaskTracker 组成,JobTracker 负责任务的调度,TaskTracker 负责并行执行任务。TaskTracker 必须运行在 DataNode 上,这样便于数据的本地计算。JobTracker 和 NameNode 则无须在同一台机器上。
(1) 按代码清单2修改 conf/hadoop-site.xml。注意 conf/hadoop-default.xml 中是 Hadoop 缺省的参数,你可以通过读此文件了解 Hadoop 中有哪些参数可供配置,但不要修改此文件。可通过修改 conf/hadoop-site.xml 改变缺省参数值,此文件中设置的参数值会覆盖 conf/hadoop-default.xml 的同名参数。
代码清单 2
<configuration> <property> <name>fs.default.name</name> <value>localhost:9000</value> </property> <property> <name>mapred.job.tracker</name> <value>localhost:9001</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property></configuration>
?
参数 fs.default.name 指定 NameNode 的 IP 地址和端口号。缺省值是 file:///, 表示使用本地文件系统, 用于单机非分布式模式。此处我们指定使用运行于本机 localhost 上的 NameNode。
参数 mapred.job.tracker 指定 JobTracker 的 IP 地址和端口号。缺省值是 local, 表示在本地同一 Java 进程内执行 JobTracker 和 TaskTracker, 用于单机非分布式模式。此处我们指定使用运行于本机 localhost 上的 JobTracker ( 用一个单独的 Java 进程做 JobTracker )。
参数 dfs.replication 指定 HDFS 中每个 Block 被复制的次数,起数据冗余备份的作用。在典型的生产系统中,这个数常常设置为3。
(2)配置 SSH,如代码清单3所示:
代码清单 3
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
?
配置完后,执行一下 ssh localhost, 确认你的机器可以用 SSH 连接,并且连接时不需要手工输入密码。
(3)格式化一个新的分布式文件系统, 如代码清单4所示:
代码清单 4
$ cd /cygdrive/c/hadoop-0.16.0$ bin/hadoop namenode –format
?
(4) 启动 hadoop 进程, 如代码清单5所示。控制台上的输出信息应该显示启动了 namenode, datanode, secondary namenode, jobtracker, tasktracker。启动完成之后,通过 ps –ef 应该可以看到启动了5个新的 java 进程。
代码清单 5
$ bin/start-all.sh $ ps –ef
?
(5) 运行 wordcount 应用, 如代码清单6所示:
代码清单 6
$ bin/hadoop dfs -put ./test-in input #将本地文件系统上的 ./test-in 目录拷到 HDFS 的根目录上,目录名改为 input#执行 bin/hadoop dfs –help 可以学习各种 HDFS 命令的使用。$ bin/hadoop jar hadoop-0.16.0-examples.jar wordcount input output#查看执行结果:#将文件从 HDFS 拷到本地文件系统中再查看:$ bin/hadoop dfs -get output output $ cat output/*#也可以直接查看$ bin/hadoop dfs -cat output/*$ bin/stop-all.sh #停止 hadoop 进程
?
故障诊断
(1) 执行 $ bin/start-all.sh 启动 Hadoop 进程后,会启动5个 java 进程, 同时会在 /tmp 目录下创建五个 pid 文件记录这些进程 ID 号。通过这五个文件,可以得知 namenode, datanode, secondary namenode, jobtracker, tasktracker 分别对应于哪一个 Java 进程。当你觉得 Hadoop 工作不正常时,可以首先查看这5个 java 进程是否在正常运行。
(2) 使用 web 接口。访问 http://localhost:50030 可以查看 JobTracker 的运行状态。访问 http://localhost:50060 可以查看 TaskTracker 的运行状态。访问 http://localhost:50070 可以查看 NameNode 以及整个分布式文件系统的状态,浏览分布式文件系统中的文件以及 log 等。
(3) 查看 ${HADOOP_HOME}/logs 目录下的 log 文件,namenode, datanode, secondary namenode, jobtracker, tasktracker 各有一个对应的 log 文件,每一次运行的计算任务也有对应用 log 文件。分析这些 log 文件有助于找到故障原因。
回页首
结束语
现在,你已经了解了 MapReduce 计算模型,分布式文件系统 HDFS,分布式并行计算等的基本原理, 并且有了一个可以运行的 Hadoop 环境,运行了一个基于 Hadoop 的并行程序。在下一篇文章中,你将了解到如何针对一个具体的计算任务,基于 Hadoop 编写自己的分布式并行程序并将其部署运行等内容。
声明:本文仅代表作者个人之观点,不代表 IBM 公司之观点。
?
参考资料
学习
访问 Hadoop 官方网站,了解 Hadoop 及其子项目 HBase 的信息。讨论
加入 Hadoop 开发者邮件列表,了解 Hadoop 项目开发的最新进展。关于作者
曹羽中,在北京航空航天大学获得计算机软件与理论专业的硕士学位,具有数年的 unix 环境下的 C 语言,Java,数据库以及电信计费软件的开发经验,他的技术兴趣还包括 OSGi 和搜索技术。他目前在IBM中国系统与科技实验室从事系统管理软件的开发工作,可以通过 caoyuz@cn.ibm.com与他联系。
?
原文地址:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop1/