平台搭建---Spark提交应⽤程序---SparkSubmit提交应⽤程序及yarn
本部分,也可以到查看英⽂版。
spark-submit 是在spark安装⽬录中bin⽬录下的⼀个shell脚本⽂件,⽤于在集群中启动应⽤程序(如***.py脚本);对于spark⽀持的集群模式,spark-submit提交应⽤的时候有统⼀的接⼝,不⽤太多的设置。
使⽤spark-submit时,应⽤程序的jar包以及通过—jars选项包含的任意jar⽂件都会被⾃动传到集群中。
spark-submit --class --master --jars
1、绑定应⽤程序依赖
如果代码依赖于其它项⽬,为了将代码分发到Spark集群,就需要将这些依赖⼀起打包到应⽤程序中去。sbt和Maven都有装配插件,只要在创建集成的jar时列出Spark和Hadoop需要的依赖,⽽不需要将这些依赖和应⽤打包,因为在程序运⾏的时候集群的master知道如何调⽤和提供这些依赖;但是⼀旦有集成好的jar包,在执⾏bin/spark-submit脚本时就坐传递这些jar包了。
对于Python语⾔来讲,可以使⽤spark-submit的–py-files参数添加.py,.zip,.egg⽂件和应⽤程序⼀起进⾏分发,如果应⽤程序依赖于多个Python⽂件,建议将它们打包成.zip或.egg⽂件。
2、⽤spark-submit启动应⽤程序
./bin/spark-submit \
--class<main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
...# other options
<application-jar> \
[application-arguments]
⽤得较多的参数是:
--class:应⽤程序的⼊⼝点(例如,org.amples.SparkPi)
-
-master:集群的master URL(例如,spark://localhost:7077)
--deploy-mode:将driver部署到worker节点(cluster模式)或者作为外部客户端部署到本地(client模式),默认情况下是client模式
--conf:⽤key=value格式强制指定Spark配置属性,⽤引号括起来
--application-jar:包含应⽤程序和所有依赖的jar包的路径,路径必须是在集群中是全局可见的,例如,hdfs://路径或者file://路径
--application-arguments:传递给主类中main函数的参数
⼀般的部署策略是在⼀个⽹关机器上提交应⽤程序,这个机器和Worker机器部署在⼀个⽹络中(例如,Standalone模式的EC2集群中的Master节点)。在此部署策略中,client模式更为合适,client模式中的driver直接跟spark-submit进程⼀起启动,spark-submit进程在此扮演集群中⼀个client的⾓⾊。应⽤程序的输⼊输出依赖于控制台,如此⼀来,这种模式就特别适合关于REPL(例如,Spark shell)的应⽤程序。
另⼀种部署策略是,应⽤程序通过⼀台远离Worker节点的机器提交(例如,本地或者便携设备),这种情况下,⼀般使⽤cluster模式最⼩化drivers和executors之间的⽹络延时。注意,cluster模式暂时不
⽀持于Mesos集群或Python应⽤程序。
Python应⽤程序中,简单地在application-jar处传递⼀个.py⽂件⽽不是JAR⽂件,然后⽤–py-files添加Python.zip,.egg或者.py⽂件到搜索路径。
还有⼀些集群管理器正在使⽤的可选项。例如,对于Spark Standalone的cluster部署模式,也可以使⽤–supervi以确定driver在遇到⾮0(non-zero)退出码的错误时进⾏⾃动重启。
通过运⾏spark-submit上–help列出所有的可选项。以下是⼀些常⽤选项的例⼦:
# Run application locally on 8 cores
./bin/spark-submit \
--class org.amples.SparkPi \
--master local[8] \
一劳永逸的意思/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.amples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervi
./bin/spark-submit \
--class org.amples.SparkPi \
-
-master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervi \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.amples.SparkPi \
-
-master yarn-cluster \ # can also be`yarn-client` for client mode
--executor -memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Mesos cluster in cluster deploy mode with supervi
.
/bin/spark-submit \
好之
--class org.amples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervi \
--executor-memory 20G \
--total-executor-cores 100 \
芝士虾球的做法path/to/examples.jar \
1000
更多参数设置可参考:
对于⾃定义的py模块,导⼊的时候由于主程序代码是分发到各个⼦节点去执⾏的,⼦节点在导⼊这些模块时会报错:
module XX not found之前的错误
⼀种⽅法是将这些模块放在各⼦节点的python路径上,⼀种是将这些模块也分发到各个节⼦节点。⽅式如下:
spark-submit --master yarn --num-executors 3--executor-cores 10--executor-memory 30g --py-files /XXX/AAAA.py,/XXX/BBBB.py CCCmain.py 或者使⽤下⾯的⽅式
3、Master URLs
传递给Spark的master url可以是以下任意格式之⼀:
master URL意义
local使⽤1个worker线程本地运⾏Spark(即完全没有并⾏化)
local[K]使⽤K个worker线程本地运⾏Spark(最好将K设置为机器的CPU核数)
local[*]根据机器的CPU逻辑核数,尽可能多地使⽤Worker线程
spark://HOST:PORT连接到给定的Spark Standalone集群的Master,此端⼝必须是Master配置的端⼝,
默认为7077
mesos://HOST:PORT 连接到给定的Mesos集群的Master,此端⼝必须是Master配置的端⼝,默认为5050。若Mesos集群使⽤ZooKeeper,则
master URL使⽤mesos://zk://……
yarn-client以client模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得
剑姬怎么玩
yarn-cluster以cluster模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得
4、从⽂件中加载配置
spark-submit脚本可以通过属性⽂件加载默认的Spark配置值并将其传递给应⽤程序。默认情况下会读取Spark⽬录中f⽂件中的各配置项,详细信息参考“加载默认配置”。
加载默认配置可以取消spark-submit命令的某些参数选项。例如,如果设置了spark.master属性,那么可以直接省略 --master选项。⼀般情况下,直接使⽤SparkConf设置的属性值具有最⾼的优先级,然后是spark-submit命令中传递的选项,最后才是默认配置⽂件中的值。如果你不清楚配置选项是来⾃于哪⾥,可以运⾏spark-submit --verbo打印处更细粒度的调试信息。
5、⾼级依赖管理
牛杂汤
当使⽤spark-submit时,应⽤程序的jar包以及由**–jars选向给出的jar会⾃动上传到集群,由–jars给出的jar路径之间必做⽤逗号分隔,如果路径是个⽬录的话,–jars**的设置⽆法起作⽤,必须详细到abc.jar。
Spark使⽤了下⾯的URL格式允许不同的jar包分发策略。
1、⽂件file⽅式:
绝对路径且file:/URIs是作为driver的HTTP⽂件服务器,且每个executor会从driver的HTTP服务器拉取⽂件;
2、hdfs⽅式:
http:,https:,ftp:,从这些给定的URI中拉取⽂件和JAR包;
3、本地local⽅式:
以local:/开始的URI应该是每个worker节点的本地⽂件,这意味着没有⽹络IO开销,并且推送或通过NFS/GlusterFS等共享到每个worker ⼤⽂件/JAR⽂件或能很好的⼯作。
注意:SparkContext的JAR包和⽂件都会被复制到每个executor节点的⼯作⽬录下,这将⽤掉⼤量的空间,所以需要清理⼲净。
在YARN下,会⾃动清理。
在Spark Standalone下,可以通过配置spark.worker.cleanup.appDataTtl属性做到⾃动清理。
⽤户可以⽤--packages选项提供⼀个以逗号分隔的maven清单来包含任意其他依赖。
其它的库(或SBT中的resolvers)可以⽤--repositories选项添加(同样⽤逗号分隔),这些命令都可以⽤在pyspark,spark-shell和spark-submit中来包含⼀些Sp ark包。
对Python⽽⾔,–py-files选项可以⽤来向executors分发.egg,.zip和.py库。
6、在YARN集群上运⾏Spark应⽤
详细请参考
在Spark独⽴集群模式,–master 所提供的地址是由Spark⾃⾝给出,在yarn模式下,Spark资源管理器的地址其实也就是hadoop的master地址,因⽽–master 的地址应该由yarn来提供。写一篇周记
格式如下:
$ ./bin/spark-submit --ur.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
下⾯是⼀个更详细的例⼦:
$ ./bin/spark-submit --class org.amples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
执⾏的过程⼤致如下:
上⾯的应⽤提交后,会启动⼀个yarn客户端程序,这个程序接着启动master上的应⽤程序,SparkPi 再启动master应⽤的不同线
程,yarn客户端程序会周期性地检查master应⽤的状态,并在控制台打印这些状态,程序结束后,yarn客户端会结束即出。我们可以通过查看驱动器和执⾏器的⽇志⽂件来调试应⽤程序。
先放在这⾥,有时间再来补上
7、运⾏ Spark Python 应⽤
主要是要解决如何将依赖库⼀起提交的问题。
**⽅法⼀:**将库以python⽂件的⽅式提交。
⾸先要理解的,我们在安装python外部库的时候,该库的功能主要还是靠abc.py这样的类⽂件来实现,当我们⽤类似from
reply过去式spark_learning.utils.default_utils import tDefaultEncoding的时候,是从这个类⽂件中引⼊了某个函数。所以当我们要将某个库提交给集群的时候,可以找到相关的类⽂件,然后将这个类⽂件提交上去。
可参考下⾯的格式(可能有错):清江河
#python代码中的import
from spark_learning.utils.default_utils import tDefaultEncoding,initSparkContext,ensureOfft
#submit命令:
bin/spark-submit --jars /home/jabo/software/spark-1.5.2-bin-hadoop2.6/lib/spark-streaming-kafka-asmbly_2.10-1.5.2.jar \
--py-files /home/jabo/spark-by-python/spark_learning/utils/default_utils.py \
/home/jabo/spark-by-python/spark_learning/third_day/streaming_kafka_avg.py
下⾯是⽐较正规的做法:
⽤ Java 和 Scala 访问 Spark 提供了许多优点 : 因为 Spark 它⾃⼰运⾏在 JVM 中,运⾏在 JVM 内部是平台⽆关的,独⽴的代码包和它打⼊到 JAR ⽂件中的依赖,以及更⾼的性能。如果您使⽤ Spark Python API 将会失去这些优势。
管理依赖并让它们可⽤于群集上的 Python Job 是很难的。为了确定哪些依赖在群集上是需要的,您必须了解运⾏在分布式 群集 中的Spark executor 进程中的 Spark 应⽤程序的代码。如果您定义的 Python 转换使⽤的任何的第三⽅库,⽐如 NumPy 或者 nltk,当它们运⾏在远程的 executor 上时 Spark executor 需要访问这些库。
7.1、独⽴的依赖关系
常见的情况中,⼀个⾃定义的 Python 包包含了你想要应⽤到每个 RDD 元素的 功能。下⾯展⽰了⼀个简单的例⼦ :
def import_my_special_package(x):
import my.special.package
return x
int_rdd = sc.parallelize([1,2,3,4])
int_rdd.map(lambda x: import_my_special_package(x))
llect()
您创建了⼀个有 4 个元素名为 int_rdd 的简单的 RDD。然后您应⽤函数 import_my_special_package 到每个 int_rdd 的元素。这个函数导⼊了 my.pcial.package 并且返回了传⼊的原始参数。这样和使⽤类或者定义的函数有相同的作⽤,因为 Spark 需要每个 Spark executor 在需要时导⼊ my.special.package。
如果你只需要 my.special.package 内部⼀个简单的⽂件,您可以通过在您的 spark-submit 命令中使⽤ --py-files 选项并指定⽂件的路径来直接让 Spark 的所有 executor 都可以获取。您也可以以编程的⽅式通过使⽤ sc.addPyFiles() 函数来指定。如果您使⽤的功能来⾃跨越多个⽂件的 package,为 package 制作⼀个 egg,因为 --py-files 标记也接受⼀个 egg ⽂件的路径。
如果您有⼀个独⽴的依赖关系,您可以使⽤两种⽅式让需要的 Python 依赖是可⽤的。
如果您只依赖⼀个单独的⽂件,您可以使⽤ --py-files 命令⾏选项,或者以编程的⽅式⽤ sc.addPyFiles(path) 兵指定本地 Python ⽂件的路径添加这些⽂件到 SparkContext。
如果您有⼀个独⽴模块上的依赖(⼀个没有其它依赖的模块),您可以创建⼀个这些模块的 egg 或者
zip ⽂件,使⽤ --py-files 命令⾏选项或者以编程的⽅式⽤ sc.addPyFiles(path) 兵指定本地 Python ⽂件的路径添加这些⽂件到 SparkContext。
7.2、复杂的依赖关系
⼀些操作依赖有许多依赖关系的复杂的 package。例如,下列的代码⽚段导⼊了 Python pandas 数据分析库 :
def import_pandas(x):
import pandas
return x
int_rdd = sc.parallelize([1,2,3,4])
int_rdd.map(lambda x: import_pandas(x))
llect()
pandas 依赖 NumPy,SciPi,和许多其它的 package。尽管 pandas 作为⼀个 *.py ⽂件来分发是很
复杂的,您可以为它和它的依赖建⼀个 egg 并发送它们到 executor。
7.3、分发 Egg ⽂件的限制
在这两个独⽴的,复杂的依赖关系的情况中,发送 egg ⽂件是有问题的,因为带有原代码包必须在其上运⾏的特定主机进⾏编译。当⾏业业标准的硬件做分布式计算,你必须假设的是,硬件是多样化的。然⽽,由于所需的 C 编译,内置客户端主机上⼀个 Python egg 是特定的客户端的 CPU 架构。因此,分配复杂的 egg,像编译 NumPy,SciPy 的 package 和 pandas 经常出现故障。相反,分发 egg ⽂件,你应该在群集的每个主机上安装所需的 Python 包,并指定 Python 的⼆进制⽂件的路径为 worker 主机使⽤。
安装以及保持 Python 环境
安装和维护的Python环境可能⽐较复杂,但可以让你使⽤完整的Python包⽣态系统。系统管理员在⽤您需要的依赖在群集的每台主机上安装 Anaconda distribution 或者设置⼀个 虚拟环境。
如果您使⽤ Cloudera Manager,您可以使⽤⽅法将 Anaconda distribution 作为⼀个 Parcel 来部署。
最低需要的⾓⾊ : 群集管理员(或者管理员)
添加下⾯的 inuum.io/pkgs/misc/parcels/ 到远程的 Parcel 仓库 URL,像 Parcel 配置设置 中描述的⼀样。
像 管理 Parcels 中描述的⼀样下载,分发,并且激活 Parcel。
Anaconda 被安装在 parcel ⽬录/Anaconda 中,parcel ⽬录默认是 /opt/cloudera/parcels,但是可以在 parcel 配置设置中更改。Anaconda parcel ⽀持 Continuum Analytics。
如果您没有使⽤ Cloudera Manager,您可以在您的群集中安装⼀个虚拟环境通过在每台主机上运⾏命令 Cluster SSH,Parallel SSH,或者 Fabric。假设每台主机已经安装了 Python 和 pip,在⼀个 RHEL 6 兼容的系统中上的虚拟环境中使⽤下⾯的命令来安装标准的数据栈(NumPy,SciPy,scikit-learn,和 pandas)。