FlinkCumulateWindow

更新时间:2023-06-11 22:32:26 阅读: 评论:0

FlinkCumulateWindow
Flink 累计窗⼝
之前⼀直⽐较遗憾,Flink Sql 没有 Trigger 功能,长时间的窗⼝不能在中途触发计算,输出中间结果。⽽很多实时指标是⼩时、天级的累集窗⼝,⽐如⼤屏中的当⽇ pv、uv,整
体是⼀天中所有访问的次数和⽤户数,但是需要实时更新,⽐如每 10S 更新⼀次截⽌到当前的pv、uv。
这种场景使⽤ Streaming Api 很容易实现,就是个天的翻滚窗⼝加上 10S 的 Trigger 就可以了
.windowAll(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.ds(10)))
Flink 1.13 版本以前,Sql 中还不能实现这个功能,1.13 添加了 CUMULATE 窗⼝,可以⽀持这种场景。
(感谢 antigeneral 同学提醒有这个功能)
以下为官⽹介绍:
累积窗⼝在某些情况下⾮常有⽤,例如在固定窗⼝间隔内提前触发的滚动窗⼝。例如,每⽇仪表板绘制从 00:00 到10:00 处每分钟累积的 UV, UV 表⽰从 00:00 到 10:00 的 UV 总数。这可以通过 CUMULATE 窗⼝轻松地实现。
CUMULATE 函数将元素分配给覆盖初始步长间隔内的窗⼝,并每⼀步扩展到⼀个步长(保持窗⼝开始固定),直到最⼤窗⼝⼤⼩。您可以将 CUMULATE 函数视为⾸先应⽤ TUMBLE 最⼤窗⼝⼤⼩的窗⼝,然后将每个滚动窗⼝拆分为具有相同窗⼝开例如,您可以有⼀个 1 ⼩时步长和 1 天最⼤⼤⼩的累积窗⼝,并且您将获得每天的窗⼝:[00:00, 01:00), [00:00, 02:00), [00:00, 03:00), ..., [00:00, 24:00)。
这些CUMULATE函数根据列分配窗⼝。的返回值CUMULATE是⼀个新的关系,包括原始关系的所有列以及额外的 3 列名为“window_start”、“window_end”、“window_time”以指⽰分
配的窗⼝。原始时间属性“timecol”将是窗⼝ TVF 之后的常规时间戳列。
CUMULATE需要三个必需的参数。
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
data: 是⼀个表参数,可以与时间属性列有任何关系。
timecol: 是⼀个列描述符,指⽰数据的哪些列应该映射到滚动窗⼝。breaking news
step: 是指定顺序累积窗⼝结束之间增加的窗⼝⼤⼩的持续时间。
size: 是指定累积窗⼝最⼤宽度的持续时间。size 必须是 step 的整数倍。
这是 Bid 表上的⽰例调⽤:
-- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
--  window table-valued function should be ud with aggregate operation,
--  this example is just ud for explaining the syntax and the data produced by table-valued function.
> SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
-- or with the named params
-- note: the DATA param must be the first
> SELECT * FROM TABLE(
CUMULATE(
DATA => TABLE Bid,
TIMECOL => DESCRIPTOR(bidtime),
STEP => INTERVAL '2' MINUTES,
SIZE => INTERVAL '10' MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |    window_start |      window_end |            window_time  |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
lkb
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 | D    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
澳大利亚留学的条件
| 2020-04-15 08:11 |  3.00 | B    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
be happy lee| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
圣诞节英语怎么写
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
uae
| 2020-04-15 08:13 |  1.00 | E    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 |  6.00 | F    | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
-- apply aggregation on the cumulating windowed table
> SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
|    window_start |      window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 |  4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 |  6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
nd nudes| 2020-04-15 08:10 | 2020-04-15 08:12 |  3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 |  4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
实现案例:
-- flink cumulate window tvf calc pv&uv
create table if not exists datagen_source (
id        int
,name    string
,x      string
thatisall,age      int
,birthday string
,proc_time as proctime()
) with (
'connector' = 'datagen'
,'rows-per-cond' = '10000'
avic
,'fields.id.kind' = 'random'
,'fields.id.min' = '1'
,'fields.id.max' = '2000000'
);
create table if not exists print_sink(
start_time string
,end_time string
,pv  bigint
,uv  bigint
)
with (
'connector' = 'print'
);
inrt into print_sink
lect
date_format(window_start, 'HH:mm:ss')
, date_format(window_end, 'HH:mm:ss')
, count(id)
, count(distinct id)
utilityFROM TABLE(
CUMULATE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND, INTERVAL '1' DAY))  GROUP BY window_start, window_end
输出结果:
+I[00:00:00, 09:22:40, 8880000, 1976509]
+I[00:00:00, 09:22:50, 8980000, 1977652]
+I[00:00:00, 09:23:00, 9080000, 1978750]
+I[00:00:00, 09:23:10, 9180000, 1979766]
+I[00:00:00, 09:23:20, 9280000, 1980767]
欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推⽂

本文发布于:2023-06-11 22:32:26,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/141787.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:功能   滚动   间隔   步长
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图