spark中使⽤groupByKey进⾏分组排序
任务需求:已知RDD[(query:String, item_id:String, imp:Int, clk:Int)],要求找到每个query对应的点击最多的前2个item_id,即:按照query分组,并按照clk降序排序,每组取前两个。访问页面升级中
金针菇卷
例如:
(连⾐裙,1234, 22, 13)
(⽜仔裤,2768, 34, 7)
(连⾐裙,1673,45, 9)
(衬⾐,3468, 67, 12)
玫瑰泡水(⽜仔裤,2754, 68, 20)
(连⾐裙,1976,93, 29)
希望得到:
鼠标自动移动(连⾐裙,1976,93, 29)
(连⾐裙,1234, 22, 13)
(⽜仔裤,2754, 68, 20)
(⽜仔裤,2768, 34, 7)
(衬⾐,3468, 67, 12)
先看⼀个错误的版本:
val topItem_t= data_t.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey()
.map(line => {
val topItem = line._2.toArray.sortBy(_._3)(Ordering[Int].rever).
take(2)
(line._1, topItem(0), topItem(1), topItem(2))
})
我们把query作为key,其余放到⼀起,groupByKey后(map之前),类型为:RDD[(String, Iterable[(String, Int, Int)])],根据query 分组再map,line._2.toArray把Iterable转为Array,sortBy(_._3)是按最后⼀个Int即clk排序,(Ordering[Int].rever)表⽰从⼤到⼩(sortBy默认从⼩到⼤,注意这⾥的sortBy是Array的成员函数⽽不是rdd的sortBy,⽤法⽐较不同),take(2)是取前2个,然后返回(query, item_id)。跑⼀下上⾯的过程。
平凡之路吉他谱你会发现,返回结果只有3项:
生活就像一首歌
笔管鱼的做法(连⾐裙,1976,93, 29)上海海事展
(⽜仔裤,2754, 68, 20)
(衬⾐,3468, 67, 12)
为什么呢?因为这⾥要⽤flatMap⽽不是Map:
val topItem_t= data_t.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey()
.flatMap(line => {
val topItem = line._2.toArray.sortBy(_._3)(Ordering[Int].rever).
take(2)
topItem.map(line._1,_.1)
})
为什么呢?GroupByKey后,类型为RDD[(String, Iterable[(String, Int, Int)])],如果⽤map,那每⼀个key对应的⼀个Iterable变量,相当于⼀条数据,map后的结果⾃然还是⼀条。但flatMap,相当于map+flat操作,这才是我们真正的需要的形式。