九年级英语作业本答案Flink⾃定义aggregate聚合函数的步骤
在Flink计算中,常见的⼀些操作是map或者flatmap⼀些数据之后keyby 开窗⼝进⾏计算。那么在这些计算当中有哪些算⼦呢?
其中我分为两类算⼦。
增量聚合 有reduce 和aggregate算⼦,全量聚合 有apply和process。那么今天我们就主要讲解⼀下常⽤的增量聚合算⼦aggregate算⼦。
aggregate⽅法签名的三个类型 <;数据源类型,累加器类型,输出类型>
WindowFunction ⽅法签名的四个类型为 <IN, OUT, KEY, W extends Window>
第⼀步:将dataStream转换城windowedStream
// 从kafka读取数据
val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
.
map(data => {
val dataArray = data.split(",")aosp>济南新东方学校
UrBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
鞋襻})
.assignAscendingTimestamps(_.timestamp * 1000L)
dick什么意思// 对数据进⾏窗⼝聚合处理
tough是什么意思
val aggStream: DataStream[ItemViewCount] = inputStream
.filter(_.behavior == "pv") // 过滤出pv数据
英文论文翻译
.keyBy(_.itemId)
.timeWindow(Time.hours(1), Time.minutes(5)) // 开窗进⾏统计
.aggregate(new CountAgg(), new WindowCountResult()) // 聚合出当前商品在时间窗⼝内的统计数量
第⼆步:⾃定义聚合函数
// ⾃定义的预聚合函数,来⼀条数据就加⼀
class CountAgg() extends AggregateFunction[UrBehavior, Long, Long] {
//add⽅法为累加器累加的⽅法,这⾥为最简单的+1操作
override def add(value: UrBehavior, accumulator: Long): Long = accumulator + 1
//初始化累加值
override def createAccumulator(): Long = 0L
//最后返回那个值,这⾥为accumulator
override def getResult(accumulator: Long): Long = accumulator
//分区处理的归并操作,这⾥将所有并处理的结果相加
override def merge(a: Long, b: Long): Long = a + b
}
第三部:⾃定义窗⼝函数
// ⾃定义window functionbenjamin
class WindowCountResult() extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {
//Long类型的Key为上⼀步的⾃定义累加器的返回值
//Window为差给你扣类型,第⼀步中的没窗⼝类型,TimeWindow
popular//input为接收的数据类型,此处为Long类型的迭代器
//out为此⽅法返回的类型,此处为ItemViewCount样例类对象的集合
override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
//调⽤ItemViewCount样例类对象的构造器,依次构造出ItemViewCount样例类并返回
true colors
}
}