hadoop 中基于 Bloom filter的联结
????? 1.第一次写东西啊
?
???????? 第一次在ITeye上面写blog,心情无比激动哈,虽然技术不咋滴,但是也为了记录自己这段时间学习hadoop遇到并解决的各种各样的问题,和大家分享一下一下自己的一些小经验,也留作以后的一个笔记吧!
?
????? 2.windows下hadoop下载和安装
????? 网上有很多关于hadoop在不同平台下面安装和运行开发的文章,我在这里就不赘述了,我的hadoop版本是1.0.3,开发环境是windows XP(就因为是XP给我遇到了各种各样悲催的问题啊),eclipse3.6.1,下载hadoop在eclipse上面的插件(在下载的hadoop的源码的\src\contrib\eclipse-plugin目录下面是eclipse插件的源码,自行编译打包生成jar以后,放到eclipse的pulgins目录下就可以了,附上我正常运行的eclipse的hadoop插件,hadoop-eclipse-plugin-1.0.3.jar)
?
?????? 3.hadoop mapReduce datajoin
?
?
Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。Bloom filter的主要优势在于它的大小(比特位个数)为常数且在初始化时被设置。增加更多的元素到一个Bloom filter不会增加它的大小。它仅增加误报的概率。但是这个概率非常小。Bloom filter原理介绍
hadoop在join操作的时候,主要有四种join方式,
?
3.1,reduce侧连接
?
mr程序处理两个文件,两个文件名称分别为aaa.txt和bbb.txt,map程序会DataJoinMapperBase这个类,为数据源设定对应的tag(tag一般为文件的文件名,对于数据结构相同,但是拆分后的文件,比如aaa.txt拆分成aaa_1.txt和aaa_2.txt,那么tag就为aaa),设定数据源联结关联的key,在reduce程序会对相同联结键的所有记录一起处理,reduce()通过解包得到原始记录,以及根据标签所得到的记录的数据源。在reduce侧执行联结的操作,其中datajoin包实现了主要的联结操作
?
3.2,基于DistributedCache的复制联结
?
当联结多个数据源的时候,可以选取其中较小的一个数据源放到内存中,我们可以通过较小的数据源复制到所有mapper,在mapper侧实现联结,以实现效率上的极大提高。第一步,当配置作业时,你可以调用静态方法DistributedCache。addCachefile()来设定要传播到所有节点上的文件,第二部,在每个单独TaskTracker上的mapper会调用静态方法DistributedCache.getLocalCacheFiles()获取数组本地副本所在的本地文件的路径(我曾多次测试过,在你eclipse插件连接上你的hadoop环境的时候,会默认给你设置一些配置参数,这些配置参数都是相对路径,如果在windows下面开发,会默认创建在你项目所在盘符的根目录,所以你获取到的url不能直接读取,很悲剧,只有手动加上当前项目保存的盘符路径)
3.3,半联结:map侧过滤后在reduce侧联结
?
如果较小的数据源仍不能放到内存中,那么可以将较小的数据源的键全部取出来新建一个保存键列表的文件,在map阶段,使用DistributedCache将key值文件复制到各个TaskTracker上,去除不需要的或者不对应的数据源中的key值列表,剩下的操作和reduce侧连接相同了。
?
3.4,reduce侧联结+Bloom filter
?
在某些情况下,侧联结抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。
后面会有详细描述
?
关于hadoop mapReduce join的更详细的介绍参考这里:hadoop mapReduce join
?
4.关于reduce侧联结和Bloom filter的开发详细介绍
?
4.1 实现 Bloom filter
?
Bloom filter的实现在概念上非常直观,它的内在表现为一个m比特位的数组,我们有k个独立的散列函数,这里每个散列函数的输入为一个对象,而输出为介于0和m-1之间的一个整数,我们使用这个输出的整数作为位数组的索引。
当有一个对象到来时,我们如要检查它是否已经被加入到Bloom filter中,则使用在添加对象时相同的k个散列函数来生成一个位数据的索引。假设所有的比特数组值在所有的情况下都为0,即不存在,那么生成的索引对应的值如果为1,代表存在,返回true,为0,代表不存在,返回false。这里没有漏报,只有当两个对象生成的比特值相同时可能存在误报,但是在reduce侧会舍弃这些误报的数据。
?
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.security.MessageDigest;import java.util.BitSet;import java.util.Random;import org.apache.hadoop.io.Writable;public class BloomFilter2<E> implements Writable{ private BitSet bf; private int bitArraySize = 100000000; private int numHashFunc = 6; public BloomFilter2(){ bf = new BitSet(bitArraySize); } public void add(E obj){ int[] indexes = getHashIndexes(obj); for(int index : indexes){ bf.set(index); } } public boolean contains(E obj){ int[] indexes = getHashIndexes(obj); for(int index : indexes){ if(!bf.get(index)){ return Boolean.FALSE; } } return Boolean.TRUE; } protected int[] getHashIndexes(E obj){ int[] indexes = new int[numHashFunc]; long seed = 0; byte[] digest; try { MessageDigest md = MessageDigest.getInstance("MD5"); md.update(obj.toString().getBytes()); digest = md.digest(); for(int i = 0;i < numHashFunc;i++){ seed = seed ^ (((long)digest[i] & 0xFF)) << (8*i); } } catch (Exception e) { e.printStackTrace(); } Random rdm = new Random(seed); for(int i=0;i<numHashFunc;i++){ indexes[i] = rdm.nextInt(bitArraySize); } return indexes; } public void union(BloomFilter2<E> other){ bf.or(other.bf); } @Override public void readFields(DataInput in) throws IOException { int byteArraySize = (int) (bitArraySize / 8); byte[] byteArray = new byte[byteArraySize]; in.readFully(byteArray); for(int i=0;i<byteArraySize;i++){ byte nextByte = byteArray[i]; for(int j=0;j<8;j++){ if(((int)nextByte & (1<<j)) != 0){ bf.set(8*i+j); } } } } @Override public void write(DataOutput out) throws IOException { int byteArraySize = (int) (bitArraySize / 8); byte[] byteArray = new byte[byteArraySize]; for(int i = 0;i<byteArraySize;i++){ byte nextElement = 0; for(int j = 0;j<8;j++){ if(bf.get(8*i+j)){ nextElement |= 1 << j; } } byteArray[i] = nextElement; } out.write(byteArray); }}?
这是摘自hadoop in action书上的Bloom filter的实现,在两个集合的合并的时候,会采用union()方法做巧妙的实现,每个mapper根据自己的数据分片构造一个Bloom filter,我们再把这些这些 Bloom filter发送到一个单一的reducer上,将它们归并且记录最终的输出,
由于Bloom filter会随着mapper的输出被打乱,Bloom filter类必须实现wirtable接口,它包括wirter()和readFileds()方法,这些方法实现了在内部的BitSet表示和一个字节数组的转换,从而让这些数据可以被序列化到DataInput/DataOutput.
?
4.2 基与Bloom filter 的mapreduce程序
?
看过上面的代码以后,大家就应该很清楚mapreduce程序的实现了吧,在map程序中,实例化一个BloomFilter类变量,每次map方法中,都将key值添加到BloomFliter中
bloomfilter.add(key.toString());
重写父类的close方法(只有在hadoop0.2版本以前有),如果是更高端的版本需要用cleanup(context)方法替代,在这个方法中,将bloomfilter输出。
在reduce代码中,也需要重写父类的configuer(0.2版本以后是setup())和close()(cleanup())方法,以及reduce方法
定义一个全局的jobconf(0.20以后context获取)变量,在configuer方法中初始化job对象,在reduce方法中最数据联结,bloomFliter.union(value);在close()方法中将联结后的记录输出。
可能表达的不好,因为这块我没有自定义Bloom filter只是对书上的例子进行了编译,hadoop在0.2.0版本以后有自己实现的Bloom filter类,主要使用的是hadoop自己实现的Bloom filter类。
?
代码不贴出来了,在hadoop in action这本书上都有,和下面采用hadoop自身实现Bloom filter的maperduce程序有异曲同工之妙。
?
4.3:基于hadoop自身实现的Bloom filter的mapreduce实现
?
Bloom filter源码:\src\core\org\apache\hadoop\util 目录下
?
Bloom filter api:http://hadoop.apache.org/common/docs/r0.20.0/api/org/apache/hadoop/util/bloom/BloomFilter.html#BloomFilter%28int,%20int,%20int%29
?
定义一个hadoop的Bloom filter,它有两个构造函数,一个无参,一个有参,无参构造函数是为指定reduce函数的输出的时候使用的,我们在mapreduce中需要采用有参的构造函数。
public BloomFilter(int?vectorSize, int?nbHash, int?hashType)看api就很容易理解三个参数的意思,第一个参数是vector的大小,这个值尽量给的大,可以避免hash对象的时候出现索引重复,第二个参数是散列函数的个数,第三个是hash的类型,虽然是int型,但是只有默认两个值,看源嘛知道在hadoop的hash类里指定两个常量就是两个hash类型的实现,默认可以通过Hash类指定其中的一个变量
public static final int INVALID_HASH = -1; /** Constant to denote {@link JenkinsHash}. */ public static final int JENKINS_HASH = 0; /** Constant to denote {@link MurmurHash}. */ public static final int MURMUR_HASH = 1; public static int parseHashType(String name) { if ("jenkins".equalsIgnoreCase(name)) { return JENKINS_HASH; } else if ("murmur".equalsIgnoreCase(name)) { return MURMUR_HASH; } else { return INVALID_HASH; }如果不这样指定,就会在add()方法中抛出一个空指针的异常,看源码知道是因为hash类没有初始化,是因为bloom filter的构造函数的参数错误咯!if(filter == null || !(filter instanceof BloomFilter) || filter.vectorSize != this.vectorSize || filter.nbHash != this.nbHash) { throw new IllegalArgumentException("filters cannot be and-ed"); }?一看到这个异常,你就应该知道是因为你的map和reduce类中初始化的Bloom filter不一样啊,赶紧检查一下就ok了,贴一下调用hadoop自身实现的Bloom filter的源码吧:
import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.KeyValueTextInputFormat;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.lib.NullOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import org.apache.hadoop.util.bloom.BloomFilter;import org.apache.hadoop.util.bloom.Key;import org.apache.hadoop.util.hash.Hash;public class HadoopSevenMapperDemo extends Configured implements Tool {public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, BloomFilter>{BloomFilter bloomFilter = new BloomFilter(1000000,6,Hash.JENKINS_HASH);OutputCollector<Text, BloomFilter> ct = null;@Overridepublic void map(Text key, Text arg1,OutputCollector<Text, BloomFilter> output, Reporter arg3)throws IOException {if(ct == null){ct = output;}System.out.println(key.toString()+"``````````````");bloomFilter.add(new Key(key.toString().getBytes()));}@Overridepublic void close(){try {ct.collect(new Text("testKey"), bloomFilter);} catch (Exception e) {e.printStackTrace();}}}public static class ReduceClass extends MapReduceBase implements Reducer<Text, BloomFilter, Text, Text>{JobConf job = null;BloomFilter bf = new BloomFilter(1000000,6,Hash.JENKINS_HASH);@Overridepublic void configure(JobConf job) {this.job = job;}@Overridepublic void reduce(Text key, Iterator<BloomFilter> values,OutputCollector<Text, Text> output, Reporter arg3)throws IOException {while(values.hasNext()){bf.or(values.next());}}@Overridepublic void close() throws IOException {Path file = new Path(job.get("mapred.output.dir")+"/bloomfilter");FSDataOutputStream out = file.getFileSystem(job).create(file);bf.write(out);out.close();}}@Overridepublic int run(String[] arg0) throws Exception {Configuration conf = getConf();JobConf job = new JobConf(conf,HadoopSevenMapperDemo.class);Path in = new Path("/user/Administrator/input7");Path out = new Path("/user/Administrator/output7");FileInputFormat.setInputPaths(job, in);FileOutputFormat.setOutputPath(job, out);job.setJobName("HadoopSevenMapperDemo");job.setMapperClass(MapClass.class);job.setReducerClass(ReduceClass.class);job.setNumReduceTasks(1);job.setInputFormat(KeyValueTextInputFormat.class);job.setOutputFormat(NullOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(BloomFilter.class);job.set("key.value.separator.in.input.line", ",");JobClient.runJob(job);return 0;}public static void main(String[] args) {int res;try {res = ToolRunner.run(new Configuration(),new HadoopSevenMapperDemo(), args);System.exit(res);} catch (Exception e) {e.printStackTrace();}}}??
大家看到源码中引用的包都是mapred下面的类,那是因为这些类都是0.2.0以前的版本中有的类,1.0.3版本中仍然兼容,如果想用新版本的类,注意引用mapreduce下面的类就好,因为很多类名字没有改变,只是在不同的包下而已
?
?
?
5:参考资料
?
(1)书籍《Hadoop In Action》第五章
(2)BloomFilter介绍:http://blog.csdn.net/jiaomeng/article/details/1495500
(3)hadoop mapreduce join :http://xubindehao.iteye.com/blog/1405860
?
?