Spark之⾯试题
1、Spark有⼏种部署⽅式?(重点)
Spark⽀持3种集群管理器(Cluster Manager),分别为:
1. Standalone:独⽴模式,Spark原⽣的简单集群管理器,⾃带完整的服务,可单独部署到⼀个集群中,⽆需依赖任何其他资源管理系
统,使⽤Standalone可以很⽅便地搭建⼀个集群;
2. Apache Mesos:⼀个强⼤的分布式资源管理框架,它允许多种不同的框架部署在其上,包括yarn;
3. Hadoop YARN:统⼀的资源管理机制,在上⾯可以运⾏多套计算框架,如map reduce、storm等,根据driver在集群中的位置不同,besure
分为yarn client和yarn cluster。
实际上,除了上述这些通⽤的集群管理器外,Spark内部也提供了⼀些⽅便⽤户测试和学习的简单集群部署模式。由于在实际⼯⼚环境下使⽤的绝⼤多数的集群管理器是Hadoop YARN,因此我们关注的重点是Hadoop YARN模式下的Spark集群部署。
Spark的运⾏模式取决于传递给SparkContext的MASTER环境变量的值,个别模式还需要辅助的程序接⼝来配合使⽤,⽬前⽀持的Master字符串及URL包括:
Spark运⾏模式配置
Master URL Meaning
local在本地运⾏,只有⼀个⼯作进程,⽆并⾏计算能⼒。
increadlocal[K]在本地运⾏,有K个⼯作进程,通常设置K为机器的CPU
核⼼数量。
local[*]在本地运⾏,⼯作进程数量等于机器的CPU核⼼数量。
spark://HOST:PORT以Standalone模式运⾏,这是Spark⾃⾝提供的集群运⾏
模式,默认端⼝号: 7077。
mesos://HOST:PORT在Mesos集群上运⾏,Driver进程和Worker进程运⾏在
Mesos集群上,部署模式必须使⽤固定值:--deploy-mode
cluster
yarn-client在Yarn集群上运⾏,Driver进程在本地,Executor进程在
Yarn集群上,部署模式必须使⽤固定值:--deploy-mode
client。Yarn集群地址必须在HADOOP_CONF_DIR or
YARN_CONF_DIR变量⾥定义。
yarn-cluster在Yarn集群上运⾏,Driver进程在Yarn集群上,Work进
程也在Yarn集群上,部署模式必须使⽤固定值:--deploy-
mode cluster。Yarn集群地址必须在
HADOOP_CONF_DIR or YARN_CONF_DIR变量⾥定
义。
⽤户在提交任务给Spark处理时,以下两个参数共同决定了Spark的运⾏⽅式。
·
–master MASTER_URL :决定了Spark任务提交给哪种集群处理。
· –deploy-mode DEPLOY_MODE:决定了Driver的运⾏⽅式,可选值为Client或者Cluster。
2、Spark提交作业参数(重点)
参考答案:
1)在提交任务时的⼏个重要参数
executor-cores —— 每个executor使⽤的内核数,默认为1,官⽅建议2-5个,企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存⼤⼩,默认1G
driver-cores —— driver使⽤内核数,默认为1
driver-memory —— driver内存⼤⼩,默认512M
2)给出⼀个提交任务的样式
spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 7 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
skl--name "Spark Job Name" \
InputPath \
OutputPath
3、简述Spark on yarn的作业提交流程(重点)
YARN Client模式
在YARN Client模式下,Driver在任务提交的本地机器上运⾏,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于⼀个ExecutorLaucher,只负责向ResourceManager申请Executor内存。
ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执⾏main函数,之后执⾏到Action算⼦时,触发⼀个job,并根据宽依赖开始划分stage,每个stage⽣成对应的taskSet,之后将task分发到各个Executor上执⾏。
YARN Cluster模式
YARN Cluster模式
在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的Applicati
onMaster就是Driver。
Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执⾏main函数,之后执⾏到Action算⼦时,触发⼀个job,并根据宽依赖开始划分stage,每个stage⽣成对应的taskSet,之后将task分发到各个Executor上执⾏。
4、请列举Spark的transformation算⼦(不少于5个)(重点)1)map
2)flatMap
joyo3)filter
4)groupByKey
5)reduceByKey
6)sortByKey
5、请列举Spark的action算⼦(不少于5个)(重点)
1)reduce:
2)collect:
3)first:
4)take:
5)aggregate:
6)countByKey:
7)foreach:
pearl harbour8)saveAsTextFile:
6、简述Spark的两种核⼼Shuffle(重点)
spark的Shuffle有Hash Shuffle和Sort Shuffle两种。
paypal 是什么在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。
工程管理系统
HashShuffleManager有着⼀个⾮常严重的弊端,就是会产⽣⼤量的中间磁盘⽂件,进⽽由⼤量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。
SortShuffleManager相较于HashShuffleManager来说,有了⼀定的改进。主要就在于,每个Task在进⾏shuffle操作时,虽然也会产⽣较多的临时磁盘⽂件,但是最后会将所有的临时⽂件合并(merge)成⼀个磁盘⽂件,因此每个Task就只有⼀个磁盘⽂件。在下⼀个stage的shuffle read task拉取⾃⼰的数据时,只要根据索引读取每个磁盘⽂件中的部分数据即可。
未经优化的HashShuffle:
上游的stage的task对相同的key执⾏hash算法,从⽽将相同的key都写⼊到⼀个磁盘⽂件中,⽽每⼀个磁盘⽂件都只属于下游stage的⼀个task。在将数据写⼊磁盘之前,会先将数据写⼊到内存缓冲,当内存缓冲填满之后,才会溢写到磁盘⽂件中。但是这种策略的不⾜在于,下游有⼏个task,上游的每⼀个task都就都需要创建⼏个临时⽂件,每个⽂件中只存储key取hash之后相同的数据,导致了当下游的task任务过多的时候,上游会堆积⼤量的⼩⽂件
优化后的HashShuffle:
在shuffle write过程中,上游stage的task就不是为下游stage的每个task创建⼀个磁盘⽂件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应⼀批磁盘⽂件,磁盘⽂件的数量与下游stage的task数量是相同的。⼀个Executor上有多少个CPU core,就可以并⾏执⾏多少个task。⽽第⼀批并⾏执⾏的每个task都会创建⼀个shuffleFileGroup,并将数据写⼊对应的磁盘⽂件内。当Executor的CPU core 执⾏完⼀批task,接着执⾏下⼀批task时,下⼀批task就会复⽤之前已有的shuffleFileGroup,包括其中的磁盘⽂件。也就是说,此时task会将数据写⼊已有的磁盘⽂件中,⽽不会写⼊新的磁盘⽂件中。因此,consolidate机制允许不同的task复⽤同⼀批磁盘⽂件,这样就可以有效将多个task的磁盘⽂件进⾏⼀定程度上的合并,从⽽⼤幅度减少磁盘⽂件的数量,进⽽提升shuffle write的性能。
注意:如果想使⽤优化之后的ShuffleManager,需要将:solidateFiles调整为true。(当然,默认是开启的)
未经优化:上游的task数量:m ,下游的task数量:n ,上游的executor数量:k (m>=k) ,总共的磁盘⽂件:m*n
优化之后的:上游的task数量:m ,下游的task数量:n ,上游的executor数量:k (m>=k) ,总共的磁盘⽂件: k*n
新生儿护理知识
普通的SortShuffle:
在普通模式下,数据会先写⼊⼀个内存数据结构中,此时根据不同的shuffle算⼦,可以选⽤不同的数据结构。如果是由聚合操作的shuffle算⼦,就是⽤map的数据结构(边聚合边写⼊内存),如果是join的算⼦,就使⽤array的数据结构(直接写⼊内存)。接着,每写⼀条数据进⼊内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘⽂件之前,会先根据key对内存数据结构中已有的数据进⾏排序,排序之后,会分批将数据写⼊磁盘⽂件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批次1万条数据的形式分批写⼊磁盘⽂件,写⼊磁盘⽂件是通过Java的BufferedOutputStream 实现的。BufferedOutputStream是Java的缓冲输出流,⾸先会将数据缓冲在内存中,当内存缓冲满溢之后再⼀次写⼊磁盘⽂件中,这样可以减少磁盘IO次数,提升性能。
此时task将所有数据写⼊内存数据结构的过程中,会发⽣多次磁盘溢写,会产⽣多个临时⽂件,最后会将之前所有的临时⽂件都进⾏合并,最后会合并成为⼀个⼤⽂件。最终只剩下两个⽂件,⼀个是合并之后的数据⽂件,⼀个是索引⽂件(标识了下游各个task的数据在⽂件中的start offt与end offt)。最终再由下游的task根据索引⽂件读取相应的数据⽂件。
SortShuffle - bypass运⾏机制:
此时上游stage的task会为每个下游stage的task都创建⼀个临时磁盘⽂件,并将数据按key进⾏hash然后根据key的hash值,将key写⼊对应的磁盘⽂件之中。当然,写⼊磁盘⽂件时也是先写⼊内存缓冲,缓冲写满之后再溢写到磁盘⽂件的。最后,同样会将所有临时磁盘⽂件都合并成⼀个磁盘⽂件,并创建⼀个单独的索引⽂件。
⾃⼰的理解:bypass的就是不排序,还是⽤hash去为key分磁盘⽂件,分完之后再合并,形成⼀个索引⽂件和⼀个合并后的key hash⽂件。省掉了排序的性能。
bypass机制与普通SortShuffleManager运⾏机制的不同在于:
a、磁盘写机制不同;
b、不会进⾏排序。
也就是说,启⽤该机制的最⼤好处在于,shuffle write过程中,不需要进⾏数据的排序操作,也就节省掉了这部分的性能开销。
触发bypass机制的条件:
shuffle map task的数量⼩于 spark.shuffle.sort.bypassMergeThreshold 参数的值(默认200)并且不是聚合类的shuffle算⼦(⽐如groupByKey)
7、简述SparkSQL中RDD、DataFrame、DataSet三者的区别与联系?(重点)
RDD:弹性分布式数据集;不可变、可分区、元素可以并⾏计算的集合。
优点:
RDD编译时类型安全:编译时能检查出类型错误;
⾯向对象的编程风格:直接通过类名点的⽅式操作数据。
utilization
缺点:
序列化和反序列化的性能开销很⼤,⼤量的⽹络传输;
构建对象占⽤了⼤量的heap堆内存,导致频繁的GC(程序进⾏GC时,所有任务都是暂停)
DataFrame
DataFrame以RDD为基础的分布式数据集。
优点:
DataFrame带有元数据schema,每⼀列都带有名称和类型。
DataFrame引⼊了off-heap,构建对象直接使⽤操作系统的内存,不会导致频繁GC。
数字翻译成英文
DataFrame可以从很多数据源构建;
DataFrame把内部元素看成Row对象,表⽰⼀⾏⾏的数据。
DataFrame=RDD+schema
缺点:
编译时类型不安全;
不具有⾯向对象编程的风格。
Datat
DataSet包含了DataFrame的功能,Spark2.0中两者统⼀,DataFrame表⽰为DataSet[Row],即DataSet的⼦集。
(1)DataSet可以在编译时检查类型;
(2)并且是⾯向对象的编程接⼝。
(DataSet 结合了 RDD 和 DataFrame 的优点,并带来的⼀个新的概念 Encoder。当序列化数据时,Encoder 产⽣字节码与 off-heap 进⾏交互,能够达到按需访问数据的效果,⽽不⽤反序列化整个对象。)。
三者之间的转换:
8、Repartition和Coalesce关系与区别(重点)
1)关系:
两者都是⽤来改变RDD的partition数量的,repartition底层调⽤的就是coalesce⽅法:coalesce(numPartitions, shuffle = true)
2)区别:
repartition⼀定会发⽣shuffle,coalesce根据传⼊的参数来判断是否发⽣shuffle
⼀般情况下增⼤rdd的partition数量使⽤repartition,减少partition数量时使⽤coalesce
9、Spark中cache默认缓存级别是什么?(重点)
DataFrame的cache默认采⽤ MEMORY_AND_DISK 这和RDD 的默认⽅式不⼀样RDD cache 默认采⽤MEMORY_ONLY