
Inserting Data into HBase with MapReduce

2012-12-27 

The following job uses MapReduce to insert data into HBase. The mapper filters input records by blood pressure, and the reducer writes the surviving records into an HBase table:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.hbase.log.RecordParser;

public class HbaseInsertData {

    public static class HbaseMapper extends Mapper<LongWritable, Text, Text, Text> {

        RecordParser parser = new RecordParser();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parser.parse(value);
            String phone = parser.getPhone();
            int bloodPressure = parser.getBloodPressure();
            // Only emit records whose blood pressure reading exceeds 150.
            if (bloodPressure > 150) {
                context.write(new Text(phone), new Text(bloodPressure + ""));
            }
        }
    }

    public static class HbaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String value = values.iterator().next().toString();
            // Use Bytes.toBytes(key.toString()) instead of key.getBytes():
            // Text.getBytes() returns the backing array, which can contain
            // stale bytes beyond the valid length.
            byte[] rowKey = Bytes.toBytes(key.toString());
            Put putRow = new Put(rowKey);
            putRow.add(Bytes.toBytes("f1"), Bytes.toBytes("qualifier"), Bytes.toBytes(value));
            context.write(new ImmutableBytesWritable(rowKey), putRow);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Be sure not to forget this setting. Note the property name is
        // "hbase.zookeeper.quorum" with no trailing dot.
        conf.set("hbase.zookeeper.quorum", "localhost");

        Job job = new Job(conf, "count");
        job.setJarByClass(HbaseInsertData.class);
        job.setMapperClass(HbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        Path in = new Path("hdfs://localhost:9000/input");
        FileInputFormat.addInputPath(job, in);

        // Wires the reducer to write into the HBase table "tab1".
        TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
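The job assumes the target table tab1 with column family f1 already exists; the article does not show how it was created. A minimal sketch of creating it programmatically with the HBase client API of that era (the CreateTable class name here is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Table "tab1" with the single column family "f1"
        // that the reducer writes into.
        HTableDescriptor desc = new HTableDescriptor("tab1");
        desc.addFamily(new HColumnDescriptor("f1"));
        if (!admin.tableExists("tab1")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}

The same table could equally be created from the HBase shell before submitting the job.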


The RecordParser class that parses each input record:

import org.apache.hadoop.io.Text;

public class RecordParser {

    private String id;
    private String phone;
    private int bloodPressure;

    // Records are comma-separated; field 1 holds the id, field 3 the
    // phone number, and field 4 the blood pressure reading.
    public void parse(String record) {
        String[] logs = record.split(",");
        id = logs[1];
        phone = logs[3];
        bloodPressure = Integer.parseInt(logs[4]);
    }

    public void parse(Text record) {
        this.parse(record.toString());
    }

    public String getId() {
        return id;
    }

    public String getPhone() {
        return phone;
    }

    public int getBloodPressure() {
        return bloodPressure;
    }
}
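The article never shows a sample input line; judging from the indices used in parse() (1, 3, and 4), each record needs at least five comma-separated fields. A quick standalone check against a hypothetical line (all field values here are made up):

public class RecordParserDemo {
    public static void main(String[] args) {
        // Hypothetical record: only fields 1, 3 and 4 are read.
        String line = "0,id001,2012-12-27,13900001111,160";
        RecordParser parser = new RecordParser();
        parser.parse(line);
        System.out.println(parser.getId());            // id001
        System.out.println(parser.getPhone());         // 13900001111
        System.out.println(parser.getBloodPressure()); // 160
    }
}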