轻松理解Spark的aggregate方法

更新时间:2023-07-06 11:05:37 阅读: 评论:0

轻松理解Spark的aggregate⽅法
2019-04-20
关键字: Spark 的 agrregate 作⽤、Scala 的 aggregate 是什么
Spark 编程中的 aggregate ⽅法还是⽐较常⽤的。本篇⽂章站在初学者的⾓度以⼤⽩话的形式来讲解⼀下 aggregate ⽅法。
aggregate ⽅法是⼀个聚合函数,接受多个输⼊,并按照⼀定的规则运算以后输出⼀个结果值。
aggregate 在哪
aggregate ⽅法是 Spark 编程模型 RDD 类( org.apache.spark.RDD ) 中定义的⼀个公有⽅法。它的⽅法声明如下
1def aggregate[U: ClassTag](zeroValue: U)(qOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
2
domesticate>a place in the sun
3    // ...
4
5  }
aggregate 的参数是什么意思
然后我们⼀块⼀块来学习这个⽅法的声明。其实这⼩节讲的,都是 Scala 的语法知识。
def aggregate[U: ClassTag](zeroValue: U)(qOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
⾸先看到的是 “泛型” 声明。懂 Java 的同学直接把这个 " [U: ClassTag] " 理解成是⼀个泛型声明就好了。如果您不是很熟悉 Java 语⾔,那我们只需要知道这个 U 表⽰我们的 aggregate ⽅法只能接受某⼀种类型的输⼊值,⾄于到底是哪种类型,要看您在具体调⽤的时候给了什么类型。
def aggregate[U: ClassTag](zeroValue: U)(qOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
然后我们来看看 aggregate 的参数列表。明显这个 aggregate ⽅法是⼀个柯⾥化函数。柯⾥化的知识不在本篇⽂章讨论的范围之内。如果您还不了解柯⾥化的概念,那在这⾥简单地理解为是通过多个圆括号来接受多个输⼊参数就可以了。
然后我们来看看第 1 部分,即上⾯蓝⾊加粗的 " (zeroValue: U) " 。这个表⽰它接受⼀个任意类型的输⼊参数,变量名为 zeroValue 。这个值就是初值,⾄于这个初值的作⽤,姑且不⽤理会,等到下⼀⼩节通过实例来讲解会更明了,在这⾥只需要记住它是⼀个 “只使⽤⼀次” 的值就好了。
第 2 部分,我们还可以再把它拆分⼀下,因为它⾥⾯其实有两个参数。笔者认为 Scala 语法在定义多个参数时,辨识度⽐较弱,不睁⼤眼睛仔细看,很难确定它到底有⼏个参数。
⾸先是第 1 个参数 " qOp: (U, T) => U " 它是⼀个函数类型,以⼀个输⼊为任意两个类型 U, T ⽽输出为 U 类型的函数作为参数。这个函数会先被执⾏。这个参数函数的作⽤是为每⼀个分⽚(slice)中的数据遍历应⽤⼀次函数。换句话说就是假设我们的输⼊数据集( RDD )有 1 个分⽚,则只有⼀个 qOp 函数在运⾏,假设有 3 个分⽚,则有三个 qOp 函数在运⾏。可能有点难以理解,不过没关系,到后⾯结合实例就很容易理解了。
另⼀个参数 " combOp: (U, U) => U " 接受的也是⼀个函数类型,以输⼊为任意类型的两个输⼊参数⽽输出为⼀个与输⼊同类型的值的函数作为参数。这个函数会在上⾯那个函数执⾏以后再执⾏。这个参
数函数的输⼊数据来⾃于第⼀个参数函数的输出结果,这个函数仅会执⾏ 1 次,它是⽤来最终聚合结果⽤的。同样这⾥搞不懂没关系,下⼀⼩节的实例部分保证让您明⽩。
def aggregate[U: ClassTag](zeroValue: U)(qOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
最后是上⾯这个红⾊加粗的 " : U " 它是 aggregate ⽅法的返回值类型,也是泛型表⽰。
对了,最后还有⼀个 " withScope ",这个就不介绍了,因为笔者也不知道它是⼲嘛的,哈哈哈哈。反正对我们理解这个⽅法也没什么影响。
aggregate 正确的使⽤姿势
我们直接在 spark-shell 中来演⽰实例了。这⾥以两个⼩例⼦来演⽰,⼀个是不带分⽚的 RDD ,另⼀个则是带 3 个分⽚的 RDD 。
⾸先我们来创建⼀个 RDD
scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd1 collect
warning: there was one feature warning; re-run with -feature for details
res24: Array[Int] = Array(1, 2, 3, 4, 5)
川普女儿这个 RDD 仅有 1 个分⽚,包含 5 个数据: 1, 2, 3, 4, 5 。
然后我们来应⽤⼀下 aggregate ⽅法。
哦,不对,在使⽤ aggregate 之前,我们还是先定义两个要给 aggregate 当作输⼊参数的函数吧。家长会学生代表发言演讲稿
scala> :paste
// Entering paste mode (ctrl-D to finish)
def pfun1(p1: Int, p2: Int): Int = {
p1 * p2
}
/
/ Exiting paste mode, now interpreting.tiny
pfun1: (p1: Int, p2: Int)Int
scala>
⾸先来定义第 1 个函数,即等下要被当成 qOp 的形参使⽤的函数。在上⼀⼩节我们知道 qOp 函数是⼀个输⼊类型为 U, T 类型⽽输出为 U 类型的函数。但是在这⾥,因为我们的 RDD 只包含⼀个 Int 类型数据,所以这⾥的 qOp 的两个输⼊参数都是 Int 类型的,这是没⽑病的哦!然后这个函数的返回类型也为 Int 。我们这个函数的作⽤就是将输⼊的参数 p1 , p2 求积以后返回。
scala> :paste
操场的英文// Entering paste mode (ctrl-D to finish)
def pfun2(p3: Int, p4: Int): Int = {
p3 + p4
}
/
/ Exiting paste mode, now interpreting.
pfun2: (p3: Int, p4: Int)Int
scala>
接着是第 2 个函数。就不再解释什么了。
然后终于可以开始应⽤我们的 aggregate ⽅法了。
scala> rdd1.aggregate(3)(pfun1, pfun2)
res25: Int = 363
scala>
输出结果是 363 !这个结果是怎么算出来的呢?
2014高考题⾸先我们的 zeroValue 即初值是3。然后通过上⾯⼩节的介绍,我们知道⾸先会应⽤pfun1函数,因为我们这个 RDD 只有 1 个分⽚,所以整个运算过程只会有⼀次 pfun1 函数调⽤。它的计算过程如下:
⾸先⽤初值 3 作为 pfun1 的参数 p1 ,然后再⽤ RDD 中的第 1 个值,即 1 作为 pfun1 的参数 p2 。由此我们可以得到第⼀个计算值为3 * 1 = 3。接着这个结果 3 被当成 p1 参数传⼊,RDD 中的第 2 个值即 2 被当成 p2 传⼊,由此得到第⼆个计算结果为3 * 2 = 6。以此类推,整个 pfun1 函数执⾏完成以后,得到的结果是3 * 1 * 2 * 3 * 4 * 5 = 360。这个 pfun1 的应⽤过程有点像是 “在 RDD 中滑动计算” 。
在 aggregate ⽅法的第 1 个参数函数 pfun1 执⾏完毕以后,我们得到了结果值 360 。于是,这个时候就要开始执⾏第 2 个参数函数 pfun2 了。
pfun2 的执⾏过程与 pfun1 是差不多的,同样会将 zeroValue 作为第⼀次运算的参数传⼊,在这⾥即是将 zeroValue 即3当成 p3 参数传⼊,然后是将pfun1 的结果360当成 p4 参数传⼊,由此得到计算结果为 363 。因为 pfun1 仅有⼀个结果值,所以整个 aggregate 过程就计算完毕了,最终的结果值就是363。
怎么样?相信您已经完全明⽩ aggregate ⽅法的的作⽤与⽤法了吧。下⾯再贴⼀个有多个分⽚的 RDD 的⽰例。
scala> val rdd2 = sc.makeRDD(1 to 10, 3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24
scala> NumPartitions
res26: Int = 3
scala> rdd2.foreachPartition(myprint)
中文名字翻译英文名字1 ,
2 ,
3 ,
4 ,
5 ,
6 ,
7 , 8 , 9 , 10 ,
这⾥定义了⼀个拥有 3 个分⽚的 RDD 。然后 aggregate 的两个函数参数仍然是使⽤上⾯定义的 pfun1 与 pfun2 。
mid autumn dayscala> rdd2.aggregate(2)(pfun1, pfun2)
res29: Int = 10334
动画片美女与野兽
结果是 10334 。怎么来的呢?
因为前⾯⼩节有提到 qOp 函数,即这⾥的 pfun1 函数会分别在 RDD 的每个分⽚中应⽤⼀次,所以这⾥ pfun1 的计算过程为
2 * 1 * 2 *
3      = 12
2 * 4 * 5 * 6      = 240
2 * 7 * 8 * 9 * 10  = 10080
标橙的为 zeroValue 。
在这⾥ pfun1 的输出结果有 3 个值。然后就来应⽤ combOp 即这⾥的 pfun2
2 + 12 + 240 + 10080  = 10334所以,结果就是 10334 咯!

本文发布于:2023-07-06 11:05:37,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/168802.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:函数   参数   类型   结果   实例   输出   计算   接受
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图