Using DistributedCache in Hadoop
A while ago I was writing a MapReduce program that depended on a few third-party jar files, so before the job could run I had to copy those jars onto every Hadoop node, one by one.
Whenever a node was added to the cluster, or the job was moved to a different cluster, the jars had to be deployed all over again, which was not only time-consuming but also easy to get wrong or miss a node. And once the jars were no longer needed, I had to clean them up node by node. Very tedious.
After some digging I found that Hadoop provides a DistributedCache: you register files that live in HDFS with the DistributedCache, and when the tasks need them, the DistributedCache automatically downloads them to the local storage of each node (mapred.local.dir). With this there is no need to push third-party jars to every node by hand, and nothing extra has to be uploaded when new nodes join the cluster.
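For my jar problem specifically, the driver only has to copy the jar into HDFS once and register it on the job's classpath. Below is a minimal sketch against the old mapred API; the local path, HDFS path, and class name are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class JarCacheDriver {
    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(JarCacheDriver.class);

        // Copy the third-party jar into HDFS once; after that every job
        // (and every new node) can reuse the same copy.
        FileSystem fs = FileSystem.get(conf);
        Path hdfsJar = new Path("/myapp/lib/thirdparty.jar");   // hypothetical path
        if (!fs.exists(hdfsJar)) {
            fs.copyFromLocalFile(new Path("lib/thirdparty.jar"), hdfsJar);
        }

        // Register the jar with the DistributedCache; each task JVM will
        // download it to mapred.local.dir and put it on its classpath.
        DistributedCache.addFileToClassPath(hdfsJar, conf);

        // ... set mapper/reducer, input and output paths, then submit ...
        JobClient.runJob(conf);
    }
}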
How to use DistributedCache is described at http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/filecache/DistributedCache.html; the example from that page:
// Setting up the cache for the application

1. Copy the requisite files to the FileSystem:

   $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
   $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
   $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
   $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
   $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
   $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

2. Setup the application's JobConf:

   JobConf job = new JobConf();
   DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
   DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
   DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
   DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
   DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
   DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);

3. Use the cached files in the Mapper or Reducer:

   public static class MapClass extends MapReduceBase
       implements Mapper<K, V, K, V> {

       private Path[] localArchives;
       private Path[] localFiles;

       public void configure(JobConf job) {
           // Get the cached archives/files
           localArchives = DistributedCache.getLocalCacheArchives(job);
           localFiles = DistributedCache.getLocalCacheFiles(job);
       }

       public void map(K key, V value,
                       OutputCollector<K, V> output, Reporter reporter)
               throws IOException {
           // Use data from the cached archives/files here
           // ...
           output.collect(k, v);
       }
   }
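One detail worth calling out from the example above: the #lookup.dat fragment in the addCacheFile call only gives the task a nicely named file in its working directory if symlink creation is turned on. A hedged sketch of how that can be used (the file name comes from the example above; the rest is my assumption about typical usage):

   // In the driver, next to the addCacheFile call above:
   DistributedCache.createSymlink(job);   // enables the "#name" symlinks

   // In the Mapper/Reducer (needs java.io.BufferedReader and java.io.FileReader),
   // the cached file can then be opened by its fragment name, relative to the
   // task's current working directory:
   BufferedReader reader = new BufferedReader(new FileReader("lookup.dat"));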