Hive常⽤算⼦实现原理简述--MapReduce版
长治市政府
0. 引⾔
Hive中的常⽤算⼦包括distinct、join、group by、order by、distribute by、sort by、count等,这些操作符在SQL中使⽤起来很⽅便,能快速达到我们想要的效果,但是这些算⼦在底层是怎么实现的呢?
order by很容易想到执⾏原理,在⼀个reduce中将所有记录按值排序即可。因此order by在数据量⼤的情况下执⾏时间⾮常长,容易out of memory,⾮特殊业务需求⼀般不使⽤。distribute by也⽐较明显,根据hash值将distribute的值分发到不同的reduce。sort by是⼩号的order by,只负责将本reducer中的值排序,达到局部有序的效果。sort by和distribute by配合使⽤风味更佳,⼆者可以合并简写为cluster by。count则更加明晰,在combiner或reducer处按相同键累加值就能得到。
⽐较复杂的是distinct、join、group by,本⽂重点讨论这三个算⼦在MapReduce引擎中的⼤致实现原理。班门弄斧,抛砖引⽟。
绚丽多彩拼音
1. group by
map阶段,将group by后的字段组合作为key,如果group by单字段那么key就⼀个。将group by之后要进⾏的聚合操作字段作为值,如要进⾏count,则value是1;如要sum另⼀个字段,则value就是该字段。
shuffle阶段,按照key的不同分发到不同的reducer。注意此时可能因为key分布不均匀⽽出现数据倾斜的问题。
reduce阶段,将相同key的值累加或作其他需要的聚合操作,得到结果。
实例如下图,对应语句是 lect rank, isonline, count(*) from city group by rank, isonline;
group by过程⽰意图
京东卡怎么买
如果group by出现数据倾斜,除去替换key为随机数、提前挑出⼤数量级key值等通⽤调优⽅法,适⽤于group by的特殊⽅法有以下⼏种:
(1)t hive.map.aggr=true,即开启map端的combiner,减少传到reducer的数据量,同时需设置参数
山楂禁忌
(2)设置duce.tasks为较⼤数量,降低每个reducer处理的数据量。
(3)upby.skewindata=true,该参数可⾃动进⾏负载均衡。⽣成的查询计划会有两个 MR Job。第⼀个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从⽽达到负载均衡的⽬的;第⼆个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同⼀个 Reduce中),最后完成最终的聚合操作。
2. join
Hive中有两种join⽅式:map join和common join
消失的小学2.1 common join
如果不显式指定map side join,或者没有达到触发⾃动map join的条件,那么会进⾏reduce端的join,即common join,这种join包含map、shuffle、reduce三个步骤。
(1)Map阶段
读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key。Map输出的value为join 之后所关⼼的(lect或者where中需要⽤到的)列;同时在value中还会包含表的Tag信息,⽤于标明此value对应哪个表。然后按照key进⾏排序。
(2)Shuffle阶段
根据key的值进⾏hash,并将key/value按照hash值推送⾄不同的reduce中,这样确保两个表中相同的key位于同⼀个reduce中
(3)Reduce阶段
根据key的值完成join操作,期间通过Tag来识别不同表中的数据。
以下⾯的SQL为例,可⽤下图所⽰过程⼤致表达其join原理。
SELECT u.name, o.orderid FROM ur u JOIN order o ON u.uid = o.uid;
关联字段是uid,因此以uid为map阶段的输出key,value为选取的字段name和标记源表的tag。shuffle阶段将相同key的键值对发到⼀
起,reduce阶段将不同源表、同⼀key值的记录拼接起来,可能存在⼀对多的情况。
common join原理⽰意
2.2 map join
如果指定使⽤map join的⽅式,或者join的其中⼀张表⼩于某个体积(默认25MB),则会使⽤map join来执⾏。具体⼩表有多⼩,由参hive.mapjoin.smalltable.filesize来决定。
数hive.mapjoin.smalltable.filesize
Hive0.7之前,需要使⽤hint提⽰ /*+ mapjoin(table) */才会执⾏MapJoin,否则执⾏Common Join,但在0.7版本之后,默认⾃动会转换Map vert.join来控制,默认为true。护理礼仪
Join,由参数vert.join
yarn会启动⼀个Local Task(在客户端本地执⾏的Task)--Task A,负责扫描⼩表b的数据,将其转换成⼀个HashTable的数据结构,并写⼊本地的⽂件中,之后将该⽂件加载到DistributeCache中。
接下来是Task B,该任务是⼀个没有Reduce的MR,启动MapTasks扫描⼤表a,在Map阶段,根据a的每⼀条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
由于MapJoin没有Reduce,所以由Map直接输出结果⽂件,有多少个Map Task,就有多少个结果⽂件。
map join原理⽰意
3. distinct
distinct⼀般和group by同时出现。
3.1 distinct单字段
认识时间教案
当distinct⼀个字段时,将group by的字段和distinct的字段组合在⼀起作为map输出的key,value设置为1,同时将group by的字段定为分区键,这可以确保相同group by字段的记录都分到同⼀个reducer,并且map的输⼊天然就是按照组合key排好序的。根据分区键将记录分发到reduce端后,按顺序取出组合键中的distinct字段,这时distinct字段也是排好序的。依次遍历distinct字段,每找到⼀个不同值,计数器就⾃增1,即可得到count distinct结果。例如下⾯的SQL语句,过程可以下图⽰意。
distinct uid) as num from order group by dealid;
lect dealid,count(distinct
distinct 单字段
3.2 distinct多字段
家庭介绍我暂时没有理解这是怎么实现的,别⼈写的也没有看明⽩。有善良的学富五车的⼤佬指点⼀下吗?