Spark ⼊门-聚合算⼦详解
Overview
Spark中常⽤的算⼦加起来有⼏⼗个,其中针对key的聚合算⼦有五个,分别是groupBy 、groupByKey 、reduceByKey 、aggregateByKey 和flodByKey ,有⼏个底层其实调⽤的都是⼀个⽅法,只不过传⼊的参数不⼀样产⽣了这⼏个算⼦,但我仍打算分开来详解每个算⼦的计算过程,加深理解。
这⼏个聚合算⼦要解决的问题都是将所需操作的RDD中的key值相同的value值聚合在⼀起然后两两做计算也就是聚合最终得出⼀个结果,这⾥有三个点需要注意,⼀是两两聚合的初始值,是从外部传⼊还是使⽤默认值;⼆是分区内聚合⽅式,因为RDD默认是并⾏计算,会分成多个分区,每个分区内部可以指定聚合⽅式;三是分区间聚合⽅式,拿到分区内的聚合结果就要考虑分区间的聚合⽅式了,这个参数也可以指定。所以这⼏种算⼦的区别就是因为传⼊了不同的参数。
groupBy
先来说说groupBy,它是最容易理解的,就是把key值相同的value值放在⼀起形成(key, iter)的键值对,聚合的话需要使⽤map再对每个key对应的iter内容做聚合。
先来看看源码,需要传⼊⼀个group⽅法f,这个f⽅法传⼊待分组的元素,返回另外⼀个值K,⽽这个K就
是分组的依据,注意看最后groupBy返回的结果类型也是以K和相同K的初始元素⽣成的迭代器所组成的元组,需要对相同K下的iter进⾏聚合就需要再进⾏map操作。具体过程可以参考下图,第⼆步添加了File落盘动作,因为group操作会计算每个分区所有单词的⾸字母并缓存下来,如果放在内存中若数
据过多则会产⽣内存溢出;再就是第三步从⽂件读取回来,并不⼀定是三个分区,这⾥只是为了便于理解。
groupByKey
groupByKey相⽐于groupBy不同的是,groupBy需要指定分组的key,⽽groupByKey是将元组这种类型的第⼀个值作为key,对第⼆个值进⾏分组的操作。
可以看到这个算⼦不需要传⼊参数,就是针对元组这种KV类型定义的,⾄于返回值的类型,K就是元组的第⼀个值,Iterable(V)则是相同K 值的所有V组成的迭代器,那么同时处理元组类型时groupByKe
y和groupBy的不同之处就是这⾥的V是元组内的第⼆个值,⽽groupBy是初始的元素值,具体看下⾯的例⼦:
groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
1
// 计算初始RDD 不同⾸字母开头的元素数量2
val rdd: RDD[String] = sc.makeRDD(List("Hello", "Java", "Python", "PHP", "Help"))3
// ('H', ("Hello", "Help")), ('J', ("Java")), ('P', ("Python", "PHP"))4
val groupRDD: RDD[(Char, Iterable[String])] = upBy(_.charAt(0))5
// ('H', 2), ('J', 1), ('P', 2)6val sizeRDD: RDD[(Char, Int)] = groupRDD.map(_._2.llect().foreach(println)
groupByKey(): RDD[(K, Iterable[V])]
groupByKey也需要落盘操作,会导致数据打乱重组,存在shuffle操作,效率相对来说⽐较低下,这也
就引出了reduceByKey,下⾯再详
细⽐较两者的不同之处。
reduceByKey
reduceByKey相⽐于groupByKey就是把map操作集成在算⼦当中了,不需要再额外进⾏map操作,它和aggregateByKey以及
flodByKey的操作类似,只不过细节之处需要传⼊不同的参数区分彼此不同的功能。
可以看到reduceByKey接收⼀个func参数,⽽这个func参数接收两个V类型的参数并返回⼀个V类型的结果,这⾥的V其实就是初始RDD 中的元素,这⾥需要传⼊的func就是元素两两计算的逻辑。
从下图中的第⼀张图看相对于groupByKey只是少了map的步骤将它整合在reduceByKey中,但是实际
上reduceByKey的作⽤不⽌于此,第⼆张图才是实际的运⾏模式,它提供了Combine预聚合的功能,⽀持在分区中先进⾏聚合,称作分区内聚合,然后再落盘等待分区间聚合。这样下来它不只是减少了map的操作,同时提供了分区内聚合使得shuffle落盘时的数据量尽量⼩,IO效率也会提⾼不少。最后它引出了分区内聚合和分区间聚合,reduceByKey的分区内聚合和分区间聚合是⼀样的。
1
刘欢个人资料简介// 根据RDD 内元组的第⼀个元素将数据分类并对第⼆个元素求和2
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("a", 4), ("b", 2)))3
// ("a", (1, 3, 4)), ("b", (2))4
val groupByKeyRDD: RDD[(String, Iterable[Int])] = sc.groupByKey()5
// ("a", (("a", 1), ("a", 3), ("a", 4))), ("b", ("b", 2))6
val groupByRDD: RDD[(String, Iterable[(String, Int)])] = sc.groupBy(_._1)7
8
/
/ 当然聚合⽅式也不相同9groupByKeyRDD.map(_._2.sum)10groupByRDD.map(_._2.map(_._2).sum)
reduceByKey(func: (V, V) => V): RDD[(K, V)]
1
// 根据RDD 内元组的第⼀个元素将数据分类并对第⼆个元素求和2
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("a", 4), ("b", 2)))3val ReduceByKeyRDD: RDD[(String, Int)] = duceByKey(_ + _) // ("a", 8), ("b", llect().foreach(println)
aggregateByKey
怎么开通网银aggregateByKey是对reduceByKey的⾼级应⽤,它可以分开来指定分区内聚合和分区间聚合,并提供了⼀个计算初始值。
开场白聊天来看上⽅源码,它采⽤柯⾥化操作,第⼀个参数列表接收⼀个参数zeroValue,它提供⼀个初始值,不同于reduceByKey直接开始计算第⼀个元素和第⼆个元素,aggregateByKey允许先⽤初始值和第⼀个元素进⾏两两计算;第⼆个参数列表接收两个参数,第⼀个是qOp表⽰分区内聚合⽅式,它接收两个参数返回⼀个参数,注意接收的参数⼀个U的类型和zeroValue类型相同,另外⼀个是初始元素的类型,返回类型是U类型,说明返回类型是由zeroValue决定的,这很重要;第⼆个参数combOp表⽰分区间聚合⽅式,接收两个U类型的参数并返回⼀个U类型的参数。最终返回初始元素和聚合后的元素。
看运⾏过程就更清晰了,相⽐于reduceByKey只是将分区内聚合和分区间聚合分开来了,并且提供了⼀个初始值,这个初始值作为第⼀个元素与初始RDD的第⼀个元素计算,这也就使得初始值不⼀样哪怕聚合⽅式相同结果也可能不⼀样,详情看下图。其次就是分区数量对结果的影响,上⽅例⼦如果按三个分区计算结果⼜不⼀样了,它作为aggregateByKey的第四个决定结果的隐形参数在聚合时也需要考虑在内。
aggregateByKey[U: ClassTag](zeroValue: U)(qOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)
]
1
// 给定初始RDD 并指定两个分区,分区内计算最⼤值分区间求和2
val rdd: RDD[(String, Int)] = sc.makeRDD(3
List(("a",1), ("a", 2), ("b", 3), ("a", 4), ("a", 5), ("b", 6)), 4
numSlices = 25
)6
// (("a", 8), ("b", 8))7
val aggregateByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(8
(x, y) => math.max(x, y), // 分区内求最⼤值9
(x, y) => x + y // 分区间求和llect().foreach(println)
flodByKey
flodByKey是aggregateByKey的特例情况,在分区内聚合⽅式和分区间聚合⽅式相同的时候使⽤。
仍然是柯⾥化传参,第⼀个参数列表给定⼀个初始值,第⼆个参数列表传⼊⼀个聚合函数func,在⼀定条件下和reduceByKey的结果和聚合⽅式是相同的。
看运⾏过程还是⽐较容易理解的,尤其需要注意初始值的设定,不然会产⽣意想不到的结果。
Compare
因为有我前⾯把每个算⼦的详细计算过程都画了⼀遍,接下来从源码中函数的接收参数中继续看reduceByKey 、aggregateByKey 和flodByKey 这三个算⼦的联系和不同之处,它们的源码中都调⽤了⼀个函数combineByKeyWithClassTag ,接下来来看⼀看传⼊的参数。从它们源码调⽤的函数就可以很清楚的区分这⼏个算⼦了,结合不同环境使⽤不同的算⼦。
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
1
// 计算初始值与所有元素的和2val rdd: RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 3), ("a", 4), ("a", 2)), 2)3val foldByKeyRDD: RDD[Int] = rdd.foldByKey(10)(_ + _) // 10 + 1 + 3 + 10 + 4 + 2 = 30
1// reduceByKey
2combineByKeyWithClassTag[V]((v: V) =>
3 v, // 就是初始RDD的值,当作每个分区开始计算的初始值,不需要指定
4 func, // 分区内聚合
5 func, // 分区间聚合
6 partitioner)
7// aggregateByKey
8combineByKeyWithClassTag[U]((v: V) =>
9 cleanedSeqOp(createZero(), v), // 初始值,柯⾥化的第⼀个参数列表
10 cleanedSeqOp, // 分区内聚合,柯⾥化的第⼆个参数列表
11 combOp, // 分区间聚合,与分区内聚合不相同
流量卡怎么用12 partitioner)
13// flodByKey
14combineByKeyWithClassTag[V]((v: V) =>
15 cleanedFunc(createZero(), v), // 初始值,柯⾥化的第⼀个参数列表
16 cleanedFunc, // 分区内聚合,柯⾥化的第⼆个参数列表
17 cleanedFunc, // 分区间聚合,与分区内聚合相同
18 partitioner)
Application
1. 获取⾸字母相同key数据的和
Yahweh
1val rdd: RDD[(String, Int)] = sc.makeRDD(
甲减的原因2 List(("Hello", 1), ("Java", 3), ("Python", 5), ("PHP", 7), ("Help", 9)))
3
4rdd.map(kv => (kv._1.charAt(0), kv._2)) // 先将原始RDD的⾸字母提出来
5 .reduceByKey(_ + _) // 再按照key进⾏求和
6 .collect()
7 .foreach(println) // ("P",12), ("H", 10), ("C", 3)
1. 获取相同key数据的平均值
1val rdd: RDD[(String, Int)] = sc.makeRDD(
2 List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8)), 2)
3
4rdd.aggregateByKey((0.0, 0))( // 元组第⼀个元素接收求和数据,0.0避免求均值强转为Int,第⼆个接收数据计数
5 (k, v) => (k._1 + v, k._2 + 1), // 分区内按key累加、计数
6 (k, v) => (k._1 + v._1, k._2 + v._2) // 分区间将分区内的统计结果累加
7)
8 .map(kv => (kv._1, kv._2._1 / kv._2._2)) // 求均值
9 .collect()
10 .foreach(println) // ("a",3.5), ("b", 5.5)
1. 获取相同key的数据分区内求均值分区间求和的结果
1rdd.aggregateByKey((0.0, 0))(
2 (k, v) => ((k._1 + v), k._2 + 1),
3 (k, v) => (k._1 / k._2 + v._1 / v._2, k._2 + v._2) // 直接在这⼀步先计算分区内均值再求和
4)
5 .collect()
6 .foreach(println) // ("a",(7.0, 4)), ("b", (11.0, 4))
辱母案
1. 数据如下所⽰每⼀⾏数据是⼀条点击记录,字段分别为(时间戳 省份 市 ⽤户 ⼴告),计算每个省份点击次数前三名的⼴告。