无意间在部门的svn看到以前老员工的Hive优化文档,看看了,写的挺详细的,结合hadoop In action(陆) 一书的内容在这里做个汇总
1,列裁剪
在读数据的时候,只读取查询中需要用到的列,而忽略其他列。例如,对于查询:
1 SELECT a,b FROM T WHERE e < 10 ;
其中,T 包含 5 个列 (a,b,c,d,e),列 c,d 将会被忽略,只会读取a, b, e 列
这个选项默认为真: hive.optimize.cp = true
2,分区裁剪
在查询的过程中减少不必要的分区。例如,对于下列查询:
1 select * 2 from 3 ( select c1, count ( 1 ) 4 fromT 5 group by c1 6 ) subq 7 where subq.prtn = 100 ; 8 9 select * 10 from T1 11 join 12 ( select * 13 from T2 14 ) subq 15 on T1.c1 = subq.c2 16 where subq.prtn = 100 ;
会在子查询中就考虑 subq.prtn = 100 条件,从而减少读入的分区数目。
此选项默认为真: hive.optimize.pruner=true
3,Join 操作
在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边。原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。
下面是从网上看到的,做一下备注,源地址 http://www.cnblogs.com/end/archive/2013/01/15/2861448.html
长期观察hadoop处理数据的过程,有几个显著的特征:
1.不怕数据多,就怕数据倾斜。
2.对jobs数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个jobs,没半小时是跑不完的。map reduce作业初始化的时间是比较长的。
3.对sum,count来说,不存在数据倾斜问题。
4.对count(distinct ),效率较低,数据量一多,准出问题,如果是多count(distinct )效率更低。
优化可以从几个方面着手:
1. 好的模型设计事半功倍。
2. 解决数据倾斜问题。
3. 减少job数。
4. 设置合理的map reduce的task数,能有效提升性能。(比如,10w+级别的计算,用160个reduce,那是相当的浪费,1个足够)。
5. 自己动手写sql解决数据倾斜问题是个不错的选择。set hive.groupby.skewindata=true;这是通用的算法优化,但算法优化总是漠视业务,习惯性提供通用的解决方法。 Etl开发人员更了解业务,更了解数据,所以通过业务逻辑解决倾斜的方法往往更精确,更有效。
6. 对count(distinct)采取漠视的方法,尤其数据大的时候很容易产生倾斜问题,不抱侥幸心理。自己动手,丰衣足食。
7. 对小文件进行合并,是行至有效的提高调度效率的方法,假如我们的作业设置合理的文件数,对云梯的整体调度效率也会产生积极的影响。
8. 优化时把握整体,单个作业最优不如整体最优。
问题1 :如日志中,常会有信息丢失的问题,比如全网日志中的user_id ,如果取其中的user_id 和bmw_users 关联,就会碰到数据倾斜的问题。
方法:解决数据倾斜问题
解决方法1.
User_id为空的不参与关联,例如:
1 Select * 2 From log a 3 Join 4 bmw_users b 5 On a. user_id is not null And a. user_id = b. user_id 6 Union all 7 Select * 8 from log a 9 where a. user_id is null ;
解决方法2 :
1 Select * 2 from log a 3 left outer join bmw_users b 4 on case when a. user_id is null then concat(‘dp_hive’, rand () ) 5 else a. user_id end = b. user_id ;
总结: 2比1效率更好,不但io少了,而且作业数也少了。1方法log读取两次,jobs是2。2方法job数是1 。 这个优化适合无效 id( 比如-99,’’,null 等) 产生的倾斜问题。 把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。因为空值不参与关联,即使分到不同的reduce上,也不影响最终的结果。附上hadoop通用关联的实现方法(关联通过二次排序实现的,关联的列为parition key,关联的列c1和表的tag组成排序的group key,根据parition key分配reduce。同一reduce内根据group key排序)。
问题2 :不同数据类型id 的关联会产生数据倾斜问题。
一张表s8的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题。s8的日志中有字符串商品id,也有数字的商品id,类型是string的,但商品中的数字id是bigint的。猜测问题的原因是把s8的商品id转成数字id做hash来分配reduce,所以字符串id的s8日志,都到一个reduce上了,解决的方法验证了这个猜测。
方法:把数字类型转换成字符串类型
1 select *
from
( Select * 2 from s8_log a 3 Left outer join 4 r_auction_auctions b 5 On a.auction_id = cast (b.auction_id as string);
6 ) a;
问题3 :利用hive 对UNION ALL 的优化的特性
hive 对union all 优化只局限于非嵌套查询。
比如以下的例子:
1 select * 2 from 3 ( select * from t1 4 Group by c1,c2,c3 5 Union all 6 Select * from t2 7 Group by c1,c2,c3 8 ) t3 9 Group by c1,c2,c3; 10
从业务逻辑上说,子查询内的group by 怎么都看显得多余(功能上的多余,除非有count(distinct)),如果不是因为hive bug或者性能上的考量(曾经出现如果不子查询group by ,数据得不到正确的结果的hive bug)。所以这个hive按经验转换成
1 select * from 2 ( select * from t1 3 Union all 4 Select * from t2 5 ) t3 6 Group by c1,c2,c3;
经过测试,并未出现union all的hive bug,数据是一致的。mr的作业数有3减少到1。
t1相当于一个目录,t2相当于一个目录,那么对map reduce程序来说,t1,t2可以做为map reduce 作业的mutli inputs。那么,这可以通过一个map reduce 来解决这个问题。Hadoop的计算框架,不怕数据多,就怕作业数多。
但如果换成是其他计算平台如oracle,那就不一定了,因为把大的输入拆成两个输入,分别排序汇总后merge(假如两个子排序是并行的话),是有可能性能更优的(比如希尔排序比冒泡排序的性能更优)。
问题4 :比如推广效果表要和商品表关联,效果表中的auction id 列既有商品id, 也有数字id, 和商品表关联得到商品的信息 。那么以下的hive sql性能会比较好
1 Select * from effect a 2 Join 3 ( select auction_id 4 from auctions 5 union all 6 Select auction_string_id as auction_id 7 from auctions 8 ) b 9 On a.auction_id = b.auction_id;
比分别过滤数字id,字符串id然后分别和商品表关联性能要好。
这样写的好处,1个MR作业,商品表只读取一次,推广效果表只读取一次。把这个sql换成MR代码的话,map的时候,把a表的记录打上标签a,商品表记录每读取一条,打上标签b,变成两个<key ,value>对,<b,数字id>,<b,字符串id>。所以商品表的hdfs读只会是一次。
问题5 :先join 生成临时表,在union all 还是写嵌套查询,这是个问题 。比如以下例子:
1 Select * 2 From 3 ( select * 4 From t1 5 Uion all 6 select * 7 From t4 8 Union all 9 Select * 10 From t2 11 Join t3 12 On t2.id = t3.id 13 ) x 14 Group by c1,c2;
这个会有4个jobs。假如先join生成临时表的话t5,然后union all,会变成2个jobs。
1 Insert overwrite table t5 2 Select * 3 From t2 4 Join t3 5 On t2.id = t3.id 6 ; 7 Select * from (t1 union all t4 union all t5) ;
hive 在union all 优化上可以做得更智能(把子查询当做临时表),这样可以减少开发人员的负担。出现这个问题的原因应该是union all 目前的优化只局限于非嵌套查询。如果写MR 程序这一点也不是问题,就是multi inputs 。
问题6 :使用map join 解决数据倾斜的常景下小表关联大表的问题,但如果小表很大,怎么解决 。这个使用的频率非常高,但如果小表很大,大到map join会出现bug或异常,这时就需要特别的处理。云瑞和玉玑提供了非常给力的解决方案。以下例子:
1 Select * from log a 2 Left outer join members b 3 On a.memberid = b.memberid;
Members有600w+的记录,把members分发到所有的map上也是个不小的开销,而且map join不支持这么大的小表。如果用普通的join,又会碰到数据倾斜的问题。
解决方法:
1 Select /* +mapjoin(x) */ * 2 from log a 3 Left outer join 4 ( select /* +mapjoin(c) */ d. * 5 From 6 ( select distinct memberid 7 from log 8 ) c 9 Join 10 members d 11 On c.memberid = d.memberid 12 ) x 13 On a.memberid = b.memberid;
先根据log取所有的memberid,然后mapjoin 关联members取今天有日志的members的信息,然后在和log做mapjoin。
假如,log里memberid有上百万个,这就又回到原来map join问题。所幸,每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等。所以这个方法能解决很多场景下的数据倾斜问题。
问题7 :HIVE 下通用的数据倾斜解决方法,double 被关联的相对较小的表,这个方法在mr 的程序里常用。 还是刚才的那个问题:
1 Select * 2 from log a 3 Left outer join 4 ( select /* +mapjoin(e) */ memberid, number 5 From members d 6 Join num e 7 ) b 8 On a.memberid = b.memberid And mod(a.pvtime, 30 ) + 1 = b. number ;
Num表只有一列number,有30行,是1,30的自然数序列。就是把member表膨胀成30份,然后把log数据根据memberid和pvtime分到不同的reduce里去,这样可以保证每个reduce分配到的数据可以相对均匀。就目前测试来看,使用mapjoin的方案性能稍好。后面的方案适合在map join无法解决问题的情况下。
长远设想,把如下的优化方案做成通用的hive 优化方法
1. 采样log 表,哪些memberid 比较倾斜,得到一个结果表tmp1 。由于对计算框架来说,所有的数据过来,他都是不知道数据分布情况的,所以采样是并不可少的。Stage1
2. 数据的分布符合社会学统计规则,贫富不均。倾斜的key 不会太多,就像一个社会的富人不多,奇特的人不多一样。所以tmp1 记录数会很少。把tmp1 和members 做map join 生成tmp2, 把tmp2 读到distribute file cache 。这是一个map 过程。Stage2
3. map 读入members 和log ,假如记录来自log, 则检查memberid 是否在tmp2 里,如果是,输出到本地文件a, 否则生成<memberid,value> 的key,value 对,假如记录来自member, 生成<memberid,value> 的key,value 对,进入reduce 阶段。Stage3.
4. 最终把a 文件,把Stage3 reduce 阶段输出的文件合并起写到hdfs 。
这个方法在hadoop 里应该是能实现的。Stage2 是一个map 过程,可以和stage3 的map 过程可以合并成一个map 过程。
这个方案目标就是:倾斜的数据用mapjoin, 不倾斜的数据用普通的join ,最终合并得到完整的结果。用hive sql 写的话,sql 会变得很多段,而且log 表会有多次读。倾斜的key 始终是很少的,这个在绝大部分的业务背景下适用。那是否可以作为hive 针对数据倾斜join 时候的通用算法呢?
问题8 :多粒度( 平级的)uv 的计算优化 ,比如要计算店铺的uv。还有要计算页面的uv,pvip.
方案1:
1 Select shopid, count ( distinct uid) 2 From log 3 group by shopid; 4 Select pageid, count ( distinct uid) 5 From log 6 group by pageid;
由于存在数据倾斜问题,这个结果的运行时间是非常长的。
方案二:
1 From log 2 Insert overwrite table t1 (type = ’ 1 ’) 3 Select shopid 4 Group by shopid ,acookie 5 Insert overwrite table t1 (type = ’ 2 ’) 6 Group by pageid,acookie;
1 -- 店铺uv: 2 3 Select shopid, sum ( 1 ) 4 From t1 5 Where type = ’ 1 ’ 6 Group by shopid ; 7 8 -- 页面uv: 9 10 Select pageid, sum ( 1 ) 11 From t1 12 Where type = ’ 1 ’ 13 Group by pageid ;
这里使用了multi insert 的方法,有效减少了hdfs 读,但multi insert 会增加hdfs 写,多一次额外的map 阶段的hdfs 写。使用这个方法,可以顺利的产出结果。
方案三:
1 Insert into t1 2 Select type,type_name,’’ as uid 3 From 4 ( Select ‘page’ as type,Pageid as type_name,Uid 5 From log 6 Union all 7 Select ‘shop’ as type,Shopid as type_name,Uid 8 From log 9 ) y 10 Group by type,type_name,uid; 11 12 Insert into t2 13 Select type,type_name, sum ( 1 ) 14 From t1 15 Group by type,type_name; 16 From t2 17 Insert into t3 18 Select type,type_name,uv 19 Where type = ’page’ 20 Select type,type_name,uv 21 Where type = ’shop’ ;
最终得到两个结果表t3,页面uv表,t4,店铺结果表。从io上来说,log一次读。但比方案2少次hdfs写(multi insert有时会增加额外的map阶段hdfs写)。作业数减少1个到3,有reduce的作业数由4减少到2,第三步是一个小表的map过程,分下表,计算资源消耗少。但方案2每个都是大规模的去重汇总计算。
这个优化的主要思路是,map reduce 作业初始化话的时间是比较长,既然起来了,让他多干点活 ,顺便把页面按uid去重的活也干了,省下log的一次读和作业的初始化时间,省下网络shuffle的io,但增加了本地磁盘读写。效率提升较多。
这个方案适合平级的不需要逐级向上汇总的多粒度uv 计算,粒度越多,节省资源越多,比较通用。
问题9 :多粒度,逐层向上汇总的uv 结算。 比如4个维度,a,b,c,d,分别计算a,b,c,d,uv;
a,b,c,uv;a,b,uv;a;uv,total uv4个结果表。这可以用问题8的方案二,这里由于uv场景的特殊性,多粒度,逐层向上汇总,就可以使用一次排序,所有uv计算受益的计算方法。
案例: 目前mm_log日志一天有25亿+的pv数,要从mm日志中计算uv,与ipuv,一共计算
三个粒度的结果表
(memberid,siteid,adzoneid,province,uv,ipuv) R_TABLE_4
(memberid,siteid,adzoneid,uv,ipuv) R_TABLE_3
(memberid,siteid,uv,ipuv) R_TABLE_2
第一步:按memberid,siteid,adzoneid,province, 使用group 去重 ,产生临时表,对cookie,ip
打上标签放一起,一起去重,临时表叫T_4;
1 Select memberid,siteid,adzoneid,province,type, user 2 From 3 ( Select memberid,siteid,adzoneid,province,‘a’ type ,cookie as user 4 from mm_log 5 where ds = 20101205 6 Union all 7 Select memberid,siteid,adzoneid,province,‘i’ type ,ip as user 8 from mm_log 9 where ds = 20101205 10 ) x 11 group by memberid,siteid,adzoneid,province,type, user ;
第二步:排名 ,产生表T_4_NUM.Hadoop最强大和核心能力就是parition 和 sort.按type,acookie分组,
Type,acookie,memberid,siteid,adzoneid,province排名。
1 Select * ,row_number(type, user ,memberid,siteid,adzoneid ) as adzone_num , row_number(type, user ,memberid,siteid ) as site_num
2 ,row_number(type, user ,memberid ) as member_num 3 ,row_number(type, user ) as total_num 4 from 5 ( select * 6 from T_4 7 distribute by type, user sort by type, user , memberid,siteid,adzoneid 8 ) x;
这样就可以得到不同层次粒度上user的排名,相同的user id在不同的粒度层次上,排名等于1的记录只有1条。取排名等于1的做sum,效果相当于Group by user去重后做sum操作。
第三步:不同粒度uv统计,先从最细粒度的开始统计,产生结果表R_TABLE_4,这时,结果集只有10w的级别。
如统计memberid,siteid,adzoneid,provinceid粒度的uv使用的方法就是
Select memberid,siteid,adzoneid, provinceid,
sum(case when type =’a’ then cast(1) as bigint end ) as province_uv ,
sum(case when type =’i’ then cast(1) as bigint end ) as province_ip ,
sum(case when adzone_num =1 and type =’a’ then cast(1) as bigint end ) as adzone_uv ,
sum(case when adzone_num =1 and type =’i’ then cast(1) as bigint end ) as adzone_ip ,
sum(case when site_num =1 and type =’a’ then cast(1) as bigint end ) as site_uv ,
sum(case when site_num =1 and type =’i’ then cast(1) as bigint end ) as site_ip ,
sum(case when member_num =1 and type =’a’ then cast(1) as bigint end ) as member_uv ,
sum(case when member_num =1 and type =’i’ then cast(1) as bigint end ) as member_ip ,
sum(case when total_num =1 and type =’a’ then cast(1) as bigint end ) as total_uv ,
sum(case when total_num =1 and type =’i’ then cast(1) as bigint end ) as total_ip ,
from T_4_NUM
group by memberid,siteid,adzoneid, provinceid ;
广告位粒度的uv的话,从R_TABLE_4统计,这是源表做10w级别的统计
Select memberid,siteid,adzoneid,sum(adzone_uv),sum(adzone_ip)
From R_TABLE_4
Group by memberid,siteid,adzoneid;
memberid,siteid的uv计算 ,
memberid的uv计算,
total uv 的计算也都从R_TABLE_4汇总。