Flink Cumulate Window
I had long found it a pity that Flink SQL had no trigger functionality: a long window could not fire mid-window and emit intermediate results. Yet many real-time metrics use hour- or day-sized cumulative windows, such as the current day's PV and UV on a big-screen dashboard: overall they are the total visits and distinct users across the whole day, but they must be refreshed in real time, e.g. emitting the PV/UV so far every 10 seconds.
This scenario is easy to implement with the Streaming API: a one-day tumbling window (the -8 hour offset aligns the day boundary to the UTC+8 time zone) plus a 10-second trigger:
.windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
Before Flink 1.13 this could not be expressed in SQL; 1.13 added the CUMULATE window, which covers exactly this scenario.
(Thanks to antigeneral for pointing out this feature.)
The official documentation introduces it as follows:
Cumulating windows are useful in some scenarios, such as tumbling windows with early firing at a fixed interval. For example, a daily dashboard plots cumulative UV per minute from 00:00 to 10:00, where the UV at 10:00 means the total UV from 00:00 to 10:00. This is easily achieved with a CUMULATE window.
The CUMULATE function assigns elements to windows that cover the initial step interval and then expand by one more step each time (keeping the window start fixed) until the maximum window size is reached. You can think of CUMULATE as first applying a TUMBLE window of the maximum size, then splitting each tumbling window into several windows with the same start but different ends. For example, a cumulating window with a 1-hour step and a 1-day maximum size produces these windows every day: [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), ..., [00:00, 24:00).
The CUMULATE function assigns windows based on a time attribute column. Its return value is a new relation that includes all columns of the original relation plus three additional columns named "window_start", "window_end", and "window_time" to indicate the assigned window. The original time attribute "timecol" becomes a regular timestamp column after the window TVF.
CUMULATE takes four required parameters:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data: is a table parameter that can be any relation with a time attribute column.
timecol: is a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
step: is a duration specifying the increase of window size between the ends of sequential cumulating windows.
size: is a duration specifying the maximum width of the cumulating windows. size must be an integer multiple of step.
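To make the assignment rule concrete, here is a small Python sketch (not Flink code, just an illustration of the semantics) that computes the set of cumulating windows an element's timestamp falls into, given a step and a maximum size:

```python
from datetime import datetime, timedelta

def cumulate_windows(ts, step, size):
    """Return the list of (window_start, window_end) cumulating windows
    that an element with timestamp `ts` is assigned to.

    The window start is `ts` floored to the enclosing size-aligned boundary;
    the element belongs to every cumulating window whose end is after `ts`,
    up to the maximum size."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % size           # position of ts inside the max window
    start = ts - offset                    # fixed window start
    windows = []
    end = start + step
    while end <= start + size:
        if end > ts:                       # only windows that end after the element
            windows.append((start, end))
        end += step
    return windows

# An element at 08:05 with a 2-minute step and 10-minute max size lands in
# [08:00, 08:06), [08:00, 08:08), [08:00, 08:10), matching the Bid example below.
for s, e in cumulate_windows(datetime(2020, 4, 15, 8, 5),
                             timedelta(minutes=2), timedelta(minutes=10)):
    print(s.strftime('%H:%M'), e.strftime('%H:%M'))
```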
Here is an example invocation on the Bid table:
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
-- window table-valued function should be used with aggregate operation,
-- this example is just used for explaining the syntax and the data produced by table-valued function.
> SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
| bidtime | price | item | window_start | window_end | window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the cumulating windowed table
> SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
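The aggregated results above can be reproduced outside Flink with a few lines of Python, treating each bid as a (bidtime, price) pair and summing prices per cumulating window (again just a sketch of the semantics, not the Flink runtime):

```python
from collections import defaultdict
from datetime import datetime, timedelta

step, size = timedelta(minutes=2), timedelta(minutes=10)
bids = [("08:05", 4.00), ("08:07", 2.00), ("08:09", 5.00),
        ("08:11", 3.00), ("08:13", 1.00), ("08:17", 6.00)]

sums = defaultdict(float)
for t, price in bids:
    ts = datetime.strptime("2020-04-15 " + t, "%Y-%m-%d %H:%M")
    offset = (ts - datetime(1970, 1, 1)) % size
    start = ts - offset                  # start of the enclosing max-size window
    end = start + step
    while end <= start + size:
        if end > ts:                     # the bid counts toward every later window end
            sums[(start, end)] += price
        end += step

# Prints the same eight (window_start, window_end, SUM(price)) rows as the table above.
for (s, e), total in sorted(sums.items()):
    print(s.strftime("%H:%M"), e.strftime("%H:%M"), total)
```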
Implementation example:
-- flink cumulate window tvf calc pv&uv
create table if not exists datagen_source (
id int
,name string
,x string
,age int
,birthday string
,proc_time as proctime()
) with (
'connector' = 'datagen'
,'rows-per-second' = '10000'
,'fields.id.kind' = 'random'
,'fields.id.min' = '1'
,'fields.id.max' = '2000000'
);
create table if not exists print_sink(
start_time string
,end_time string
,pv bigint
,uv bigint
)
with (
'connector' = 'print'
);
insert into print_sink
select
date_format(window_start, 'HH:mm:ss')
, date_format(window_end, 'HH:mm:ss')
, count(id)
, count(distinct id)
FROM TABLE(
CUMULATE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND, INTERVAL '1' DAY)) GROUP BY window_start, window_end;
Output:
+I[00:00:00, 09:22:40, 8880000, 1976509]
+I[00:00:00, 09:22:50, 8980000, 1977652]
+I[00:00:00, 09:23:00, 9080000, 1978750]
+I[00:00:00, 09:23:10, 9180000, 1979766]
+I[00:00:00, 09:23:20, 9280000, 1980767]
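Conceptually, each +I row is the cumulative count up to the latest 10-second window end. A toy Python sketch of how pv (count) and uv (count distinct) accumulate inside one day-sized window might look like this (the event stream below is a made-up stand-in for the datagen connector):

```python
# Toy simulation of cumulative pv/uv: pv counts every event, uv counts
# distinct ids; both keep growing until the 1-day max window would roll.
events = [(0, 1), (3, 2), (7, 1), (12, 3), (15, 2), (21, 4)]  # (seconds, id)

step = 10  # emit a cumulative result every 10 seconds
pv, seen = 0, set()
results = []
next_fire = step
for t, uid in events:
    while t >= next_fire:                # a window end passed: emit cumulative result
        results.append((next_fire, pv, len(seen)))
        next_fire += step
    pv += 1
    seen.add(uid)

print(results)   # [(10, 3, 2), (20, 5, 3)]
```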
Welcome to follow the Flink菜鸟 WeChat official account for occasional posts on Flink development.