Java 开发 2.0: 用 Hadoop MapReduce 进行大数据分析
Google 在 2001 年发布图像搜索功能时,只有 2.5 亿索引图像,不到 10 年,这个巨大的搜索功能已经可以检索超过 100 亿个图像了,每分钟有 35 小时的内容上传到 YouTube。据称,Twitter 每天平均处理 5500 万 tweet。今年早些时候,搜索功能每天记录 6 亿条查询记录。这 就是我们讨论大数据的意义所在。
?
正如您在 清单 4 中所看到的,opencsv
处理逗号分隔值非常容易。该解析器仅返回一组 String
,所以有可能获取位置信息(别忘了,在 Java 语言中数组和集合的访问是从零开始的)。
转换日期格式
当使用 MapReduce 进行处理时,map
函数的任务是选择一些要处理的值,以及一些键。这就是说,map
主要处理和返回两个元素:一个键和一个值。回到我之前的需求,我首先想知道每天会发生多少次地震。因此,当我在分析地震文件时,我将发布两个值:键是日期,值是一个计数器。reduce
函数将对计数器(只是一些值为 1 的整数)进行总计。因此,提供给我的是在目标地震文件中某一个日期出现的次数。
由于我只对 24 小时时段内的信息感兴趣,我得剔除每个文件中的日期的时间部分。在 清单 5 中,我编写了一个快速测试,验证如何将一个传入文件中的特定日期信息转换成一个更一般的 24 小时日期:
清单 5. 清单 5. 日期格式转换
@Testpublic void testParsingDate() throws Exception { String datest = "Monday, December 13, 2010 14:10:32 UTC"; SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z"); Date dt = formatter.parse(datest); formatter.applyPattern("dd-MM-yyyy"); String dtstr = formatter.format(dt); assertEquals("should be 13-12-2010", "13-12-2010", dtstr);}
?
在 清单 5 中,我使用了 SimpleDateFormat
Java 对象,将 CSV 文件中格式为 Monday, December 13, 2010 14:10:32 UTC 的日期 String
转换成了更一般的 13-12-2010。
Hadoop 的 map 和 reduce
现在我已经找到了处理 CSV 文件以及其日期格式的解决方法。我要开始在 Hadoop 中实施我的 map
和 reduce
函数了。这个过程需要理解 Java 泛型,因为 Hadoop 选择使用显式类型,为了安全起见。
当我使用 Hadoop 定义一个映射实现时,我只扩展 Hadoop 的 Mapper
类。然后我可以使用泛型来为传出键和值指定显式类。类型子句也指定了传入键和值,这对于读取文件分别是字节数和文本行数。
EarthQuakesPerDateMapper
类扩展了 Hadoop 的 Mapper
对象。它显式地将其输出键指定为一个 Text
对象,将其值指定为一个 IntWritable
,这是一个 Hadoop 特定类,实质上是一个整数。还要注意,class 子句的前两个类型是 LongWritable
和 Text
,分别是字节数和文本行数。
由于类定义中的类型子句,我将传入 map
方法的参数类型设置为在 context.write
子句内带有该方法的输出。如果我想指定其他内容,将会出现一个编译器问题,或 Hadoop 将输出一个错误消息,描述类型不匹配的消息。
清单 6. 清单 6. 一个映射实现
public class EarthQuakesPerDateMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (key.get() > 0) { try { CSVParser parser = new CSVParser(); String[] lines = parser.parseLine(value.toString()); SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z"); Date dt = formatter.parse(lines[3]); formatter.applyPattern("dd-MM-yyyy"); String dtstr = formatter.format(dt); context.write(new Text(dtstr), new IntWritable(1)); } catch (ParseException e) {} } }}
?
清单 6 中的 map
实现比较简单:本质上是,Hadoop 为在输入文件中找到的每一行文本调用这个类。为了避免除了 CSV 头部,首先检查是否字节数(key
对象)为零。然后执行清单 4 和 5 中的步骤:捕获传入日期,进行转换,然后设置为传出键。我也提供了一个数:1。就是说,我为每个日期编写一个计数器,当 reduce
实现被调用时,获取一个键和一系列值。在本例中,键是日期及其值,如 清单 7 所示:
清单 7. 清单 7. 一个 map 输出和 reduce 输入的逻辑视图
"13-12-2010":[1,1,1,1,1,1,1,1]"14-12-2010":[1,1,1,1,1,1]"15-12-2010":[1,1,1,1,1,1,1,1,1]
?
注意,context.write(new Text(dtstr), newIntWritable(1))
(在 清单 6 中)构建了如 清单 7 所示的逻辑集合。正如您所了解的,context
是一个保存各种信息的 Hadoop 数据结构。context
被传递到 reduce
实现,reduce
获取这些值为 1 的值然后总和起来。因此,一个 reduce
实现逻辑上创建如 清单 8 所示的数据结构:
清单 8. 清单 8. 一个 reduce 输出视图
"13-12-2010":8"14-12-2010":6"15-12-2010":9
?
我的 reduce
实现如 清单 9 所示。与 Hadoop 的 Mapper
一样,Reducer
被参数化了:前两个参数是传入的键类型(Text
)和值类型(IntWritable
),后两个参数是输出类型:键和值,这在本例中是相同的。
清单 9. 清单 9. reduce 实现
public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count++; } context.write(key, new IntWritable(count)); }}
?
我的 reduce
实现非常简单。正如我在 清单 7 中所指出的,传入的是实际上是一个值的集合,在本例中是 1 的集合,我所做的就是将它们加起来,然后写出一个新键值对表示日期和次数。我的 reduce
代码可以挑出您在 清单 8 中所见到的这几行。逻辑流程看起来像这样:
"13-12-2010":[1,1,1,1,1,1,1,1] -> "13-12-2010":8
?
当然,这个清单的抽象形式是 map -> reduce
。
定义一个 Hadoop Job
现在我已经对我的 map
和 reduce
实现进行了编码,接下来所要做的是将所有这一切链接到一个 Hadoop Job
。定义一个 Job
比较简单:您需要提供输入和输出、map
和 reduce
实现(如 清单 6 和 清单 9 所示)以及输出类型。在本例中我的输出类型和 reduce
实现所用的是同一个类型。
清单 10. 清单 10. 一个将 map 和 redece 绑在一起的 Job
public class EarthQuakesPerDayJob { public static void main(String[] args) throws Throwable { Job job = new Job(); job.setJarByClass(EarthQuakesPerDayJob.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(EarthQuakesPerDateMapper.class); job.setReducerClass(EarthQuakesPerDateReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
?
在 清单 10 中,我使用一个 main
方法将所有这一切绑在一起,该方法有两个参数:地震 CSV 文件的目录,以及生成报告的输出目录(Hadoop 更喜欢创建该目录)。
为了执行这个小框架,我需要将这些类打包。我还需要告知 Hadoop 在哪里可以找到 opencsv
二进制文件。然后可以通过命令行执行 Hadoop ,如 清单 11 所示:
清单 11. 清单 11. 执行 Hadoop
$> export HADOOP_CLASSPATH=lib/opencsv-2.2.jar$> hadoop jar target/quake.jar com.b50.hadoop.quake.EarthQuakesPerDayJob ~/temp/mreduce/in/ ~/temp/mreduce/out
?
运行这些代码,Hadoop 开始运行时您将可以看到一堆文本在屏幕上一闪而过。我所用的 CSV 文件相比专门用于处理这种情况的 Hadoop,那真是小巫见大巫!hadoop 应该可以在几秒钟内完成,具体取决于您的处理功能。
完成这些后,您可以使用任何编辑器查看输出文件内容。还可以选择直接使用 hadoop
命令。正如 清单 12 所示:
清单 12. 清单 12. 读取 Hadoop 输出
$> hadoop dfs -cat part-r-00000 05-12-2010 4306-12-2010 14307-12-2010 11208-12-2010 13609-12-2010 17810-12-2010 11411-12-2010 11412-12-2010 79
?
如果您像我一样,在 清单 12 中首先会注意到的就是每天地震数 — 12 月 9 日就有 178 次地震。希望您也会注意到
编写另一个 Mapper
接下来,我想找到地震发生在哪里,以及如何快速计算出在我的研究范围内记录地震次数最多的是哪个区域。当然,您已经猜到了,Hadoop 可以轻松地做到。在这个案例中,键不再是日期而是区域。因此,我编写了一个新的 Mapper
类。
清单 13. 清单 13. 一个新的 map 实现
public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (key.get() > 0) { String[] lines = new CSVParser().parseLine(value.toString()); context.write(new Text(lines[9]), new IntWritable(1)); } }}
?
和之前获取日期然后进行转换相比,在 清单 13 中我所作的是获取位置,这是 CSV 阵列中的最后一个条目。
相比一个庞大的位置和数字列表,我将结果限制在那些 7 天内出现 10 次的区域。
清单 14. 清单 14. 哪里的地震较多?
public class EarthQuakeLocationReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable value : values) { count++; } if (count >= 10) { context.write(key, new IntWritable(count)); } } }
?
清单 14 中的代码和 清单 9 中的代码非常类似;然而,在本例中,我限制了输出大于或等于 10。接下来,我将 map
和 reduce
,以及其他 Job
实现绑在一起,进行打包,然后和平常一样执行 Hadoop 获取我的新答案。
使用 hadoop dfs
目录显示我所请求的新值:
清单 15. 清单 15. 地震区域分布
$> hadoop dfs -cat part-r-00000 Andreanof Islands, Aleutian Islands, Alaska 24Arkansas 40Baja California, Mexico 101Central Alaska 74Central California 68Greater Los Angeles area, California 16Island of Hawaii, Hawaii 16Kenai Peninsula, Alaska 11Nevada 15Northern California 114San Francisco Bay area, California 21Southern Alaska 97Southern California 115Utah 19western Montana 11
?
从 清单 15 还可以得到什么?首先,北美洲西海岸,从墨西哥到阿拉斯加是地震高发区。其次,阿肯色州明显位于断带层上,这是我没有意识到的。最后,如果您居住在北部或者是南加州(很多软件开发人员都居住于此),您周围的地方每隔 13 分钟会震动一次。
结束语
使用 Hadoop 分析数据轻松且高效,对于它对数据分析所提供的支持,我只是了解皮毛而已。Hadoop 的设计旨在以一种分布式方式运行,处理运行 map
和 reduce
的各个节点之间的协调性。作为示例,本文中我只在一个 JVM 上运行 Hadoop,该 JVM 仅有一个无足轻重的文件。
Hadoop 本身是一个功能强大的工具,围绕它还有一个完整的、不断扩展的生态系统,可以提供子项目至基于云计算的 Hadoop 服务。Hadoop 生态系统演示了项目背后丰富的社区活动。来自社区的许多工具证实了大数据分析作为一个全球业务活动的可行性。有了 Hadoop,分布式数据挖掘和分析对所有软件创新者和企业家都是可用的,包括但不限于 Google 和 Yahoo! 这类大企业。