
Hadoop Pig: getting-started notes

2013-02-24 
1:load

LOAD 'data' [USING function] [AS schema];

For example:

load20 = LOAD 'sql://{SELECT MONTH_ID,DAY_ID,PROV_ID FROM zb_d_bidwmb05009_010}' USING com.bonc.dataplatform.bbdp.geniuspig.VerticaLoader('oracle','192.168.6.5','dev','1522','vbap','vbap','1') AS (MONTH_ID:chararray, DAY_ID:chararray, PROV_ID:chararray);

Table = load 'url' as (id, name, ...); -- besides the equals sign there must be a space between the alias and LOAD, otherwise it errors; the url must be quoted, and only single quotes are accepted.

2:filter

alias = FILTER alias BY expression;

Table = filter Table1 by A; -- A can be a condition such as id > 10, not name matches '...', ... is not null, etc.; conditions can be combined with and / or

For example:

filtered = filter load20 by (MONTH_ID == '1210' and DAY_ID == '18' and PROV_ID == '010');

3:group

alias = GROUP alias { ALL | BY expression} [, alias ALL | BY expression …] [USING 'collected' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];

Pig's grouping is not just a grouping of the data; it also regroups the schema, into the form groupcolumn:bag.

Table3 = group Table2 by id; you can also write Table3 = group Table2 by (id,name); with multiple fields the parentheses are required.

ALL can be used to put all records into a single group (for example, for a global aggregate).
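A minimal sketch of that reshaping (the file and field names here are illustrative); the describe output is approximately:

Table2 = load 'people' as (id:int, name:chararray, age:int);
Table3 = group Table2 by id;
describe Table3;
-- Table3: {group: int,Table2: {(id: int,name: chararray,age: int)}}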

4:foreach

alias = FOREACH alias GENERATE expression [AS schema] [expression [AS schema]….];

alias = FOREACH nested_alias {
alias = {nested_op | nested_exp}; [alias = {nested_op | nested_exp}; …]
GENERATE expression [AS schema] [expression [AS schema]….]
};
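A hedged sketch of the nested form (relation and field names are illustrative): counting the distinct urls per user.

clicks = load 'clicks' as (user:chararray, url:chararray);
grpd = group clicks by user;
cntd = foreach grpd {
    urls = clicks.url;     -- project one column of the bag
    uniq = distinct urls;  -- a nested_op applied within each group
    generate group, COUNT(uniq);
};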

FOREACH is generally used together with GENERATE.

Table = foreach Table generate id, name; -- with or without parentheses around the field list (note that in newer Pig versions, generate (id,name) creates a single tuple-typed field rather than two fields)

avg = foreach Table3 generate group, AVG(Table2.age); -- aggregates (AVG, MAX, MIN, ...) are applied to the bag produced by group

When trimming data, it is recommended to project away unneeded columns with foreach … generate as early as possible, to reduce the amount of data moved between stages.
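A small illustrative sketch, reusing daily from the join section below:

slim = foreach daily generate id, name; -- project away sex before any join or group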

5:join

Inner join Syntax

alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];

Outer join Syntax

alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];

join / left join / right join

daily = load 'A' as (id,name, sex);

divs = load 'B' as (id, name, sex);

join

jnd = join daily by (id, name), divs by (id, name);


left join

jnd = join daily by (id, name) left outer, divs by (id, name);

Several relations can also be joined at once, but only as an inner join:

A = load 'input1' as (x, y);

B = load 'input2' as (u, v);

C = load 'input3' as (e, f);

alpha = join A by x, B by u, C by e;
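The USING clause in the syntax above picks a join strategy. As a hedged sketch, a fragment-replicate join, suitable when the last-listed relation is small enough to fit in memory:

beta = join A by x, B by u using 'replicated'; -- B is loaded into memory on each map task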


6: union

alias = UNION [ONSCHEMA] alias, alias [, alias …];

union is the equivalent of SQL's UNION, except that unlike SQL, Pig's union can be applied to two relations with different schemas: if the two schemas are identical, the result keeps that schema; if one relation's schema can be reached from the other's by implicit casts, the result takes the cast (widened) schema; otherwise the result has no schema.

A = load 'input1' as (x:int, y:float);

B = load 'input2' as (x:int, y:float);

C = union A, B;

describe C;


C: {x: int,y: float}


A = load 'input1' as (x:double, y:float);

B = load 'input2' as (x:int, y:double);

C = union A, B;

describe C;

C: {x: double,y: double}


A = load 'input1' as (x:int, y:float);

B = load 'input2' as (x:int, y:chararray);

C = union A, B;

describe C;

Schema for C unknown.

Note: in Pig 1.0 the last kind of union cannot be executed.

If you need to union two relations whose column names differ, use the ONSCHEMA keyword:

A = load 'input1' as (w: chararray, x:int, y:float);

B = load 'input2' as (x:int, y:double, z:chararray);

C = union onschema A, B;

describe C;

C: {w: chararray,x: int,y: double,z: chararray}

Note that after join and union the field aliases change (after a join, for example, fields are prefixed with their source relation, as shown in the alias-maintenance section below).

7:Dump

dump alias;

Displays the relation's contents on the screen.


8:Order by

alias = ORDER alias BY { * [ASC|DESC] | field_alias [ASC|DESC] [, field_alias [ASC|DESC] …] } [PARALLEL n];

A = order Table by id desc;


9:distinct

A = distinct alias;


10:limit

A = limit alias 10;


11:sample

SAMPLE alias size;


Randomly samples the given proportion (between 0 and 1) of the data.

some = sample divs 0.1;


13:cross

alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];


Takes the cross product of two or more relations, pairing every tuple of each relation with every tuple of the others (a Cartesian product).

--cross.pig

daily = load 'NYSE_daily' as (exchange:chararray, symbol:chararray, date:chararray, open:float, high:float, low:float, close:float, volume:int, adj_close:float);

divs = load 'NYSE_dividends' as (exchange:chararray, symbol:chararray, date:chararray, dividends:float);

tonsodata = cross daily, divs parallel 10;


15:split

Syntax

SPLIT alias INTO alias IF expression, alias IF expression [, alias IF expression …] [, alias OTHERWISE];


A = LOAD 'data' AS (f1:int,f2:int,f3:int);

DUMP A;

(1,2,3)

(4,5,6)

(7,8,9)

SPLIT A INTO X IF f1<7, Y IF f2==5, Z IF (f3<6 OR f3>6);


DUMP X;

(1,2,3)

(4,5,6)


DUMP Y;

(4,5,6)


DUMP Z;

(1,2,3)

(7,8,9)


16:store

STORE alias INTO 'directory' [USING function];
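For example, a minimal sketch (the output path and delimiter are illustrative):

store jnd into 'output/jnd' using PigStorage(',');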


How Pig maintains aliases:

1. join

e = join d by name, b by name;

g = foreach e generate $0 as one:chararray, $1 as two:int, $2 as three:chararray, $3 as four:int;

The schemas this generates:


e: {d::name: chararray,d::position: int,b::name: chararray,b::age: int}


g: {one: chararray,two: int,three: chararray,four: int}

2. group

B = GROUP A BY age;
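A hedged sketch of the resulting schema, assuming A = load 'input' as (name:chararray, age:int); the describe output is approximately:

describe B;
-- B: {group: int,A: {(name: chararray,age: int)}}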


Custom Pig UDFs

Pig supports embedding user defined functions. A simple UDF extends EvalFunc and is typically used in filter and foreach.

Java code:
import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class MyUDF extends EvalFunc<String> {

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0)
            return null;
        try {
            // append " pig" to the first field of the input tuple
            String val = (String) input.get(0);
            return new StringBuffer(val).append(" pig").toString();
        } catch (Exception e) {
            throw new IOException(e.getMessage());
        }
    }
}
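A hedged usage sketch (the jar name is illustrative; since the class above is in the default package, the bare class name can be used):

register myudf.jar;
A = load 'input' as (name:chararray);
B = foreach A generate MyUDF(name);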


Pig also supports UDFs for loading and storing:

a UDF loader extends LoadFunc;

a UDF storer extends StoreFunc.

This is similar to writing an InputFormat and OutputFormat in Hadoop; the Vertica connector, for example, is a database-backed implementation of both.

Here is a simple loader example:

Java code:
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class MyLoader extends LoadFunc {

    protected RecordReader recordReader = null;

    private PreparedStatement ps;
    private Connection conn;
    private final String jdbcURL;
    private final String user;
    private final String pwd;
    private final String querySql;
    private ResultSet rs;

    public MyLoader(String driver, String jdbcURL, String user, String pwd, String querySql) {
        try {
            Class.forName(driver);
        } catch (Exception e) {
            // TODO: handle exception
        }
        this.jdbcURL = jdbcURL;
        this.user = user;
        this.pwd = pwd;
        this.querySql = querySql;
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new PigTextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // TODO: the important part -- turn the next row read into a Tuple;
        // returning null tells Pig the input is exhausted
        boolean next = false;
        try {
            next = rs.next();
        } catch (Exception e) {
            // TODO: handle exception
        }
        if (!next)
            return null;
        try {
            // ResultSetMetaData rsmd = rs.getMetaData();
            // ... build and return a Tuple from the current row
        } catch (Exception e) {
            // TODO: handle exception
        }
        return null;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        this.recordReader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // TODO: handle the input location
    }

    // note: Pig only calls getSchema if the loader also implements LoadMetadata
    public ResourceSchema getSchema(String location, Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        Schema schema = new Schema();
        try {
            // TODO: read the real column list from the database table, e.g.
            // Connection conn = DriverManager.getConnection(this.jdbcURL, this.user, this.pwd);
            FieldSchema fieldName = new FieldSchema("name", DataType.CHARARRAY);
            FieldSchema fieldPosition = new FieldSchema("position", DataType.INTEGER);
            schema.add(fieldName);
            schema.add(fieldPosition);
        } catch (Exception e) {
            // TODO: log exception
        }
        return new ResourceSchema(schema);
    }
}

The getNext method is where you turn what the reader produced into output tuples;

getSchema can pin down the schema of the data being read;

setLocation handles the input data source / location;

prepareToRead runs before reading begins, so any setup or bookkeeping can be done there.
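A hedged sketch of using this loader from Pig Latin (the driver class, connection values, and query are illustrative placeholders; the location string is unused by this sketch's setLocation):

rows = load 'placeholder' using MyLoader('oracle.jdbc.driver.OracleDriver', 'jdbc:oracle:thin:@192.168.6.5:1522:vbap', 'dev', 'vbap', 'SELECT name, position FROM t') as (name:chararray, position:int);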


Derivative Pig projects

1. Penny:

1. What Penny is

Penny is a contrib project of Pig: a debugging and monitoring tool. Through its API you can define custom Penny monitor agents and coordinators to implement different kinds of functionality.

2. Penny's overall architecture

Penny inserts monitor agents into Pig's job operators, mainly to watch how the Pig data flow changes; the monitors can call a coordinator to carry out the various tasks.

3. Penny's class relationships

ParsePigScript builds a new plan (newPlan) that incorporates the user's monitors, and ToolsPigServer runs that new plan in place of the original script. While the new plan executes, whenever the data a monitor watches changes, the monitor is triggered and runs the user-defined logic; it can also pass the data-flow change back to the coordinator for handling. (The overall class diagram is omitted here.)

4. Using Penny

Using Penny requires writing two classes: one extending the monitor base class MonitorAgent, the other extending the coordinator base class Coordinator. Following the class diagram above, PennyServer and ParsePigScript can then be used for monitoring and debugging.

5. Penny's source can be found among Pig's contrib projects.

Vertica:

Vertica provides Pig loader and storer UDFs.

The attachment contains the Vertica connector from GitHub, together with its introductory usage documentation.

A post covering Vertica: http://blackproof.iteye.com/blog/1791995

Recommended book

Programming Pig

Recommended link

http://pig.apache.org/docs/r0.10.0/basic.html (official documentation)

The PigPen development tool I have not used enough yet to introduce here; if you are interested, it is worth searching out and experimenting with.

In my own work Pig is used mainly for data ETL, which it suits well. Whether to choose Pig, Hive, or a non-Hadoop architecture such as Redis is a question that still calls for continued experimentation.
