spark源码阅读-spark-submit任务提交流程(local模式)

更新时间:2023-07-22 20:38:11 阅读: 评论:0

spark源码阅读-spark-submit任务提交流程(local模式)从spark启动任务源头 $SPARK_HOME/bin/spark-submit 开始阅读spark源码。
⼀、脚本阶段
提交任务命令,先使⽤local模式
spark-submit --master local --class com.lof.main.SparkPi /Urs/ur/Desktop/SparkPi.jar
sparkPi代码:
public class SparkPi {
public static void main(String[] args){
SparkSession spark = SparkSession.builder().appName("JavaSparkPi").getOrCreate();
JavaSparkContext jsc =new JavaSparkContext(spark.sparkContext());
int slices =(args.length ==1)? Integer.parInt(args[0]):2;
List<Integer> l =new ArrayList<Integer>(100000* slices);
for(int i =0; i < n; i++){
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);torn
int count = dataSet.map(integer ->{
double x = Math.random()*2-1;
double y = Math.random()*2-1;
return(x * x + y * y <=1)?1:0;
}).reduce((integer, integer2)-> integer + integer2);
System.out.println("Pi is roughly "+4.0* count / n);
spark.stop();
}
}
spark的任务提交脚本 $SPARK_HOME/bin/spark-submit
#!/usr/bin/env bash
# 如环境变量中没有配置 SPARK_HOME ,⾃动配置
if[ -z "${SPARK_HOME}"];then
source"$(dirname"$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec"${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
调⽤bin⽬录下的 spark-class ,并将参数传过去,接下来看⼀看 spark-class
#!/usr/bin/env bash
## 只保留核⼼代码
# 执⾏load-spark-env.sh,最终调⽤spark-env.sh
."${SPARK_HOME}"/bin/load-spark-env.sh
# 查找jars⽬录
# 源码编译的⽬录下没有 ${SPARK_HOME}/jars 没有该⽬录
if[ -d "${SPARK_HOME}/jars"];then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
el
SPARK_JARS_DIR="${SPARK_HOME}/asmbly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
# 执⾏ lauancher.Main ⽅法,⽣成 commond 命令
如何自制美白面膜build_command(){
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf"%d\0"$?
}
# Turn off posix mode since it does not allow process substitution
t +o posixoverrun
CMD=()
while IFS=read -d '' -r ARG;do
CMD+=("$ARG")
done<<(build_command "$@")
CMD=("${CMD[@]:0:$LAST}")
echo"${CMD[@]}"
exec"${CMD[@]}"
将"{CMD[@]}"输出,最终 spark-class 执⾏的命令为
/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/bin/java
-cp/spark-2.3.2/conf/:/spark-2.3.2/asmbly/target/scala-2.11/jars/*:/hadoop-2.7.3/etc/hadoop/
-Xmx1g org.apache.spark.deploy.SparkSubmit
--master local --class org.amples.SparkPi /Urs/xxx/Desktop/SparkPi
长沙培训机构排名榜可以看出,调⽤的是org.apache.spark.deploy.SparkSubmit类执⾏提交操作
源码中找到 org.apache.spark.deploy.SparkSubmit
在idea中,将–master local --class org.amples.SparkPi /Urs/xxx/Desktop/SparkPi设置为启动参数,模拟执⾏⼆、进⼊代码
org.apache.spark.deploy.SparkSubmit中main⽅法 核⼼是:1、加载参数 2、提交任务
[外链图⽚转存失败,源站可能有防盗链机制,建议将图⽚保存下来直接上传(img-Kkg8iUfJ-1585278184804)
(/Urs/lofty/Library/Application Support/typora-ur-images/image-20200326172915847.png)]
1、加载环境参数
// 将提交参数解析及加载环境变量
val appArgs = new SparkSubmitArguments(args)
SparkSubmitArguments中核⼼⽅法
dovey[外链图⽚转存失败,源站可能有防盗链机制,建议将图⽚保存下来直接上传(img-bEK842eW-1585278184806)
(/Urs/lofty/Library/Application Support/typora-ur-images/image-20200326175820154.png)]
第⼀次加载参数 par()⽅法,将命令⾏参数加载,其中–conf 参数放在sparkProperties中
第⼆次加载参数 mergeDefaultSparkProperties()⽅法,加载f参数,放在defaultSparkProperties中 第三次充填参数: loadEnvironmentArguments()⽅法,使⽤sparkProperties 和 env(环境变量)填充缺少的参数
根据这3个⽅法可以看出,在提交过程中的参数的优先级:
命令⾏参数 > f > 环境变量
2、提交任务
提交任务⾸先执⾏SparkSubmit.scala 中的 submit()⽅法。
然后执⾏doMain()⽅法
在doMain()⽅法中调⽤runMain() ⽅法,runMain⽅法较长,只保留核⼼代码
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sparkConf: SparkConf,
childMainClass: String,
verbo: Boolean): Unit ={
// 获取ClassLoader
val loader =
(DRIVER_USER_CLASS_PATH_FIRST)){
new ChildFirstURLClassLoader(new Array[URL](0),
ContextClassLoader)
}el{
new MutableURLClassLoader(new Array[URL](0),
ContextClassLoader)
}
Thread.currentThread.tContextClassLoader(loader)
// TODO 将jar包路径添加进ClassLoader中,以便后续通过反射直接调⽤直接
for(jar <- childClasspath){
addJarToClasspath(jar, loader)
}
var mainClass: Class[_]= null
try{
// TODO 创建主类 class
mainClass = Utils.classForName(childMainClass)
}catch{
// 此处省略
}
// TODO 判断 SparkApplication是否其⽗类 (主类是否实现SparkApplication接⼝)
val app: SparkApplication =if(classOf[SparkApplication].isAssignableFrom(mainClass)){
母亲节的英文wInstance().asInstanceOf[SparkApplication]逶迤
}el{
英语音标发音方法
// 此处省略
}
// TODO 直接通过反射执⾏传⼊jar包的main⽅法
new JavaMainApplication(mainClass)
evp
}
try{
// 调⽤start⽅法启动
app.Array, sparkConf)
}catch{
// 此处省略
}
}
第⼀步:获取当前线程的 ClassLoader
第⼆步:将application的jar包路径添加进ClassLoader中,以便后续直接通过反射调⽤
第三步:创建application主类的class对象
托收行第四步:如果主类实现SparkApplication⽅法,则调⽤start()⽅法启动。否则通过JavaMainApplication的start()反射调⽤主类的main⽅法。
local模式就这么简单,加载参数、启动任务,全在当前JVM中执⾏
下⼀篇聊聊⽣产环境常⽤的 Yarn Cluster模式。

本文发布于:2023-07-22 20:38:11,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/90/185561.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:参数   提交   任务   加载   源码
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图