首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

MapReduce : 新版API 自定义InputFormat 把整个文件作为一条记录处理

2012-09-20 
MapReduce : 新版API 自定义InputFormat 把整个文件作为一条记录处理。自定义InputFormat 新版API 把整个文……

MapReduce : 新版API 自定义InputFormat 把整个文件作为一条记录处理

自定义InputFormat 新版API 把真个文件当成一条输入主要参考 源代码LineRecordReader里面的内容  有些细节还没有理解WholeFileInputFormatimport java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {@Overridepublic RecordReader<Text, BytesWritable> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException,InterruptedException {// TODO Auto-generated method stubreturn new WholeFileRecordReader();}@Overrideprotected boolean isSplitable(JobContext context, Path filename) {// TODO Auto-generated method stubreturn false;}}WholeFileRecordReaderimport java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {private FileSplit fileSplit;private FSDataInputStream fis;private Text key = null;private BytesWritable value = null;private boolean processed = false;@Overridepublic void close() throws IOException {// TODO Auto-generated method stub//fis.close();}@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn 
this.key;}@Overridepublic BytesWritable getCurrentValue() throws IOException,InterruptedException {// TODO Auto-generated method stubreturn this.value;}@Overridepublic void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)throws IOException, InterruptedException {fileSplit = (FileSplit) inputSplit;Configuration job = tacontext.getConfiguration();Path file = fileSplit.getPath();FileSystem fs = file.getFileSystem(job);fis = fs.open(file);}@Overridepublic boolean nextKeyValue() {if(key == null){key = new Text();}if(value == null){value = new BytesWritable();}if(!processed){byte[] content = new byte[(int) fileSplit.getLength()];Path file = fileSplit.getPath();System.out.println(file.getName());key.set(file.getName());try {IOUtils.readFully(fis, content, 0, content.length);//value.set(content, 0, content.length);value.set(new BytesWritable(content));} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally{IOUtils.closeStream(fis);}  processed = true;return true;}return false;}@Overridepublic float getProgress() throws IOException, InterruptedException {// TODO Auto-generated method stubreturn processed? fileSplit.getLength():0;}}验证public static class mapper extends Mapper<Text, BytesWritable, Text, Text>{@Overrideprotected void map(Text key, BytesWritable value, Context context)throws IOException, InterruptedException {// TODO Auto-generated method stubcontext.write(key, new Text(value.getBytes()));}}note:value是BytesWritable类型,显示为十六进制数, new Text(value.getBytes()) 变成字符串形式
?

热点排行