使用 MapReduce 向 HBase 插入数据
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import com.hbase.log.RecordParser;public class HbaseInsertData {public static class HbaseMapper extends Mapper<LongWritable, Text, Text, Text>{RecordParser parser = new RecordParser();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {parser.parse(value);String phone = parser.getPhone();int bloodPressure = parser.getBloodPressure();if(bloodPressure > 150) {context.write(new Text(phone), new Text(bloodPressure + ""));}}}public static class HbaseReducerextends TableReducer<Text, Text, ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<Text> values,Context context)throws IOException, InterruptedException {String value = values.iterator().next().toString();Put putRow = new Put(key.getBytes());putRow.add("f1".getBytes(), "qualifier".getBytes(), value.getBytes());context.write(new ImmutableBytesWritable(key.getBytes()), putRow);}}public static void main(String[] args) throws Exception{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum.", "localhost"); //千万别忘记配置Job job = new Job(conf, "count");job.setJarByClass(HbaseInsertData.class);job.setMapperClass(HbaseMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);Path in = new Path("hdfs://localhost:9000/input");FileInputFormat.addInputPath(job, in);TableMapReduceUtil.initTableReducerJob("tab1", 
HbaseReducer.class, job);System.exit(job.waitForCompletion(true) ? 0 : 1);}}?
用于解析记录的类 RecordParser
import org.apache.hadoop.io.Text;public class RecordParser {private String id;private String phone;private int bloodPressure;public void parse(String record) {String[] logs = record.split(",");id = logs[1];phone = logs[3];bloodPressure = Integer.parseInt(logs[4]);}public void parse(Text record) {this.parse(record.toString());}public String getId() {return id;}public String getPhone() {return phone;}public int getBloodPressure() {return bloodPressure;}}?