FlinkSQL实战(5):使⽤⾃定义函数实现关键字过滤统计
Flink SQL 实战 (5):使⽤⾃定义函数实现关键字过滤统计
在上⼀篇实战博客中使⽤POJO Schema解析来⾃ Kafka 的 JSON 数据源并且使⽤⾃定义函数处理。
现在我们使⽤更强⼤⾃定义函数处理数据
使⽤⾃定义函数实现关键字过滤统计
⾃定义表函数(UDTF)
与⾃定义的标量函数相似,⾃定义表函数将零,⼀个或多个标量值作为输⼊参数。 但是,与标量函数相⽐,它可以返回任意数量的⾏作为输出,⽽不是单个值。
为了定义表函数,必须扩展基类TableFunction并实现评估⽅法。 表函数的⾏为由其评估⽅法确定。 必须将评估⽅法声明为公开并命名为eval。 通过实现多个名为eval的⽅法,可以重载TableFunction。 评估⽅法的参数类型确定表函数的所有有效参数。 返回表的类型由TableFunction的通⽤类型确定。 评估⽅法使⽤ collect(T)⽅法发出输出⾏。
定义⼀个过滤字符串 记下关键字 的⾃定义表函数
KyeWordCount.java:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;
public class KyeWordCount extends TableFunction<Tuple2<String,Integer>> {
private String[] keys;
public KyeWordCount(String[] keys){
this.keys=keys;
}
public void eval(String in){
for (String key:keys){
if (in.contains(key)){
collect(new Tuple2<String, Integer>(key,1));
}
}
}
西洋骚妇}
实现关键字过滤统计:
public class UdtfJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = ExecutionEnvironment();
EnvironmentSettings streamSettings = wInstance().uBlinkPlanner().inStreamingMode().build();
离歌原唱
StreamTableEnvironment streamTabelEnv = ate(streamEnv, streamSettings);
KafkaTabelSource kafkaTabelSource = new KafkaTabelSource();
//注册⾃定义函数定义三个关键字:"KeyWord","WARNING","illegal"
//编写SQL
Table wordWithCount = streamTabelEnv.sqlQuery("SELECT key,COUNT(countv) AS countsum FROM kafkaDataStream LEFT JOIN LATERAL TABL E(CountKEY(respon)) as T(key, countv) ON TRUE GROUP BY key");
//直接输出Retract流
}
}
好词摘抄测试⽤Python脚本如下
# pypi/project/kafka-python/
import pickle
import time
import json
from kafka import KafkaProducer
表白送什么花合适producer = KafkaProducer(bootstrap_rvers=['127.0.0.1:9092'],
key_rializer=lambda k: pickle.dumps(k),
value_rializer=lambda v: pickle.dumps(v))
start_time = time.time()
for i in range(0, 10000):
print('------{}---------'.format(i))
producer = KafkaProducer(value_rializer=lambda v: json.dumps(v).encode('utf-8'),compression_type='gzip') producer.nd('test',{"respon":"resKeyWordWARNINGillegal","status":0,"protocol":"protocol","timestamp":0}) producer.nd('test',{"respon":"resKeyWordWARNINGillegal","status":1,"protocol":"protocol","timestamp":0}) producer.nd('test',{"respon":"resresKeyWordWARNING","status":2,"protocol":"protocol","timestamp":0}) producer.nd('test',{"respon":"resKeyWord","status":3,"protocol":"protocol","timestamp":0})
producer.nd('test',{"respon":"res","status":4,"protocol":"protocol","timestamp":0})
producer.nd('test',{"respon":"res","status":5,"protocol":"protocol","timestamp":0})
# future = producer.nd('test', key='num', value=i, partition=0)
# 将缓冲区的全部消息push到broker当中
producer.flush()
producer.clo()
end_time = time.time()
time_counts = end_time - start_time
print(time_counts)
控制台输出:
...
6> (fal,KeyWord,157)
3> (fal,WARNING,119)
3> (true,WARNING,120)
6> (true,KeyWord,158)
7> (true,illegal,80)
6> (fal,KeyWord,158)
6> (true,KeyWord,159)
6> (fal,KeyWord,159)
6> (true,KeyWord,160)
...
⾃定义聚合函数
⾃定义聚合函数(UDAGGs)将⼀个表聚合为⼀个标量值。
聚合函数适合⽤于累计的⼯作,上⾯的图显⽰了聚合的⼀个⽰例。假设您有⼀个包含饮料数据的表。
该表由三列组成:id、name和price,共计5⾏。想象⼀下,你需要找到所有饮料的最⾼价格。执⾏max()聚合。您需要检查5⾏中的每⼀⾏,结果将是单个数值。
⽤户定义的聚合函数是通过扩展AggregateFunction类来实现的。AggregateFunction的⼯作原理如下。⾸先,它需要⼀个累加器,这个累加器是保存聚合中间结果的数据结构。通过调⽤AggregateFunction的createAccumulator()⽅法来创建⼀个空的累加器。随后,对每个输⼊⾏调⽤该函数的accumulator()⽅法来更新累加器。处理完所有⾏之后,将调⽤函数的getValue()⽅法来计算并返回最终结果。
**每个AggregateFunction必须使⽤以下⽅法: **
createAccumulator()创建⼀个空的累加器
accumulate()更新累加器
getValue()计算并返回最终结果
除了上述⽅法之外,还有⼀些可选⽅法。虽然其中⼀些⽅法允许系统更有效地执⾏查询,但是对于某些⽤例是必需的。例如,如果应该在会话组窗⼝的上下⽂中应⽤聚合函数,那么merge()⽅法是必需的(当观察到连接它们的⾏时,需要连接两个会话窗⼝的累加器。
AggregateFunction可选⽅法
retract() 定义restract:减少Accumulator ,对于在有界窗⼝上的聚合是必需的。
merge() merge多个Accumulator , 对于许多批处理聚合和会话窗⼝聚合都是必需的。
retAccumulator() 重置Accumulator ,对于许多批处理聚合都是必需的。
使⽤聚合函数聚合最⼤的status值正宗红烧排骨
编写⾃定义聚合函数,⽤于聚合出最⼤的status
恋爱过程public class MaxStatus extends AggregateFunction<Integer,MaxStatus.StatusACC> {
@Override
public Integer getValue(StatusACC statusACC) {
return statusACC.maxStatus;
}
户外拓展训练
@Override
public StatusACC createAccumulator() {
return new StatusACC();
}
public void accumulate(StatusACC statusACC,int status){
if (status>statusACC.maxStatus){
statusACC.maxStatus=status;
}
}
public static class StatusACC{
public int maxStatus=0;
}
}
mian函数修改注册和SQL就可以使⽤
/**
*聚合最⼤的status
*/
Table wordWithCount = streamTabelEnv.sqlQuery("SELECT maxStatus(status) AS maxStatus FROM kafkaDataStream");
使⽤之前的python脚本测试
控制台输出(全部):
5> (fal,1)
8> (true,3)
3> (fal,0)
4> (true,1)
6> (true,2)
2> (true,0)
2> (true,4)
1> (fal,3)
7> (fal,2)
3> (fal,4)
4> (true,5)
除⾮输⼊更⼤的Status,否则控制台不会继续输出新结果
表聚合函数
⽤户定义的表聚合函数(UDTAGGs)将⼀个表(具有⼀个或多个属性的⼀个或多个⾏)聚合到具有多⾏和多列的结果表。和聚合函数⼏乎⼀致,有需求的朋友可以参考官⽅⽂档
GitHub
项⽬源码已上传⾄GitHub
我的专栏:
To Be Continue=>
>拔火罐的作用