FlinkSQL-⾃定义聚合函数AggregateFunction
白眼狼英语
FlinkSQL-⾃定义聚合函数AggregateFunction
什么是聚合函数
聚合,多对⼀,类似窗⼝聚合
⽤户⾃定义聚合函数(Ur-Defined Aggregate Functions,UDAF)可以把⼀个表中的数据,聚合成⼀个标量值new egg
⽤户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的
聚合函数的实现
AggregationFunction要求必须实现的⽅法
createAccumulator()
accumulate()
cloudsgetValue()
聚合函数的⼯作原理
⾸先,它需要⼀个累加器(Accumulator),⽤来保存聚合中间结果的数据结构;可以通过调⽤ createAccumulator()⽅法创建空累加器随后,对每个输⼊⾏调⽤函数的 accumulate() ⽅法来更新累加器
处理完所有⾏后,将调⽤函数的 getValue() ⽅法来计算并返回最终结果
代码实现
//实现⾃定义的AggregateFunction<;结果值,中间状态>
public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double,Integer>>{
//输出结果
k歌情人@Override
public Double getValue(Tuple2<Double, Integer> acc){
昂立日语
return acc.f0/acc.f1;
}
//初始化累加器
@Override
public Tuple2<Double, Integer>createAccumulator(){
return new Tuple2<Double,Integer>(0.0,0);
pavo
}
//必须实现⼀个accumulate(中间状态,传⼊的⼀条数据)⽅法,来数据之后更新状态
public void accumulate(Tuple2<Double,Integer> acc, Double temp){翻译张蕾
acc.f0 += temp;
acc.f1 +=1;
}
}
测试⽤例
//流转换成表 Tuple3<String,Long,Double>
Table sourceTable = tableEnv.fromDataStream(dataStream,"f0 as id, f1 as ts, f2 as temp,pt.proctime");
//在环境中注册UDAF
AvgTemp avgTemp =new AvgTemp();
candysoft//tableAPI
Table resultTable = upBy("id")
.aggregate("avgTemp(temp) as avgtemp")
.lect("id,avgtemp");
blb>couchdb
//SQL
Table resultSqlTbale = tableEnv.sqlQuery("lect id,avgTemp(temp) as avgtemp from nsor group by id"); RetractStream(resultSqlTbale,Row.class).print();