FlinkSQL中的窗⼝函数
概述
Flink窗⼝函数是flink的重要特性,⽽Flink SQL API是Flink批流⼀体的封装,学习明⽩本节课,是对Flink学习的很⼤收益!
窗⼝函数
窗⼝函数Flink SQL⽀持基于⽆限⼤窗⼝的聚合(⽆需在SQL Query中,显式定义任何窗⼝)以及对⼀个特定的窗⼝的聚合。例如,需要统计在过去的1分钟内有多少⽤户点击了某个的⽹页,可以通过定义⼀个窗⼝来收集最近1分钟内的数据,并对这个窗⼝内的数据进⾏计算。
Flink SQL⽀持的窗⼝聚合主要是两种:Window聚合和Over聚合。本⽂档主要为您介绍Window聚合。Window聚合⽀持Event Time和Processing Time两种时间属性定义窗⼝。每种时间属性类型⽀持三种窗⼝类型:滚动窗⼝(TUMBLE)、滑动窗⼝(HOP)和会话窗⼝(SESSION)。
时间属性
Flink SQL⽀持以下两种时间属性。实时计算可以基于这两种时间属性对数据进⾏窗⼝聚合。
:您提供的事件时间(通常是数据的最原始的创建时间),Event Time⼀定是您提供在Schema⾥的数据。
:对事件进⾏处理的本地系统时间。
概念性的东西不说了,参考:
2 分组窗⼝
2.1 分组窗⼝的类型
SQL查询的分组窗⼝是通过GROUP BY⼦句定义的。类似于使⽤常规GROUP BY语句的查询,窗⼝分组语句的GROUP BY⼦句中带有⼀个窗⼝函数为每个分组计算出⼀个结果。以下是批处理和流出理表⽀持的分组窗⼝函数:
(1)TUMBLE(time_attr, interval)
定义⼀个滚动窗⼝。滚动窗⼝把⾏分配到有固定持续时间(interval)的不重叠的连续窗⼝。⽐如,5分钟的滚动窗⼝以5分钟为间隔对⾏进⾏分组。滚动窗⼝可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
(2)HOP(time_attr, interval, interval)
定义⼀个跳跃窗⼝(在Table API中成为滑动窗⼝)。滑动窗⼝有⼀个固定的持续时间(第⼆个interval参数)以及⼀个滑动的间隔(第⼀个interval参数)。若滑动间隔⼩于窗⼝的持续时间,滑动窗⼝则会出现重叠;因此,⾏将会被分配到多个窗⼝中。⽐如,⼀个⼤⼩为15分钟的滑动窗⼝,其滑动间隔为5分钟,将会把每⼀⾏数据分配到3个15分钟的窗⼝中。滑动窗⼝可以定义咋事件时间(批处理、流处理)或处理时间(流处理)上。
(3)SESSION(time_attr, interval)
定义⼀个会话时间窗⼝。会话时间窗⼝没有⼀个固定的持续时间,但是它们的边界会根据interval所定义的不活跃时间所确定;即⼀个会话时间窗⼝在定义的间隔时间内没有新纪录出现,该窗⼝会被关闭。例如时间窗⼝的间隔时间是30分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动⼀个新的会话时间窗⼝(否则该⾏数据会被添加到当前的窗⼝),且若在30分钟内没有观测到新纪录,这个窗⼝将会被关闭。会话时间窗⼝可以使⽤事件时间(批处理、流处理)或处理时间(流处理)。
2.2 时间属性
在流处理表中的SQL查询中,分组窗⼝函数的time_attr参数必须引⽤⼀个合法的时间属性,且该属性需要指定⾏的处理时间或事件时间。
对于批处理的SQL查询,分组窗⼝函数的time_attr参数必须是⼀个TIMESTAMP类型的属性。
2.3 选择分组窗⼝的开始和结束时间戳(辅助函数)
(1)返回相对应的滚动、滑动和会话窗⼝的开始时间(包含边界)
TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)
(2)返回相对应的滚动、滑动和会话窗⼝的结束时间(包含边界)
TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)
注意:返回的间戳不可以在随后基于事件的操作中,作为⾏时间属性使⽤,⽐如基于事件窗⼝的join以及分组窗⼝或分组窗⼝上的聚
合。
(3)返回相对应的滚动、滑动和会话窗⼝的结束时间(不包含边界)
TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)
返回的是⼀个可⽤于后续需要基于时间的操作的时间属性(rowtime attribute),⽐如基于时间窗⼝的join以及分组窗⼝或分组窗⼝上
的聚合。
(4)返回相对应的滚动、滑动和会话窗⼝的结束时间(不包含边界)
TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)
返回处理时间参数可⽤于后续需要基于事件的操作,⽐如基于时间窗⼝的join以及分组窗⼝或分组窗⼝上的聚合。
注意:辅助函数必须使⽤与GROUP BY⼦句中的分组窗⼝函数完全相同的参数来调⽤。
2.4 实战-使⽤Flink读取kafka数据,以时间时间使⽤活动窗⼝统计top5的商品。
package com.atguigu.hotitems_analysis
import java.sql.Timestamp
孕妇可以吃苦瓜
import java.util.Properties
寒食节和清明节的关系import org.apache.rialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.tors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.{EnvironmentSettings, Slide}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.dataformat.BaRow
object HotItemsWithSQL {
def main(args: Array[String]): Unit = {
// 创建⼀个流处理执⾏环境
val env = ExecutionEnvironment
env.tParallelism(1)
env.tStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//val inputStream: DataStream[String] = adTextFile("D:\\\\AAA\\\\Flink\\\\UrBehaviorAnalysis\\
HotItemsAnalysis\\src\\main\\resources\\UrBehavior.csv" // 将数据转换成样例类类型,并且提取timestamp定义watermark
val properties = new Properties()
properties.tProperty("bootstrap.rvers", "")
properties.tProperty("t", "")
properties.tProperty("group.id", "kafka2es")
val inputStream: DataStream[String] = env.addSource( new FlinkKafkaConsumer[String]("student-test", new SimpleStringSchema(), properties) )
// 这个是单词个数统计
// val stream2 = inputStream.map(_.split("\\W+")).flatMap(_.toSeq).map((_, 1)).keyBy(0).sum(1)
// stream2.addSink(tup=>{ //sink动作,出发action
// stream2.addSink(tup=>{ //sink动作,出发action
少儿绘画// println(tup._1+", count-> ",tup._2)
小型微利企业认定标准// })
val dataStream: DataStream[UrBehavior] = inputStream
.map( data => {
val dataArray = data.split(",")
UrBehavior( dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong )
} )
.assignAscendingTimestamps(_.timestamp * 1000L)
补肾壮阳的最佳方法
// 要调⽤Table API,先创建表执⾏环境
val ttings = wInstance()
.uBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = ate(env, ttings)
// 将DataStream注册成表,提取需要的字段,进⾏处理,这个表和类严格对应
val table02 = tableEnv.sqlQuery( "lect count(behavior) cu from data_table_1" )
// //table转换成流才能显⽰
// ⽤SQL实现
val resultTable = tableEnv.sqlQuery(
"""
|lect *
|from (
| lect *,
| row_number() over (partition by windowEnd order by cnt desc) as row_num
| from (
| lect itemId,
| count(itemId) as cnt,
| hop_end(ts, interval '60' minute, interval '24' hour) as windowEnd
| from data_table
| where behavior = 'pv'
肖申克的救赎经典台词
| group by hop(ts, interval '60' minute, interval '24' hour), itemId
| )
|)
|where row_num <= 5
""".stripMargin)
//使⽤缩进模式将动态表转化为数据流,它⽤true或fal来标记数据的插⼊和撤回,返回true代表数据插⼊,fal代表数据的撤回 // 这个滑动步长是在拿到额
cpa和cfa// RetractStream[(BaRow)].print()
/*
输出
(fal,(-|42000))
(true,(+|42001))
(fal,(-|42001))
(true,(+|42002))
(fal,(-|42002))
*/
/*
输出
results> (true,(2453685,4,2017-11-26 07:00:00.0,4))
results> (fal,(1871901,4,2017-11-26 07:00:00.0,4))
*/
/*
1. 将env的streamgraph转化为jobgraph
1. 将env的streamgraph转化为jobgraph
2. 设置任务运⾏的配置信息configuration
3. 启动LocalFlinkMiniCluster
4. 提交jobgraph到Cluster"hot item with sql")
*/
}
}
这⾥使⽤到的开窗函数是:
hop_end(ts, interval '60' minute, interval '24' hour) as windowEnd
滑动窗⼝(HOP),也被称作Sliding Window。不同于滚动窗⼝,滑动窗⼝的窗⼝可以重叠。
滑动窗⼝有两个参数:slide和size。slide为每次滑动的步长,size为窗⼝的⼤⼩。
slide < size,则窗⼝会重叠,每个元素会被分配到多个窗⼝。
slide = size,则等同于滚动窗⼝(TUMBLE)。
slide > size,则为跳跃窗⼝,窗⼝之间不重叠且有间隙。
通常,⼤部分元素符合多个窗⼝情景,窗⼝是重叠的。因此,滑动窗⼝在计算移动平均数(moving averages)时很实⽤。例如,计算过去5分钟数据的平均值,每10秒钟更新⼀次,可以设置slide为10秒,size为5分钟。下图为您展⽰间隔为30秒,窗⼝⼤⼩为1分钟的滑动窗⼝。
table转数据流打印
1.Flink处理数据把表转换为流的时候,可以使⽤toAppendStream与toRetractStream,前者适⽤于数据追加的场景, 后者适⽤于更新,删除场景
2.FlinkSQL中可以使⽤我们⾃定义的函数.Flink UDF⾃定义函数实现:evaluation⽅法必须定义为public,
命名为eval。evaluation⽅法的输⼊参数类型和返回值类型决定着函数的输⼊参数类型和返回值类型。evaluation⽅法也可以被重载实现多个eval。同时evaluation⽅法⽀持变参数,例如: strs)
下边是个例⼦
今天在Flink 1.7.2版本上跑⼀个Flink SQL ⽰例 RetractPvUvSQL,报
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 19 to line 1, column 51: Cannot apply 'DATE_FORMAT' to arguments of type 'DATE_FORMAT(<VARCHAR(65536)>, <CHAR(2)>)'. Supported form(s): '(TIMESTAMP, FORMAT)'
从提⽰看应该是不⽀持参数为字符串,接下来我们⾃定义⼀个UDF函数来⽀持这种场景。
RetractPvUvSQL 代码
public class RetractPvUvSQL {
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = ExecutionEnvironment(); StreamTableEnvironment tEnv = TableEnvironment(env);
DataStreamSource<PageVisit> input = env.fromElements(
new PageVisit("2017-09-16 09:00:00", 1001, "/page1"),
new PageVisit("2017-09-16 09:00:00", 1001, "/page2"),
new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
new PageVisit("2017-09-16 10:30:00", 1005, "/page1"),
制义new PageVisit("2017-09-16 10:30:00", 1005, "/page2"));
// register the DataStream as table "visit_table"
Table table = tEnv.sqlQuery(
"SELECT " +
"visitTime, " +
"DATE_FORMAT(max(visitTime), 'HH') as ts, " +
"count(urId) as pv, " +
"count(distinct urId) as uv " +
"FROM visit_table " +
"GROUP BY visitTime");
DataStream<Tuple2<Boolean, Row>> dataStream = RetractStream(table, Row.class);
if (params.has("output")) {
String outPath = ("output");
System.out.println("Output path: " + outPath);
dataStream.writeAsCsv(outPath);
} el {
System.out.println("Printing result to stdout. U --output to specify output path.");
dataStream.print();
}
}
/**
* Simple POJO containing a website page visitor.
*/
public static class PageVisit {
public String visitTime;
public long urId;
public String visitPage;
// public constructor to make it a Flink POJO
public PageVisit() {
}
public PageVisit(String visitTime, long urId, String visitPage) {
this.visitTime = visitTime;
this.urId = urId;
this.visitPage = visitPage;
}
@Override
public String toString() {
return "PageVisit " + visitTime + " " + urId + " " + visitPage;
}
}
}