FlinkSqlWith1.14查询-窗⼝函数(TVFs)(译)
Windows是处理⽆限流的核⼼。Windows将流分成有限⼤⼩的“桶”,我们可以在这些桶上应⽤计算。本⽂档重点介绍如何在Flink
SQL中执⾏窗⼝化,以及程序员如何从其提供的功能中获得最⼤收益。
ApacheFlink提供了⼏个窗⼝表值函数(TVF)来将表的元素划分为窗⼝,包括:
会话窗⼝(即将⽀持)
请注意,每个元素在逻辑上可以属于多个窗⼝,具体取决于您使⽤的窗⼝表值函数。例如,HOP窗⼝创建重叠窗⼝,其中单个元素可以分
配给多个窗⼝。
WindowingTVF是Flink定义的多态表函数(简称PTF)。PTF是SQL2016标准的⼀部分,是⼀种特殊的表函数,但可以将表作为参
数。PTF是改变表格形状的强⼤功能。因为PTF在语义上类似于表,所以它们的调⽤发⽣在语句的FROM⼦句中SELECT。
开窗TVF是对传统的的替代。窗⼝化TVF更符合SQL标准并且更强⼤,可以⽀持复杂的基于窗⼝的计算,例如WindowTopN、
WindowJoin。但是,只能⽀持窗⼝聚合。
了解更多如何应⽤基于窗⼝TVF的进⼀步计算:
窗⼝函数
ApacheFlink提供了3个内置的窗⼝TVFTUMBLE:HOP和CUMULATE.窗⼝化TVF的返回值是⼀个新的关系,包括原始关系的所有
列以及额外的3列,名为“window_start”、“window_end”、“window_time”,以指⽰分配的窗⼝。“window_time”字段是开窗
TVF后窗⼝的,可⽤于后续的基于时间的操作,例如另⼀个开窗TVF,或,。的值window_time总是等于window_end-1ms。
翻滚
该TUMBLE函数将每个元素分配给指定窗⼝⼤⼩的窗⼝。翻滚窗⼝具有固定⼤⼩并且不重叠。例如,假设您指定⼀个⼤⼩为5分钟的滚动
窗⼝。在这种情况下,Flink将评估当前窗⼝,并每五分钟启动⼀个新窗⼝,如下图所⽰。
该函数根据TUMBLE列为关系的每⼀⾏分配⼀个窗⼝。的返回值是⼀个新的关系,包括原始关系的所有列以及额外的3列,名
为“window_start”、“window_end”、“window_time”,以指⽰分配的窗⼝。原始时间属性“timecol”将是窗⼝TVF之后的常规
时间戳列。TUMBLE
TUMBLE函数接受三个必需参数,⼀个可选参数:
data:是⼀个表参数,可以是与时间属性列的任何关系。
timecol:是⼀个列描述符,指⽰数据的哪些列应映射到翻转窗⼝。
size:是指定滚动窗⼝宽度的持续时间。
offt:是⼀个可选参数,⽤于指定窗⼝起始位置的偏移量。
这是对表的调⽤⽰例Bid:
Flink
Flink
Flink
TUMBLE(
Flink
TUMBLE(
TIMECOL
Flink
TUMBLE(
注意:为了更好地理解窗⼝化的⾏为,我们简化了时间戳值的显⽰以不显⽰尾随零,例如如果类型为,2020-04-1508:05则应显⽰为2020-
04-1508:05:00.000在FlinkSQL客户端中TIMESTAMP(3)。
跳
该HOP函数将元素分配给固定长度的窗⼝。与TUMBLE窗⼝函数⼀样,窗⼝的⼤⼩由窗⼝⼤⼩参数配置。⼀个附加的窗⼝滑动参数控制⼀
个跳跃窗⼝的启动频率。因此,如果幻灯⽚⼩于窗⼝⼤⼩,则跳跃窗⼝可能会重叠。在这种情况下,元素被分配给多个窗⼝。跳窗也称
为“滑动窗”。
例如,您可以有⼤⼩为10分钟的窗⼝滑动5分钟。这样,您将每5分钟获得⼀个包含过去10分钟内到达的事件的窗⼝,如下图所⽰。
该HOP函数分配覆盖⼤⼩间隔内的⾏的窗⼝,并根据列移动每张幻灯⽚。的返回值HOP是⼀个新的关系,包括原始关系的所有列以及额外
的3列,名为“window_start”、“window_end”、“window_time”,以指⽰分配的窗⼝。原始时间属性“timecol”将是窗⼝TVF
后的常规时间戳列。
该HOP接受四个必需参数,⼀个可选参数:
HOP(TABLEdata,DESCRIPTOR(timecol),slide,size[,offt])
data:是⼀个表参数,可以是与时间属性列的任何关系。
timecol:是⼀个列描述符,指⽰数据的哪些列应映射到跳跃窗⼝。
slide:是⼀个持续时间,指定顺序跳跃窗⼝开始之间的持续时间
size:是指定跳跃窗⼝宽度的持续时间。
offt:是⼀个可选参数,⽤于指定窗⼝起始位置的偏移量。
这是对表的调⽤⽰例Bid:
--NOTE:CurrentlyFlinkdoesn'tsupportevaluatingindividualwindowtable-valuedfunction,
--windowtable-valuedfunctionshouldbeudwithaggregateoperation,
--thixampleisjustudforexplainingthesyntaxandthedataproducedbytable-valuedfunction.
>SELECT*FROMTABLE(
HOP(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'5'MINUTES,INTERVAL'10'MINUTES));
--orwiththenamedparams
--note:theDATAparammustbethefirst
>SELECT*FROMTABLE(
HOP(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SLIDE=>INTERVAL'5'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|bidtime|price|item|window_start|window_end|window_time|
+------------------+-------+------+------------------+------------------+-------------------------+
|2020-04-1508:05|4.00|C|2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999|
|2020-04-1508:05|4.00|C|2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999|
|2020-04-1508:07|2.00|A|2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999|
|2020-04-1508:07|2.00|A|2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999|
|2020-04-1508:09|5.00|D|2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999|
|2020-04-1508:09|5.00|D|2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:05|2020-04-1508:15|2020-04-1508:14:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|
|2020-04-1508:17|6.00|F|2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|
|2020-04-1508:17|6.00|F|2020-04-1508:15|2020-04-1508:25|2020-04-1508:24:59.999|
+------------------+-------+------+------------------+------------------+-------------------------+
--applyaggregationonthehoppingwindowedtable
>SELECTwindow_start,window_end,SUM(price)
FROMTABLE(
HOP(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'5'MINUTES,INTERVAL'10'MINUTES))
GROUPBYwindow_start,window_end;
+------------------+------------------+-------+
|window_start|window_end|price|
+------------------+------------------+-------+
|2020-04-1508:00|2020-04-1508:10|11.00|
|2020-04-1508:05|2020-04-1508:15|15.00|
|2020-04-1508:10|2020-04-1508:20|10.00|
|2020-04-1508:15|2020-04-1508:25|6.00|
+------------------+------------------+-------+
累积
累积窗⼝在某些场景中⾮常有⽤,例如在固定窗⼝间隔内提前触发的翻滚窗⼝。例如,每⽇仪表盘从00:00到每分钟绘制累积
UV,10:00的UV表⽰从00:00到10:00的UV总数。这可以通过CUMULATE窗⼝轻松有效地实现。
该CUMULATE函数将元素分配给在初始步长间隔内覆盖⾏的窗⼝,并在每⼀步扩展⼀个步长(保持窗⼝开始固定),直到最⼤窗⼝⼤⼩。
您可以将CUMULATE功能视为TUMBLE⾸先应⽤最⼤窗⼝⼤⼩的窗⼝,并将每个翻转窗⼝拆分为具有相同窗⼝开始和窗⼝结束步长差异的
⼏个窗⼝。所以累积窗⼝确实重叠并且没有固定的⼤⼩。
例如,您可以有⼀个1⼩时步长和1天最⼤尺⼨的累积窗⼝,您将获得窗⼝:[00:00,01:00),[00:00,02:00),[00:00,03:00),...,[00:00,
24:00)每⼀天。
这些CUMULATE函数根据列分配窗⼝。的返回值CUMULATE是⼀个新的关系,包括原始关系的所有列以及额外的3列,名
为“window_start”、“window_end”、“window_time”,以指⽰分配的窗⼝。原始时间属性“timecol”将是窗⼝TVF之后的常规
时间戳列。
CUMULATE接受四个必需参数,⼀个可选参数:
CUMULATE(TABLEdata,DESCRIPTOR(timecol),step,size)
data:是⼀个表参数,可以是与时间属性列的任何关系。
timecol:是⼀个列描述符,指⽰数据的哪些列应映射到翻转窗⼝。
step:是指定连续累积窗⼝结束之间增加的窗⼝⼤⼩的持续时间。
size:是指定累积窗⼝的最⼤宽度的持续时间。size必须是的整数倍step。
offt:是⼀个可选参数,⽤于指定窗⼝起始位置的偏移量。
以下是对Bid表的调⽤⽰例:
--NOTE:CurrentlyFlinkdoesn'tsupportevaluatingindividualwindowtable-valuedfunction,
--windowtable-valuedfunctionshouldbeudwithaggregateoperation,
--thixampleisjustudforexplainingthesyntaxandthedataproducedbytable-valuedfunction.
>SELECT*FROMTABLE(
CUMULATE(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'2'MINUTES,INTERVAL'10'MINUTES));
--orwiththenamedparams
--note:theDATAparammustbethefirst
>SELECT*FROMTABLE(
CUMULATE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
STEP=>INTERVAL'2'MINUTES,
SIZE=>INTERVAL'10'MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|bidtime|price|item|window_start|window_end|window_time|
+------------------+-------+------+------------------+------------------+-------------------------+
|2020-04-1508:05|4.00|C|2020-04-1508:00|2020-04-1508:06|2020-04-1508:05:59.999|
|2020-04-1508:05|4.00|C|2020-04-1508:00|2020-04-1508:08|2020-04-1508:07:59.999|
|2020-04-1508:05|4.00|C|2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999|
|2020-04-1508:07|2.00|A|2020-04-1508:00|2020-04-1508:08|2020-04-1508:07:59.999|
|2020-04-1508:07|2.00|A|2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999|
|2020-04-1508:09|5.00|D|2020-04-1508:00|2020-04-1508:10|2020-04-1508:09:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:10|2020-04-1508:12|2020-04-1508:11:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:10|2020-04-1508:14|2020-04-1508:13:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:10|2020-04-1508:16|2020-04-1508:15:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:10|2020-04-1508:18|2020-04-1508:17:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:10|2020-04-1508:14|2020-04-1508:13:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:10|2020-04-1508:16|2020-04-1508:15:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:10|2020-04-1508:18|2020-04-1508:17:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|
|2020-04-1508:17|6.00|F|2020-04-1508:10|2020-04-1508:18|2020-04-1508:17:59.999|
|2020-04-1508:17|6.00|F|2020-04-1508:10|2020-04-1508:20|2020-04-1508:19:59.999|
+------------------+-------+------+------------------+------------------+-------------------------+
--applyaggregationonthecumulatingwindowedtable
>SELECTwindow_start,window_end,SUM(price)
FROMTABLE(
CUMULATE(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'2'MINUTES,INTERVAL'10'MINUTES))
GROUPBYwindow_start,window_end;
+------------------+------------------+-------+
|window_start|window_end|price|
+------------------+------------------+-------+
|2020-04-1508:00|2020-04-1508:06|4.00|
|2020-04-1508:00|2020-04-1508:08|6.00|
|2020-04-1508:00|2020-04-1508:10|11.00|
|2020-04-1508:10|2020-04-1508:12|3.00|
|2020-04-1508:10|2020-04-1508:14|4.00|
|2020-04-1508:10|2020-04-1508:16|4.00|
|2020-04-1508:10|2020-04-1508:18|10.00|
|2020-04-1508:10|2020-04-1508:20|10.00|
+------------------+------------------+-------+
窗⼝偏移量
Offt是⼀个可选参数,可⽤于更改窗⼝分配。它可以是正持续时间和负持续时间。窗⼝偏移的默认值为0。如果设置不同的偏移值,相同
的记录可能分配给不同的窗⼝。
例如,对于2021-06-3000:00:04⼤⼩为10分钟的Tumble窗⼝的时间戳记录,将分配给哪个窗⼝?
如果offt值为-16MINUTE,则记录分配给窗⼝[2021-06-2923:54:00,2021-06-3000:04:00)。
如果offt值为-6MINUTE,则记录分配给窗⼝[2021-06-2923:54:00,2021-06-3000:04:00)。
如果offt是-4MINUTE,则记录分配给窗⼝[2021-06-2923:56:00,2021-06-3000:06:00)。
如果offt是0,则记录分配给窗⼝[2021-06-3000:00:00,2021-06-3000:10:00)。
如果offt是4MINUTE,则记录分配给窗⼝[2021-06-2923:54:00,2021-06-3000:04:00)。
如果offt是6MINUTE,则记录分配给窗⼝[2021-06-2923:56:00,2021-06-3000:06:00)。
如果offt是16MINUTE,则记录分配给窗⼝[2021-06-2923:56:00,2021-06-3000:06:00)。我们可以发现,⼀些窗⼝偏移参数可能对
窗⼝的分配有相同的影响。在上述情况下-16MINUTE,-6MINUTE和4MINUTE对⼤⼩为10分钟的Tumble窗⼝具有相同的效果。
注意:窗⼝偏移的影响只是为了更新窗⼝分配,它对⽔印没有影响。
我们通过⼀个⽰例来描述如何在以下SQL中使⽤Tumble窗⼝中的偏移量。
--NOTE:CurrentlyFlinkdoesn'tsupportevaluatingindividualwindowtable-valuedfunction,
--windowtable-valuedfunctionshouldbeudwithaggregateoperation,
--thixampleisjustudforexplainingthesyntaxandthedataproducedbytable-valuedfunction.
FlinkSQL>SELECT*FROMTABLE(
TUMBLE(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'10'MINUTES,INTERVAL'1'MINUTES));
--orwiththenamedparams
--note:theDATAparammustbethefirst
FlinkSQL>SELECT*FROMTABLE(
TUMBLE(
DATA=>TABLEBid,
TIMECOL=>DESCRIPTOR(bidtime),
SIZE=>INTERVAL'10'MINUTES,
OFFSET=>INTERVAL'1'MINUTES));
+------------------+-------+------+------------------+------------------+-------------------------+
|bidtime|price|item|window_start|window_end|window_time|
+------------------+-------+------+------------------+------------------+-------------------------+
|2020-04-1508:05|4.00|C|2020-04-1508:01|2020-04-1508:11|2020-04-1508:10:59.999|
|2020-04-1508:07|2.00|A|2020-04-1508:01|2020-04-1508:11|2020-04-1508:10:59.999|
|2020-04-1508:09|5.00|D|2020-04-1508:01|2020-04-1508:11|2020-04-1508:10:59.999|
|2020-04-1508:11|3.00|B|2020-04-1508:11|2020-04-1508:21|2020-04-1508:20:59.999|
|2020-04-1508:13|1.00|E|2020-04-1508:11|2020-04-1508:21|2020-04-1508:20:59.999|
|2020-04-1508:17|6.00|F|2020-04-1508:11|2020-04-1508:21|2020-04-1508:20:59.999|
+------------------+-------+------+------------------+------------------+-------------------------+
--applyaggregationonthetumblingwindowedtable
FlinkSQL>SELECTwindow_start,window_end,SUM(price)
FROMTABLE(
TUMBLE(TABLEBid,DESCRIPTOR(bidtime),INTERVAL'10'MINUTES,INTERVAL'1'MINUTES))
GROUPBYwindow_start,window_end;
+------------------+------------------+-------+
|window_start|window_end|price|
+------------------+------------------+-------+
|2020-04-1508:01|2020-04-1508:11|11.00|
|2020-04-1508:11|2020-04-1508:21|10.00|
+------------------+------------------+-------+
注意:为了更好地理解窗⼝化的⾏为,我们简化了时间戳值的显⽰以不显⽰尾随零,例如如果类型为,2020-04-1508:05则应显⽰为2020-
04-1508:05:00.000在FlinkSQL客户端中TIMESTAMP(3)。
FromFlinkWebsiteUrl:
-------------------------------------------------------------------禁⽌转载--------------------------------------------------
待修改。。。。
本文发布于:2023-02-01 00:43:46,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/88/170393.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |