SPARK-Submit调参(转)
⽂章出处:
如有不妥之处,欢迎随时留⾔沟通交流,谢谢~
在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚⾄压根⼉不设置。资源参数设置的不合理,可能会导致没有充分利⽤集群资源,作业运⾏会极其缓慢;或者设置的资源过⼤,队列没有⾜够的资源来提供,进⽽导致各种异常。总之,⽆论是哪种情况,都会导致Spark作业的运⾏效率低下,甚⾄根本⽆法运⾏。因此我们必须对Spark作业的资源使⽤原理有⼀个清晰的认识,并知道在Spark作业运⾏过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。
Spark作业基本运⾏原理
详细原理见上图。我们使⽤spark-submit提交⼀个Spark作业之后,这个作业就会启动⼀个对应的Driver进程。根据你使⽤的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个⼯作节点上启动。Driver进程本⾝会根据我们设置的参数,占有⼀定数量的内存和CPU core。⽽Driver进程要做的第⼀件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团·⼤众点评使⽤的是YARN作为资源管理集群)申请运⾏Spark作业需要使⽤的资源,这⾥的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个⼯作节点上,启动⼀定数量的Executor进程,每个Executor进程都占有⼀定数量的内存和CPU core。
在申请到了作业执⾏所需的资源之后,Driver进程就会开始调度和执⾏我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执⾏⼀部分代码⽚段,并为每个stage创建⼀批task,然后将这些task分配到各个Executor进程中执⾏。task是最⼩的计算单元,负责执⾏⼀模⼀样的计算逻辑(也就是我们⾃⼰编写的某个代码⽚段),只是每个task处理的数据不同⽽已。⼀个stage的所有task都执⾏完毕之后,会在各个节点本地的磁盘⽂件中写⼊计算中间结果,然后Driver就会调度运⾏下⼀个stage。下⼀个stage的task的输⼊数据就是上⼀个stage输出的中间结果。如此循环往复,直到将我们⾃⼰编写的代码逻辑全部执⾏完,并且计算完所有的数据,得到我们想要的结果为⽌。
Spark是根据shuffle类算⼦来进⾏stage的划分。如果我们的代码中执⾏了某个shuffle类算⼦(⽐如reduceByKey、join等),那么就会在该算⼦处,划分出⼀个stage界限来。可以⼤致理解为,shuffle算⼦执⾏之前的代码会被划分为⼀个stage,shuffle算⼦执⾏以及之后的代码会被划分为下⼀个stage。因此⼀个stage刚开始执⾏的时候,它的每个task可能都会从上⼀个stage的task所在的节点,去通过⽹络传输拉取需要⾃⼰处理的所有key,然后对拉取到的所有相同的key使⽤我们⾃⼰编写的算⼦函数执⾏聚合操作(⽐如reduceByKey()算⼦接收的函数)。这个过程就是shuffle。
当我们在代码中执⾏了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘⽂件中。
因此Executor的内存主要分为三块:第⼀块是让task执⾏我们⾃⼰编写的代码时使⽤,默认是占Executor总内存的20%;第⼆块是让task通过shuffle过程拉取了上⼀个stage的task的输出后,进⾏聚合等操作时使⽤,默认也是占Executor总内存的20%;第三块是让RDD 持久化时使⽤,默认占Executor总内存的60%。
task的执⾏速度是跟每个Executor进程的CPU core数量有直接关系的。⼀个CPU core同⼀时间只能执⾏⼀个线程。⽽每个Executor 进程上分配到的多个task,都是以每个task⼀条线程的⽅式,多线程并发运⾏的。如果CPU core数量⽐较充⾜,⽽且分配到的task数量⽐较合理,那么通常来说,可以⽐较快速和⾼效地执⾏完这些task线程。
以上就是Spark作业的基本运⾏原理的说明,⼤家可以结合上图来理解。理解作业基本原理,是我们进⾏资源参数调优的基本前提。
资源参数调优
梦见自己拉稀 了解完了Spark作业运⾏的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运⾏过程中各个使⽤资源的地⽅,通过调节各种参数,来优化资源使⽤的效率,从⽽提升Spark作业的执⾏性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运⾏原理中的某个部分,我们同时也给出了⼀个调优的参考值。
内容
1.num-executors
参数说明:该参数⽤于设置Spark作业总共要⽤多少个Executor进程来执⾏。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个⼯作节点上,启动相应数量的Executor进程。这个参数⾮常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运⾏速度是⾮常慢的。
参数调优建议:每个Spark作业的运⾏⼀般设置50~100个左右的Executor进程⽐较合适,设置太少或
太多的Executor进程都不好。
设置的太少,⽆法充分利⽤集群资源;设置的太多的话,⼤部分队列可能⽆法给予充分的资源。
设置⼤⼩受限于集群中节点数量?集群5个节点,申请50个executors,结果只返回4个executors
18/07/20 18:21:13 INFO client.RMProxy: Connecting to ResourceManager at bigdata01
18/07/20 18:21:13 INFO yarn.YarnRMClient: Registering the ApplicationMaster
18/07/20 18:21:13 INFO yarn.YarnAllocator: Will request 50 executor container(s), each with 10 core(s) and 2432 MB memory (including 384 MB of overhea 18/07/20 18:21:13 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef
18/07/20 18:21:13 INFO yarn.YarnAllocator: Submitted 50 unlocalized container requests.
18/07/20 18:21:13 INFO yarn.ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
18/07/20 18:21:14 INFO yarn.YarnAllocator: Launching container container_1530757262602_0058_01_000002 on host bigdata04 for executor with ID 1 18/07/20 18:21:14 INFO yarn.YarnAllocator: Launching container container_1530757262602_0058_01_000003 on host bigdata03 for executor with ID 2 18/07/20 18:21:14 INFO yarn.YarnAllocator: Received 2 containers from YARN, launching executors on 2 of them.
18/07/20 18:21:14 INFO yarn.YarnAllocator: Launching container container_1530757262602_0058_01_000004 on host bigdata05 for executor with ID 3 18/07/20 18:21:14 INFO yarn.YarnAllocator: Launching container container_1530757262602_0058_01_000005 on host bigdata02 for executor with ID 4 18/07/20 18:21:14 INFO yarn.YarnAllocator: Received 2 containers from YARN, launching executors on 2 of them.
18/07/20 18:21:17 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) with ID 1 18/07/20 18:21:17 INFO storage.BlockManagerMasterEndpoint: Registering block manager bigdata04:43505 with 912.3 MB RAM, BlockManagerId(1, bigda 18/07/20 18:21:18 INFO cluster.YarnSchedulerBackend$YarnDriver
入盘Endpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) with ID 4 18/07/20 18:21:18 INFO storage.BlockManagerMasterEndpoint: Registering block manager bigdata02:46476 with 912.3 MB RAM, BlockManagerId(4, bigda 18/07/20 18:21:19 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) with ID 2 18/07/20 18:21:19 INFO storage.BlockManagerMasterEndpoint: Registering block manager bigdata03:34136 with 912.3 MB RAM, BlockManagerId(2, bigda 18/07/20 18:21:21 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) with ID 3 18/07/20 18:21:21 INFO storage.BlockManagerMasterEndpoint: Registering block manager bigdata05:43899 with 912.3 MB RAM, BlockManagerId(3, bigda 18/07/20 18:21:43 INFO cluster.YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResources 18/07/20 18:21:43 INFO cluster.YarnClusterScheduler: YarnClusterScheduler.postStartHook done
四大事务所
参数说明:该参数⽤于设置每个Executor进程的内存。Executor内存的⼤⼩,很多时候直接决定了Spark作业的性能,⽽且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是⼀个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看⾃⼰团队的资源队列的最⼤内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最⼤内存量的。此外,如果你是跟团队⾥其他⼈共享这个资源队列,那么申请的内存量最好不要超过资源队列最⼤总内存的1/3~1/2,避免你⾃⼰的Spark作业占⽤了队列所有的资源,导致别的同学的作业⽆法运⾏。
参数说明:该参数⽤于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并⾏执⾏task线程的能⼒。因为每个CPU core同⼀时间只能执⾏⼀个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执⾏完分配给⾃⼰的所有task线程。
参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看⾃⼰的资源队列的最⼤CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到⼏个CPU core。同样建议,如果是跟他⼈共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右⽐较合适,也是避免影响其他同学的作业运⾏。对联大全带横批
4.driver-memory
参数说明:该参数⽤于设置Driver进程的内存。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯⼀需要注意的⼀点是,如果需要使⽤collect算⼦将
RDD的数据全部拉取到Driver上进⾏处理,那么必须确保Driver的内存⾜够⼤,否则会出现OOM内存溢出的问题。
5.spark.default.parallelism
参数说明:该参数⽤于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的⼀个错误就是不去设置这个参数,那么此时就会导致Spark⾃⼰根据底层HDFS的block数量来设置task的数量,默认是⼀个HDFS block对应⼀个task。通常来说,Spark默认设置的数量是偏少的(⽐如就⼏⼗个task),如果task数量偏少的话,就会导致你前⾯设置好的Executor的参数都前功尽弃。试想⼀下,⽆论你的Executor进程有多少个,内存和CPU有多⼤,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task 执⾏,也就是⽩⽩浪费了资源!因此Spark官⽹建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合
适,⽐如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利⽤Spark集群的资源。
6.Fraction
参数说明:该参数⽤于设置RDD持久化数据在Executor内存中能占的⽐例,默认是0.6。也就是说,默认Executor 60%的内存,可以⽤来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写⼊磁盘。
参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提⾼⼀些,保证持久化的数据能够容纳在内存中。
避免内存不够缓存所有的数据,导致数据只能写⼊磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作⽐较多,⽽持久化操作⽐较少,那么这个参数的值适当降低⼀些⽐较合适。此外,如果发现作业由于频繁的gc导致运⾏缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执⾏⽤户代码的内存不够⽤,那么同样建议调低这个参数的值。
云南财经大学录取分数线7.Fraction
参数说明:该参数⽤于设置shuffle过程中⼀个task拉取到上个stage的task的输出后,进⾏聚合操作时
能够使⽤的Executor内存的⽐例,默认是0.2。也就是说,Executor默认只有20%的内存⽤来进⾏该操作。shuffle操作在进⾏聚合时,如果发现使⽤的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘⽂件中去,此时就会极⼤地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占⽐,提⾼shuffle操作的内存占⽐⽐例,避免shuffle过程中数据过多时内存不够⽤,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运⾏缓慢,意味着task执⾏⽤户代码的内存不够⽤,那么同样建议调低这个参数的值。
参数说明:Total cores for all executors.
executor使⽤的总核数,仅限于Spark Alone、Spark on Mesos模式
9.资源参数参考⽰例
以下是⼀份spark-submit命令的⽰例:
spark2-submit \
李易峰星座
--packages org.apache.kudu:kudu-spark2_2.11:1.2.0 \
--master yarn \
--deploy-mode cluster \竹的寓意
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--total-executor-cores 400 \ ##standalone default all cores
读书的事例--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf Fraction=0.5 \
--conf Fraction=0.3 \
-
-conf "utorEnv.JAVA_HOME=/usr/java/jdk1.8.0_131" \
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8.0_131"
10.Spark-Submit yarn/cluster提交区别