数仓(七):全量、增量、缓慢变化维、拉链表
⼀、数据抽取、更新⽅式
解决增量导⼊由于数据修改导致数据重复问题
⽅案⼀:全量更新
⽬标表只保留最新的⼀份,⽐如我们每天⽤sqoop抽取最新的⼀份全量数据到hive
全量表:全量表没有分区,表中的数据是前⼀天的所有数据,⽐如说今天是24号,那么全量表⾥⾯拥有的数据是23号的所有数据,每次往全量表⾥⾯写数据都会覆盖之前的数据,所以全量表不能记录历史的数据情况,只有截⽌到当前最新的、全量的数据。
⽅式:每天drop掉前⼀天的数据,重新抽⼀份最新的。
优点:节省空间,⼀些普通的使⽤也很⽅便,不⽤在选择表的时候加⼀个时间分区什么的。
缺点:没有历史数据,先翻翻旧账只能通过其它⽅式,⽐如从流⽔表⾥⾯抽。
⽅案⼆:快照表,每天保留⼀份全量的切⽚数据
那么要能查到历史数据情况⼜该怎么办呢?这个时候快照表就派上⽤途了,快照表是有时间分区的,每个分区⾥⾯的数据都是分区时间对应的前⼀天的所有全量数据,⽐如说当前数据表有3个分区,24号,25号,26号。其中,24号分区⾥⾯的数据就是从历史到23号的所有数据,25号分区⾥⾯的数据就是从历史到24号的所有数据,以此类推。
⽅式:每天⼀份全量的切⽚
优点:⽐较稳妥,⽽且历史数据也在
缺点:存储空间占有量太⼤,如果每天都保留⼀份全量,那么每次全量中会保存很多不变的信息, 对存储是极⼤的浪费
在数据从源业务系统每天正常抽取和刷新到DW订单历史表之前,需要做⼀次全量的初始化,就是从源订单表中昨天以前的数据全部刷新到DW。
以上⾯的数据为例,⽐如在2015-08-21这天做全量初始化,那么我需要将包括2015-08-20之前的所有的数据都抽取并刷新到DW:
第⼀步,抽取全量数据到ODS:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘2015-08-20′)
SELECT orderid,createtime,modifiedtime,status
FROM orders
WHERE createtime <= ‘2015-08-20′;
第⼆步,从ODS刷新到DW:
INSERT overwrite TABLE t_dw_orders_his
SELECT
orderid,createtime,modifiedtime,status,createtime AS dw_start_date,‘9999-12-31′ AS dw_end_date
FROM t_ods_orders_inc
WHERE day = ‘2015-08-20′;
完成后,DW订单历史表中数据如下:
spark-sql> lect * from t_dw_orders_his;
1 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 创建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 ⽀付 2015-08-19 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 9999-12-31
5 2015-08-19 2015-08-20 ⽀付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 创建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 ⽀付 2015-08-20 9999-12-31
Time taken: 2.296 conds, Fetched 7 row(s)
但是这样也有⼀个问题,就是数据量⼤的时候,其实每个分区都存储了许多重复的数据,⾮常的浪费存储空间。
于是乎,拉链表就出来了。在介绍拉链表之前,我们先介绍⼀下增量表。
⽅案三:增增量抽取,增量数据放到增量分区
增量表,就是记录每天新增数据的表,⽐如说,从24号到25号新增了那些数据,改变了哪些数据,这些都会存储在增量表的25号分区⾥⾯。上⾯说的快照表的25号分区和24号分区(都是t+1,实际时间分别对应26号和25号),它两的数据相减就是实际时间25号到26号有变化的、增加的数据,也就相当于增量表⾥⾯25号分区的数据。
每天从源系统订单表中,将前⼀天的增量数据抽取到ODS层的增量数据表。这⾥的增量需要通过订单表中的创建时间和修改时间来确定:
INSERT overwrite TABLE t_ods_orders_inc PARTITION (day = ‘${day}‘)
SELECT orderid,createtime,modifiedtime,status
FROM orders
WHERE createtime = ‘${day}’ OR modifiedtime = ‘${day}';
适应数仓的事务事实表,或者ODS层
注意:在ODS层按天分区的增量表,最好保留⼀段时间的数据,⽐如半年,为了防⽌某⼀天的数据有问题⽽回滚重做数据。
⽅案四:增量更新
⽅式⼀:增量更新、增量数据放到临时表,然后再刷回去⽬标表
我们可以创建两个hive表,⼀个为test_temp stored as textfile(hive中的临时表),⼀个为test stored as orcfile(hive中最终需要的表,试⽤orcfile格式,因为它的性能⾼)。
1、 创建表
CREATE TABLE customer (
id int,
age tinyint,
name string
肾精亏虚怎么调理)
partitioned by(dt string)
row format delimited
fields terminated by '|'
stored as textfile;
##导⼊初始化数据:
load data local inpath '/home/hadoop/'
into table customer partition(dt = '201506');
hive> lect * from customer order by id;
customer.id customer.age customer.name customer.dt
成龙有多少钱1 25 jiangshouzhuang 201506
2 2
3 zhangyun 201506
3 2
4 yiyi 201506
4 32 mengmeng 201506
2、数据每天都会发⽣变化,我们使⽤临时数据表customer_temp来记录每天客户信息,字段和属性与customer表⼀致,
create table customer_temp like customer;
load data local inpath '/home/hadoop/hivetestdata/' into table customer_temp partition(dt = '201506');
包含的数据⽰例如下所⽰:
hive> lect * from customer_temp;
customer_temp.id customer_temp.age customer_temp.name customer_temp.dt
1 26 jiangshouzhuang 201506
5 45 xiaosan 201506
-- 写⼊临时表 (相当于从源数据库增量导⼊的数据 1 被修改过,5 新增 )
3、合并,把⾮更新的数据和⾮增量的数据overwrite到test表中
如果需要实现客户表的增量更新,我们需要将两个表进⾏full outer join,将customer_temp表中发⽣修改的数据更新到customer表中。
--INSERT OVERWRITE INTO customer
排名前十的婴儿奶粉SELECT *
FROM customer_temp
UNION ALL
狐妖小红娘第五季
SELECT a.*
清肝二十七味丸
FROM customer a
LEFT JOIN customer_temp b
-
-如果增量表或临时表有条件(SELECT * FROM customer_temp WHERE y='2016' AND m='10' AND d='09' ) b
ON a.id = b.id
WHERE b.id IS NULL;
_u1.id _u1.age _u1.name _u1.dt
2 2
3 zhangyun201506
3 2
4 yiyi201506
4 32 mengmeng201506
1 26 jiangshouzhuang201506
5 45 xiaosan201506
或者:
inrt overwrite table test
lect * from test a where 0=(lect count(1) from log b where a.id=b.id)
union all
lect * from test_temp
⽅式⼆:Reconciliation 将新旧数据综合起来,ROW_NUMBER() OVER PARTITION BY 取最新,刷回去⽬标表
在这段中,主要解决的问题是增量与初始化的融合。初始化的数据,存储在 ba_table 中, ⽽增量数据我们已经装载到了incremental_table 中。将两者的数据合⼆为⼀,就可以⽣成与源数据库⼀致的最新数据。前提是源数据库的任何数据⾏不接受硬删除即delete 操作,⽽是在⾏上打了⼀个软删除的标签,表⽰该⾏已删除。如果是做了硬删除,那么同时也要做好删除的审计,将删除的数据⾏放⼊审计表中,⼀同发送给 incremental_table .
ba_table
CREATE TABLE ba_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/ur/hive/ba_table';
incremental_table
CREATE EXTERNAL TABLE incremental_table (
id string,
field1 string,
field2 string,
field3 string,
field4 string,
field5 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/ur/hive/incremental_table';
reconcile_view
--两表合并去时间最新⼀条
CREATE VIEW reconcile_view
AS
SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) AS rn
FROM (
SELECT * FROM ba_table
UNION ALL
SELECT * FROM incremental_table
) t1
) t2
WHERE rn = 1;
从最后⼀个view定义来解说,incremental_table 必须拥有增量记录的全部,因此硬删除操作就不会反应在 incremental_table ⾥头。但是 reconcile_view 所涉及的量毕竟有限,浪费明明不会更改的那部分数据的计算。因此如果能做好分区,仅仅对某⼏个分区做全量更新会更⾼效。
或者直接:
SELECT b.*
FROM崛起近义词
(
SELECT a.*, row_number() over(partition by id ORDER BY lastmodificationtime desc) AS no
衔接拼音FROM
(SELECT * FROM ba_table芒果酱的做法
UNION all
SELECT * FROM incremental_table
) a
) b
= 1
⽅案五:拉链表
6、任务调度
1. Oozie 可以将这4步统⼀做成⼀个⼯作流,⽅便调度
2. 可以⽤脚本⾃定义⼯作流,就像数据仓库的 ETL ⼀样
负责调度的shell脚本load_tracklogs.sh
注:这⾥涉及到了两个点:1) for循环 2) linux下字符串的截取${line:0:4} 3) 传递参数到hive的sql脚本
#!/bin/sh
## 环境变量⽣效
. /etc/profile
## HIVE HOME
HIVE_HOME=/opt/cdh5.3.6/hive-0.13.1-cdh5.3.6
## ⽇志⽬录
LOG_DIR=/data/tracklogs
## ⽬录名称, 依据⽇期date获取
yesterday=`date -d -1days '+%Y%m%d'`
###
for line in `ls $LOG_DIR/${yesterday}`
do
echo "loading $line .............."
#从⽂件名称中解析出⽇期和⼩时
daily=${line:0:4}${line:4:2}${line:6:2}
hour=${line:8:2}
LOAD_FILE=${LOG_DIR}/${yesterday}/${line}
### echo $daily + $hour
### ${HIVE_HOME}/bin/hive -e "LOAD DATA LOCAL INPATH '${LOAD_FILE}' OVERWRITE INTO TABLE ack_log PARTITION(date = '${daily}', hour ${HIVE_HOME}/bin/hive --hiveconf LOAD_FILE_PARAM=${LOAD_FILE} --hiveconf daily_param=${daily} --hiveconf hour_param=${hour} -f /home/hadoop/load_ done
制定每天定时执⾏
可以在当前⽤户下直接创建:crontab -e
注:crontab中的五个 *号分别代表分,时,⽇,⽉,周,下⾯的例⼦就是每天晚上1点30运⾏任务的例⼦,注意sh命令前⼀般需要加上绝对
路径
# LODAD DATA INTO TRACK_LOG 30 1 * * * /bin/sh /home/hadoop/load_tracklogs.sh
⼆、缓慢变化维(拉链表)