hadoop pig入门总结
LOAD 'data' [USING function] [AS schema];
? ? ? ?例如:
? ? ??load = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}' ? ?USING com.bonc.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray,DAY_ID:chararray,PROV_ID:chararray);
?
Table = load ‘url’ as (id,name…..);??? //table和load之间除了等号外 还必须有个空格 不然会出错,url一定要带引号,且只能是单引号。
?
?????? Table = filter Table1 by + A; //A可以是 id > 10;not name matches ‘’,is not null 等,可以用and ?和or连接各条件
? ? ? ?例如:
? ? ? ?filter = filter load20 by ( MONTH_ID == '1210' and ?DAY_ID == '18' and ?PROV_ID == '010' );
? ? ? ?
?
alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];
? ? ? ? ? pig的分组,不仅是数据上的分组,在数据的schema形式上也进行分组为groupcolumn:bag
???????? Table3 = group Table2 by id;也可以Table3 = group Table2 by (id,name);括号必须加
???????? 可以使用ALL实现对所有字段的分组
?
alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];
?
alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [{alias = {nested_op | nested_exp}; …]
GENERATE expression [AS schema] [expression [AS schema]….]
};
?
一般跟generate一块使用
???????? Table = foreach Table generate (id,name);括号可加可不加。
avg = foreach Table generate group, AVG(age);? MAX ,MIN..
?
在进行数据过滤时,建议尽早使用foreach generate将多余的数据过滤掉,减少数据交换
?
Inner? join Syntax
alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];
Outer join Syntax
alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];
?
?????join/left join / right join
daily = load 'A' as (id,name, sex);
divs? = load 'B' as (id,name, sex);
?
join
jnd?? = join daily by (id, name), divs by (id, name); ? ? ??
?
left join
jnd?? = join daily by (id, name) left outer, divs by (id, name);
也可以同时多个变量,但只用于inner join
A = load 'input1' as (x, y);
B = load 'input2' as (u, v);
C = load 'input3' as (e, f);
alpha = join A by x, B by u, C by e;
?
alias = UNION [ONSCHEMA] alias, alias [, alias …];
?
union 相当与sql中的union,但与sql不通的是pig中的union可以针对两个不同模式的变量:如果两个变量模式相同,那么union后的变量模式与 变量的模式一样;如果一个变量的模式可以由另一各变量的模式强制类型转换,那么union后的变量模式与转换后的变量模式相同;否则,union后的变量 没有模式。
?
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:float);
C = union A, B;
describe C;
?
C: {x: int,y: float}
?
A = load 'input1' as (x:double, y:float);
B = load 'input2' as (x:int, y:double);
C = union A, B;
describe C;
C: {x: double,y: double}
?
A = load 'input1' as (x:int, y:float);
B = load 'input2' as (x:int, y:chararray);
C = union A, B;
describe C;
Schema for C unknown.
?
注意:在pig 1.0中 执行不了最后一种union。
?
如果需要对两个具有不通列名的变量union的话,可以使用onschema关键字
A = load 'input1' as (w: chararray, x:int, y:float);
B = load 'input2' as (x:int, y:double, z:chararray);
C = union onschema A, B;
describe C;
C: {w: chararray,x: int,y: double,z: chararray}
?
join和union之后alias的别名会变
?
???? dump alias
用于在屏幕上显示数据。
?
alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];
???????? A = order Table by id desc;
?
???????? A = distinct alias;
?
???????? A = limit alias 10;
?
SAMPLE alias size;
?
随机抽取指定比例(0到1)的数据。
some = sample divs 0.1;
?
alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];
?
将多个数据集中的数据按照字段名进行同值组合,形成笛卡尔积。
--cross.pig
daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray,date:chararray, open:float, high:float, low:float,
close:float, volume:int, adj_close:float);
divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray,date:chararray, dividends:float);
tonsodata = cross daily, divs parallel 10;
?
?
Syntax
SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];
?
A = LOAD 'data' AS (f1:int,f2:int,f3:int);
DUMP A;
(1,2,3)
(4,5,6)
(7,8,9)
SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);
?
DUMP X;
(1,2,3)
(4,5,6)
?
DUMP Y;
(4,5,6)
?
DUMP Z;
(1,2,3)
(7,8,9)
?
???????? Store? … into … Using…
?
?
pig在别名维护上:
1、join
如e =?join?d?by?name,b?by?name;
? ? g =?foreach?e?generate?$0?as?one:chararray, $1?as?two:int, $2?as? ? ? three:chararray,$3?asfour:int;
? ? 他生成的schemal:
?
? ? ? ? e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}
?
g: {one: chararray,two: int,three: chararray,four: int}
2、group
? ?B = GROUP A BY age;
?
?pig udf自定义
pig支持嵌入user defined function,一个简单的udf 继承于evalFunc,通常用在filter,foreach中
Java代码?
- public?class?MyUDF?extends?EvalFunc<String>?{??
- ??
- ????@Override??
- ????public?String?exec(Tuple?input)?throws?IOException?{??
- ????????if(input?==?null?||?input.size()?==0)??
- ????????????return?null;??
- ????????try?{??
- ????????????String?val?=?(String)?input.get(0);??
- ????????????return?new?StringBuffer(val).append("?pig").toString();??
- ????????}?catch?(Exception?e)?{??
- ????????????throw?new?IOException(e.getMessage());??
- ????????}??
- ????}??
- ??
- }??
?
pig支持udf in loader and store
udf loader 需要继承于LoadFunc
udf storer 需要继承于StoreFunc
这类似于hadoop中写inputformat和outputformat
其中vertica就是写了一个DB版本的
?
这里贴一个简单的loader的例子:
Java代码?
- public?class?MyLoader?extends?LoadFunc{??
- ??
- ????protected?RecordReader?recordReader?=?null;??
- ??????
- ????private?PreparedStatement?ps;??
- ????private?Connection?conn;??
- ????private?final?String?jdbcURL;??
- ????private?final?String?user;??
- ????private?final?String?pwd;??
- ????private?final?String?querySql;??
- ????private?ResultSet?rs;??
- ??????
- ????public?MyLoader(String?driver,String?jdbcURL,String?user,String?pwd,String?querySql){??
- ????????try?{??
- ????????????Class.forName(driver);??
- ????????}?catch?(Exception?e)?{??
- ????????????//?TODO:?handle?exception??
- ????????}??
- ????????this.jdbcURL?=?jdbcURL;??
- ????????this.user?=?user;??
- ????????this.pwd?=?pwd;??
- ????????this.querySql?=?querySql;??
- ????}??
- ??????
- ????@Override??
- ????public?InputFormat?getInputFormat()?throws?IOException?{??
- ????????return?new?PigTextInputFormat();??
- ????}??
- ??
- ????@Override??
- ????public?Tuple?getNext()?throws?IOException?{??
- ????????//?TODO?重要的读取过程??
- ????????Text?val?=?null;??
- ????????boolean?next?=?false;??
- ????????try?{??
- ????????????next?=?rs.next();??
- ????????}?catch?(Exception?e)?{??
- ????????????//?TODO:?handle?exception??
- ????????}??
- ????????if(!next)??
- ????????????return?null;??
- ????????ResultSetMetaData?rsmd;??
- ????????try?{??
- //??????????rsmd?=?result??
- ????????}?catch?(Exception?e)?{??
- ????????????//?TODO:?handle?exception??
- ????????}??
- ??????????
- ????????return?null;??
- ????}??
- ??
- ????@Override??
- ????public?void?prepareToRead(RecordReader?arg0,?PigSplit?arg1)??
- ????????????throws?IOException?{??
- ????????this.recordReader?=?arg0;??
- ????}??
- ??
- ????@Override??
- ????public?void?setLocation(String?arg0,?Job?arg1)?throws?IOException?{??
- ????????//no?idea??
- ????}??
- ??????
- ????public?ResourceSchema?getSchema(String?location,Job?job)?throws?IOException{??
- ????????Configuration?conf?=?job.getConfiguration();??
- ????????Schema?schema?=?new?Schema();??
- ????????try?{??
- ????????????//TODO:reader?from?database?table??
- //??????????Connection?conn?=?DriverManager.getConnection(this.jdbcURL,?this.user,?this.pwd);??
- ????????????FieldSchema?fieldName?=?new?FieldSchema("name",?DataType.CHARARRAY);??
- ????????????FieldSchema?fieldPosition?=?new?FieldSchema("position",?DataType.INTEGER);??
- ????????????schema.add(fieldName);??
- ????????????schema.add(fieldPosition);??
- ????????}?catch?(Exception?e)?{??
- ????????????//TODO?log?exception??
- ????????}??
- ??????????
- ????????return?null;??
- ????}??
- ??????
- ????public?void?prepareToRead(){??
- ??????????
- ????}??
- ??
- }??
?其中getNext方法就是如何处理reader读取出的数据
? ? ? ? getSchema可以固定读取数据的schema
? ? ? ? setLocation可以处理输入的数据源
? ? ? ??prepareToRead是读取数据之前,可以在此做标识,等等
? ? ? ??
?
pig 衍生
1.penny:
1. Penny的描述
Penny是pig的贡献项目,是pig的调试和监控工具,而且支持根据API自定义penny的监视器和协作器,已实现不同的功能;
2. Penny的总架构
Penny将监视器插入到pig的工作操作中,主要用于监视pig数据流的变化,监视器可以调用协作器,完成各种功能。
3. Penny的总类图关系
ParsePigScript负责根据用户监视器生成新计划newPlan,在ToolsPigServer中根据以前的脚本执行新计划。在执行新计划时,当监视器监视对象数据发生变化,出发监视器,运行自定义的业务,也可以将数据流变化传回协作器里处理,总类图如下:?
4. Penny的使用
Penny的使用需要自定义两个类,一个类继承于监视器基类MonitorAgent,另一个继承于协作器基类Coordinator。然后根据上边类图,就可以使用PennyServer和ParsePigScript进行监控和调试
?5.在pig中就可以找到penny这个贡献的源码
?
Vertica:
? ?vertica是pig loader和storer的udf
? ?附件里是vertica,来自github,和vertica的介绍使用文档
? ?贴一篇将vertica的帖子?http://blackproof.iteye.com/blog/1791995
?
? 推荐书籍
? ? programming pig
?
? 推荐网址
? ?http://pig.apache.org/docs/r0.10.0/basic.html 官网
?
? pig pen开发工具,这个我现在玩得还不熟,就不介绍了,有兴趣的可以去搜搜玩玩
?
我在工作中pig的使用,主要是数据的ETL,所以比较适合。在选择pig hive还是其他非hadoop架构,如redis,这还是一个需要继续尝试探索的问题。