大数据spark常用算子用法总结(个人学习笔记)

更新时间:2023-06-18 09:52:53 阅读: 评论:0

⼤数据spark常⽤算⼦⽤法总结(个⼈学习笔记)#coding=utf-8
#!/usr/bin/python
# from pyspark.sql import SQLContext, Row        #导⼊pyspark模块
from pyspark import SparkContext, SparkConf
注意转移from pyspark.sql import SparkSession            #SparkSession 是 Spark SQL 的⼊⼝
“”"
该代码⽤于练习spark的各种算⼦操作,了解各种算⼦的具体⽤法
“”"
1.map算⼦
#map算⼦
def test_map():
八尺见方打一字
# map() 接收⼀个函数,把这个函数⽤于 RDD 中的每个元素,将函数的返回结果作为新的RDD
spark = SparkSession.builder.appName("map").getOrCreate()
sc = spark.sparkContext
#map
x= sc.parallelize([1,2,3])#parallelize函数将列表转化为⼀个RDD对象
y =x.map(lambda x:(x,x**2))
llect())
llect())
#[1,2,3]
#[(1,1),(2,4),(3,9)]
2.filter算⼦
成长中
在这⾥插⼊代码⽚def test_filter():
#filter 挑选符合条件的结果
spark =SparkSession.builder.appName("filter").getOrCreate()
sc = spark.sparkContext
# filter
x = sc.parallelize([1,2,3])
y = x.filter(lambda x: x %2==1)
llect())
llect())
#[1,2,3]
#[1,3]
3.flatMap算⼦
def test_flatMap():
# 与map类似,将原RDD中的每个元素通过函数f转换为新的元素,并将这些元素放⼊⼀个集合,构成新的RDD
淋巴按摩spark =SparkSession.builder.appName("flatMap").getOrCreate()
sc = spark.sparkContext
#flatMap
x = sc.parallelize([[1,2,3],[4,5,6]])
苹果怎么安装外来软件y = x.flatMap(lambda x :x)
llect())
llect())
#[[1,2,3],[4,5,6]]
#[1,2,3,4,5,6]
def test_reduceByKey():
"""
reduceByKey将相同key的前两个value传给输⼊函数,产⽣⼀个新的return值,
新产⽣的return值与下⼀个value(第三个元素)组成两个元素,再被传给输⼊函数,直到最后只有⼀个值为⽌。
"""
spark =SparkSession.builder.appName("reduceByKey").getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.reduceByKey(lambda x,y:x+y)
llect())
llect())
# [('B',1),('B',2),('A',3),('A',4),('A',5)]
# [('A', 12), ('B', 3)]
def test_groupByKey():
# groupByKey 按照key进⾏分组,直接进⾏shuffle
spark =SparkSession.builder.appName("groupByKey").getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
y = x.groupByKey()
llect())
print([(j[0],[i for i in j[1]])for j llect()])
#[('B',1),('B',2),('A',3),('A',4),('A',5)]
#[('A', [3, 4, 5]), ('B', [1, 2])]
6.aggregateByKey算⼦
def test_aggregateByKey():
"""
aggregateByKey,先说分为三个参数的:
第⼀个参数是,每个key的初始值
第⼆个是个函数, Seq Function,经测试这个函数就是⽤来先对每个分区内的数据按照key分别进⾏定义进⾏函数定义的操作第三个是个函数, Combiner Function,对经过 Seq Function 处理过的数据按照key分别进⾏进⾏函数定义的操作
"""
# 按分区聚合,再总的聚合,每次要跟初始值交流
spark =SparkSession.builder.appName("aggregateByKey").getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])
zeroValue =[]
mergeVal =(lambda aggregated,el:aggregated +[(el,el**2)])
mergeComb =(lambda agg1,agg2: agg1+agg2)
y = x.aggregateByKey(zeroValue,mergeVal,mergeComb)
llect())
llect())
# [('B',1),('B',2),('A',3),('A',4),('A',5)]
# [('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
7.union算⼦
# 将两个RDD合并,不去重
spark =SparkSession.builder.appName("union").getOrCreate()    sc = spark.sparkContext
孙权劝学文言文翻译#union
x = sc.parallelize(['A','A','B'])
y = sc.parallelize(['D','C','A'])
z = x.union(y)
llect())
llect())
llect())
# ['A','A','B'] ['D','C','A']
# ['A', 'A', 'B', 'D', 'C', 'A']
8.take算⼦
def test_take():
# take函数⽤于获取RDD中从0到num-1下标的元素(不排序)    spark =SparkSession.builder.appName("take").getOrCreate()    sc = spark.sparkContext
#take
x = sc.parallelize([1,3,1,2,3])
y = x.take(num=3)
llect())
print(y)
#[1, 3, 1, 2, 3]
#[1, 3, 1]
9.first算⼦
def test_first():
# first返回RDD中的第⼀个元素,不排序
spark =SparkSession.builder.appName("first").getOrCreate()    sc = spark.sparkContext
#first
x = sc.parallelize([1,3,1,2,3])
y = x.first()
llect())
print(y)
# [1, 3, 1, 2, 3]
# 1
10.foreach算⼦
# foreach:⽤于遍历RDD,将函数f应⽤于每⼀个元素,⽆返回值(action算⼦)。
spark =SparkSession.builder.appName("foreach").getOrCreate()
sc = spark.sparkContext
#foreach
def f(x):
print(x)
x = sc.parallelize([1,2,3,4,5])
y = x.foreach(f)#先输出y值,因为y调⽤了f函数,⾥⾯包含了print
llect())#这⼀步才会执⾏打印x
print(y)
# 1
# 2
# 3
# 4
# 5
# [1, 2, 3, 4, 5]
# None
def test_count():
# count⽤于统计RDD中元素个数
spark =SparkSession.builder.appName("count").getOrCreate()
sc = spark.sparkContext
#count
x = sc.parallelize([1,3,2])
y = x.count()
llect())
print(y)
# [1, 3, 2]
# 3
def test_collect():
"""
如果数据量⽐较⼤的时候,尽量不要使⽤collect函数,因为这可能导致Driver端内存溢出问题。建议使⽤rdd.take(100).foreach(println),
⽽不使⽤llect().foreach(println)
"""
spark =SparkSession.builder.appName("collect").getOrCreate()
sc = spark.sparkContext
#collect群星合唱
x = sc.parallelize([1,2,3])
y = x.collect()
print(x)
print(y)
# ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:184
# [1, 2, 3]
13.sample算⼦
"""
1、withReplacement:元素是否可以重复抽样
2、fraction:期望样本的⼤⼩作为RDD⼤⼩的⼀部分,
当withReplacement=fal时:选择每个元素的概率;fraction⼀定是[0,1]区间内的数字;当withReplacement=true时:选择每个元素的期望次数; fraction必须⼤于等于0。
学前教育论文3、ed:随机数⽣成器的种⼦,不好把控,建议默认。
"""
spark =SparkSession.builder.appName("sample").getOrCreate()
sc = spark.sparkContext
#sample
x = sc.parallelize(range(7))
ylist =[x.sample(withReplacement =Fal,fraction =0.5)for i in range(5)]
print('x ='+llect()))
for cnt,y in zip(range(len(ylist)),ylist):
print('sample:'+str(cnt)+'y ='+llect()))
# x =[0, 1, 2, 3, 4, 5, 6]
# sample:0y =[0, 1, 5, 6]
# sample:1y =[1, 3, 5, 6]
# sample:2y =[3]
# sample:3y =[2, 4, 6]
# sample:4y =[1, 4, 6]
14.takeSample算⼦
def test_takeSample():
"""
返回⼀个数组
该⽅法仅在预期结果数组很⼩的情况下使⽤,因为所有数据都被加载到driver的内存中。
1、withReplacement:元素可以多次抽样(在抽样时替换)
2、num:返回的样本的⼤⼩
3、ed:随机数⽣成器的种⼦,不好把控,建议默认
"""
spark =SparkSession.builder.appName("takeSample").getOrCreate()
sc = spark.sparkContext
# tekeSample
x = sc.parallelize(range(7))
ylist =[x.takeSample(withReplacement=Fal,num=3)for i in range(5)]
print('x = '+llect()))
for cnt,y in zip(range(len(ylist)),ylist):
print('sample'+str(cnt)+'y ='+str(y))
# x = [0, 1, 2, 3, 4, 5, 6]
# sample0y =[1, 3, 5]
# sample1y =[6, 1, 2]
# sample2y =[4, 5, 3]
# sample3y =[3, 0, 5]
# sample4y =[6, 2, 3]
15.sampleByKey算⼦

本文发布于:2023-06-18 09:52:53,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/982346.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:函数   元素   结果   内存   定义   抽样   学习   操作
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图