
Sqoop Source Code Analysis (4): Importing Data from a Relational Database with Hadoop MapReduce

2012-06-26 

This is an original post; please credit the source when reposting: http://guoyunsky.iteye.com/blogs/1213966/

Welcome to join the Hadoop QQ super group: 180941958

One of Sqoop's highlights is that it can import data from a relational database into HDFS through Hadoop MapReduce, which speeds up the import. I had long wanted to understand MapReduce, so I read through the relevant code carefully and organized my notes into this post.

I. How it works

When importing, Sqoop requires the split-by parameter to be specified. Sqoop partitions the input according to the values of the split-by column and assigns each resulting range to a different map task; each map task then reads its rows from the database one by one and writes them to HDFS. How the ranges are computed depends on the column's type. In the simple int case, Sqoop queries the minimum and maximum of the split-by column, e.g. select min(split_by), max(split_by) from the table, and divides that interval into num-mappers ranges. For example, if min(split_by) is 1, max(split_by) is 1000, and num-mappers is 2, the two ranges are (1, 500) and (501, 1000), and two SQL statements are generated, one per map task: select XXX from table where split_by >= 1 and split_by <= 500, and select XXX from table where split_by >= 501 and split_by <= 1000. Finally, each map task imports the rows returned by its own query.
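To make the range arithmetic concrete, here is a small self-contained sketch of the even integer split described above. This is my own illustration, not Sqoop's code: the real logic lives in a splitter class (IntegerSplitter) and also handles remainders, other column types, and so on.

    import java.util.ArrayList;
    import java.util.List;

    public class IntSplitSketch {
        /** Divide [min, max] into numMappers contiguous [low, high] ranges (hypothetical helper). */
        static List<long[]> split(long min, long max, int numMappers) {
            List<long[]> ranges = new ArrayList<long[]>();
            long size = (max - min + 1) / numMappers;
            long low = min;
            for (int i = 0; i < numMappers; i++) {
                // Give the last range whatever remains so the whole interval is covered.
                long high = (i == numMappers - 1) ? max : low + size - 1;
                ranges.add(new long[] { low, high });
                low = high + 1;
            }
            return ranges;
        }

        public static void main(String[] args) {
            // min=1, max=1000, two mappers -> (1, 500) and (501, 1000).
            for (long[] r : split(1, 1000, 2)) {
                System.out.printf("WHERE split_by >= %d AND split_by <= %d%n", r[0], r[1]);
            }
        }
    }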


II. How the parameters the MapReduce job needs are implemented in Sqoop

1) InputFormatClass
    com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
2) OutputFormatClass
    1) TextFile
        com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat
    2) SequenceFile
        org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    3) AvroDataFile
        com.cloudera.sqoop.mapreduce.AvroOutputFormat
3) Mapper
    1) TextFile
        com.cloudera.sqoop.mapreduce.TextImportMapper
    2) SequenceFile
        com.cloudera.sqoop.mapreduce.SequenceFileImportMapper
    3) AvroDataFile
        com.cloudera.sqoop.mapreduce.AvroImportMapper
4) taskNumbers
    1) mapred.map.tasks (set from the --num-mappers option)
    2) job.setNumReduceTasks(0);


Here is the command line I use as the running example: import --connect jdbc:mysql://localhost/sqoop_datas --username root --password 123456 --query "select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE $CONDITIONS" --target-dir /tmp/sqoop/foo2 --split-by sqoop_1.id --hadoop-home=/home/guoyun/Downloads/hadoop-0.20.2-CDH3B4 --num-mappers 2

Note: in the walkthrough below, each configuration key or call is followed by the value derived from this command. At run time Sqoop replaces the $CONDITIONS placeholder in the query with each split's range condition.

1) Setting up the Input

DataDrivenImportJob.configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol)
    a) DBConfiguration.configureDB(Configuration conf, String driverClass, String dbUrl, String userName, String passwd, Integer fetchSize)
        1) mapreduce.jdbc.driver.class  com.mysql.jdbc.Driver
        2) mapreduce.jdbc.url  jdbc:mysql://localhost/sqoop_datas
        3) mapreduce.jdbc.username  root
        4) mapreduce.jdbc.password  123456
        5) mapreduce.jdbc.fetchsize  -2147483648 (Integer.MIN_VALUE, which tells the MySQL driver to stream results row by row)
    b) DataDrivenDBInputFormat.setInput(Job job, Class<? extends DBWritable> inputClass, String inputQuery, String inputBoundingQuery)
        1) job.setInputFormatClass(DBInputFormat.class);
        2) mapred.jdbc.input.bounding.query  SELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1
        3) job.setInputFormatClass(DataDrivenDBInputFormat.class);
        4) mapreduce.jdbc.input.orderby  sqoop_1.id
    c) mapreduce.jdbc.input.class  QueryResult
    d) sqoop.inline.lob.length.max  16777216
    e) job.setInputFormatClass(inputFormatClass);  // com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
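To see how those pieces fit together, here is a minimal sketch of the same input wiring. It is not Sqoop's code: it uses Hadoop's stock org.apache.hadoop.mapreduce.lib.db classes (which Sqoop's com.cloudera.sqoop.mapreduce.db versions extend), the query, credentials, and bounding query are the ones recorded above, and NullDBWritable merely stands in for the generated QueryResult class so the sketch compiles on its own.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

    public class ConfigureInputSketch {
        public static void configureInput(Job job) {
            // a) Connection settings land under the mapreduce.jdbc.* keys.
            DBConfiguration.configureDB(job.getConfiguration(),
                "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/sqoop_datas",
                "root", "123456");
            // b) The import query plus the bounding query; at run time each split
            // substitutes its own range condition for $CONDITIONS.
            DataDrivenDBInputFormat.setInput(job, DBInputFormat.NullDBWritable.class,
                "select sqoop_1.id as foo_id, sqoop_2.id as bar_id "
                    + "from sqoop_1, sqoop_2 WHERE $CONDITIONS",
                "SELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id, "
                    + "sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1");
            // The split column is recorded as mapreduce.jdbc.input.orderby.
            job.getConfiguration().set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "sqoop_1.id");
        }
    }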


2) Setting up the Output

ImportJobBase.configureOutputFormat(Job job, String tableName, String tableClassName)
    a) job.setOutputFormatClass(getOutputFormatClass());
    b) FileOutputFormat.setOutputCompressorClass(job, codecClass);
    c) SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
    d) FileOutputFormat.setOutputPath(job, outputPath);


3) Setting up the Map

    DataDrivenImportJob.configureMapper(Job job, String tableName, String tableClassName)
        a) job.setOutputKeyClass(Text.class);
        b) job.setOutputValueClass(NullWritable.class);
        c) job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper.class);


4) Setting the task numbers

    JobBase.configureNumTasks(Job job)
        mapred.map.tasks 4
        job.setNumReduceTasks(0);
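Putting steps 2) through 4) together, the wiring amounts to roughly the following plain Hadoop calls. This is a condensed sketch of my own, not Sqoop's code: TextOutputFormat and GzipCodec stand in for Sqoop's RawKeyTextOutputFormat and the user-chosen codec, the compression lines apply only when compression was requested, and the mapper line is commented out because TextImportMapper ships with Sqoop.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class ConfigureOutputAndTasksSketch {
        public static void configure(Job job, Path outputPath) {
            // 2) Output: RawKeyTextOutputFormat writes only the key, with no
            // key/value separator; stock TextOutputFormat is the closest equivalent.
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setCompressOutput(job, true);   // only when compression is requested
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            FileOutputFormat.setOutputPath(job, outputPath);

            // 3) Map output types: one line of row text as the key, no value.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // job.setMapperClass(TextImportMapper.class);   // Sqoop's text-import mapper

            // 4) Task numbers: the map count is meant to follow --num-mappers;
            // import jobs are map-only, so reduces are set to zero.
            job.getConfiguration().setInt("mapred.map.tasks", 2);
            job.setNumReduceTasks(0);
        }
    }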


III. Overall flow

1. Read the structure of the table whose data is to be imported, generate the runtime record class (QueryResult by default), package it into a jar, and submit it to Hadoop.

2. Configure the job, which mainly means setting the parameters described in section II above.

3. From here Hadoop runs the MapReduce job that carries out the import:

1) First the input is split (the DataSplit step):

    DataDrivenDBInputFormat.getSplits(JobContext job)

2) Once the ranges have been computed, each split serializes its range so it can be read back later:

    DataDrivenDBInputFormat.DataDrivenDBInputSplit.write(DataOutput output)

    What gets written here are the lowerBoundQuery and upperBoundQuery clauses.

3) The ranges written in step 2) are read back:

    DataDrivenDBInputFormat.DataDrivenDBInputSplit.readFields(DataInput input)

4) A RecordReader is created to read data from the database:

    DataDrivenDBInputFormat.createRecordReader(InputSplit split, TaskAttemptContext context)

5) The Map is set up:

    TextImportMapper.setup(Context context)

6) The RecordReader reads rows from the relational database one by one, fills in the map's key and value, and hands them to the map:

    DBRecordReader.nextKeyValue()

7) The map runs:

    TextImportMapper.map(LongWritable key, SqoopRecord val, Context context)

    The key it finally emits is the row data, produced by QueryResult; the value is NullWritable.get().
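From steps 5) through 7), TextImportMapper boils down to something like the sketch below. This is a simplification of my own based on the flow above: the real class also prepares a loader for large-object (LOB) columns in setup(), and SqoopRecord here is the base class that the generated QueryResult extends.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import com.cloudera.sqoop.lib.SqoopRecord;

    public class TextImportMapperSketch
            extends Mapper<LongWritable, SqoopRecord, Text, NullWritable> {
        private final Text outKey = new Text();

        @Override
        public void map(LongWritable key, SqoopRecord val, Context context)
                throws IOException, InterruptedException {
            // The generated class renders the row as one delimited line of text;
            // that line becomes the output key, and the value stays empty.
            outKey.set(val.toString());
            context.write(outKey, NullWritable.get());
        }
    }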


IV. Summary

Through all this I got a rough picture of how a MapReduce job runs. But Sqoop's splitting scheme still strikes me as problematic. Splitting by ID range can produce very uneven shards. Say min(split-id)=1 and max(split-id)=3000, handed to three map tasks; the ranges are (1-1000), (1001-2000), and (2001-3000). If the rows in 1001-2000 have all been deleted, that map task has nothing to do while the other maps are worked half to death, which drags down the whole job. And the ranges here are tiny; imagine billions of rows handed to several hundred map tasks. The more maps there are, the more an unbalanced workload hurts progress. Is there a better way to split, say by sampling? Seen this way, writing a good MapReduce job is not so simple!


