Flink JAR Upload and Run Logic


Notes
1. Goal: walk through the Flink client code for uploading and running a JAR
2. Source version: 1.6.1
3. Deployment mode: Standalone
4. Related topics: Netty, CompletableFuture
Starting the RestServer
RestServerEndpoint.start
Registering handlers
Code from DispatcherRestEndpoint.java
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);
    ...
    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
        restAddressFuture,
        leaderRetriever,
        timeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
        try {
            // This is where the handlers for JAR Upload and Run get registered
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                restAddressFuture,
                timeout,
                responseHeaders,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            handlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            ...
        }
    } else {
        log.info("Web-based job submission is not enabled.");
    }
    ...
    return handlers;
}
In WebSubmissionExtension you can see that handlers are defined for Upload, Run, List, Delete and Plan.
Upload JAR
The handling code is in JarUploadHandler's handleRequest method.
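As a quick aside before tracing the storage path: the upload handler can also be exercised directly over REST. The sketch below is not from the Flink source; it is a minimal JDK-only client that POSTs a jar as a multipart form to the /jars/upload route. The REST address, the local jar path, and the multipart field name "jarfile" are assumptions based on the standard web-submission endpoint; adjust them to your setup.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class JarUploadClientSketch {
    public static void main(String[] args) throws Exception {
        String restAddress = "http://localhost:8081";        // assumed REST address
        Path jar = Paths.get("target/my-flink-job.jar");     // hypothetical jar path
        String boundary = "----flink-upload-" + System.nanoTime();

        HttpURLConnection conn =
            (HttpURLConnection) new URL(restAddress + "/jars/upload").openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);

        try (OutputStream out = conn.getOutputStream()) {
            // single multipart part named "jarfile" carrying the jar bytes
            out.write(("--" + boundary + "\r\n"
                + "Content-Disposition: form-data; name=\"jarfile\"; filename=\""
                + jar.getFileName() + "\"\r\n"
                + "Content-Type: application/x-java-archive\r\n\r\n")
                .getBytes(StandardCharsets.UTF_8));
            Files.copy(jar, out);
            out.write(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));
        }

        System.out.println("HTTP " + conn.getResponseCode());
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            // the JSON response reports the server-side file name under the upload dir
            reader.lines().forEach(System.out::println);
        }
    }
}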
Where the JAR ends up:
The method itself is simple; the subtle part is the value of jarDir. Tracing backwards to find where it is assigned:
1. jarDir is set when JarUploadHandler is constructed;
2. JarUploadHandler is constructed by WebSubmissionExtension via WebMonitorUtils.loadWebSubmissionExtension, and jarDir comes from the variable uploadDir in the parent class RestServerEndpoint;
3. uploadDir in RestServerEndpoint is initialized from the endpoint configuration's getUploadDir();
4. The source is found in RestServerEndpointConfiguration:
final Path uploadDir = Paths.get(
    config.getString(WebOptions.UPLOAD_DIR, config.getString(WebOptions.TMP_DIR)),
    "flink-web-upload");
In most cases nobody overrides the config option WebOptions.UPLOAD_DIR (config key "web.upload.dir"), so the JAR ends up under "$WebOptions.TMP_DIR/flink-web-upload".
The value of WebOptions.TMP_DIR is also somewhat hidden. Judging only by the configuration file it would be the /tmp directory, but ClusterEntrypoint's generateClusterConfiguration actually rewrites it:
final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
final File uniqueWebTmpDir = new File(webTmpDir, "flink-web-" + UUID.randomUUID());
resultConfiguration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.getAbsolutePath());
The net effect is that the JAR storage directory is "/tmp/flink-web-UUID/flink-web-upload".
Keeping the files under /tmp is risky: once they expire they get deleted.
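If that risk matters, the directories can be pointed somewhere durable. A minimal sketch, assuming you build the cluster Configuration yourself; the paths are placeholders, and the equivalent flink-conf.yaml keys are web.upload.dir and web.tmpdir:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;

public class UploadDirConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // hypothetical durable locations instead of the default /tmp
        conf.setString(WebOptions.UPLOAD_DIR, "/data/flink/web-upload");
        conf.setString(WebOptions.TMP_DIR, "/data/flink/web-tmp");

        // per the snippet above, uploaded jars would then land under
        // /data/flink/web-upload/flink-web-upload
        System.out.println(conf.getString(WebOptions.UPLOAD_DIR));
    }
}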
Run JAR
Same approach as above; the method to focus on is JarRunHandler's handleRequest:
@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
        @Nonnull final HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request,
        @Nonnull final DispatcherGateway gateway) throws RestHandlerException {
    ...
    // Build the JobGraph
    final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
        jarFile,
        entryClass,
        programArgs,
        savepointRestoreSettings,
        parallelism);

    CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);

    // Upload the JobGraph, user JARs and user artifacts
    CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
        final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
        try {
            ...
        } catch (FlinkException e) {
            throw new CompletionException(e);
        }
        return jobGraph;
    });

    CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
        // we have to enable queued scheduling because slots will be allocated lazily
        jobGraph.setAllowQueuedScheduling(true);
        // Submit the job
        return gateway.submitJob(jobGraph, timeout);
    });

    return jobSubmissionFuture
        .thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID()))
        .exceptionally(throwable -> {
            throw new CompletionException(new RestHandlerException(
                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                throwable));
        });
}
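handleRequest is essentially one chain of CompletableFuture compositions. The standalone sketch below is plain JDK, nothing Flink-specific, and all names are made up; it mirrors the same thenCombine / thenCompose / exceptionally shape, which may help when reading the method above.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompositionSketch {
    public static void main(String[] args) {
        CompletableFuture<String> jobGraphFuture = CompletableFuture.supplyAsync(() -> "jobGraph");
        CompletableFuture<Integer> blobPortFuture = CompletableFuture.supplyAsync(() -> 6124);

        // thenCombine: wait for both results, then "upload" and pass the graph on
        CompletableFuture<String> uploadFuture =
            jobGraphFuture.thenCombine(blobPortFuture, (graph, port) -> graph + "@" + port);

        // thenCompose: chain a step that itself returns a future (like gateway.submitJob)
        CompletableFuture<String> submissionFuture =
            uploadFuture.thenCompose(graph -> CompletableFuture.supplyAsync(() -> "ACK for " + graph));

        // exceptionally: translate any failure in the chain into a single error
        String result = submissionFuture
            .exceptionally(t -> { throw new CompletionException(t); })
            .join();
        System.out.println(result); // prints: ACK for jobGraph@6124
    }
}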
How the JobGraph is generated
/* The PackagedProgram is constructed in JarRunHandler's getJobGraphAsync */
final PackagedProgram packagedProgram = new PackagedProgram(
    jarFile.toFile(),
    entryClass,
    ...);
jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);
/* From PackagedProgramUtils.java */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism) throws ProgramInvocationException {
    ...
    if (packagedProgram.isUsingProgramEntryPoint()) {
        ...
    } else if (packagedProgram.isUsingInteractiveMode()) {
        /* A submitted streaming program usually takes this branch; the criterion is whether the user program's main class isAssignableFrom ProgramDescription */
        final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
        optimizerPlanEnvironment.setParallelism(defaultParallelism);
        // This triggers the invocation of the user's main method
        flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
    } else {
        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }

    if (flinkPlan instanceof StreamingPlan) {
        // Obtain the JobGraph
        jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
    } else {
        ...
    }
    ...
    return jobGraph;
}
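For experimentation, the same JobGraph construction can be driven from your own code. A minimal sketch, assuming the Flink 1.6.x client API on the classpath; the jar path and entry class are placeholders:

import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class BuildJobGraphSketch {
    public static void main(String[] args) throws Exception {
        File jarFile = new File("target/my-flink-job.jar");   // hypothetical user jar
        String entryClass = "com.example.MyStreamingJob";     // hypothetical main class

        // For an interactive-mode program this invokes the user's main() internally,
        // as described in the createJobGraph walkthrough above.
        PackagedProgram program = new PackagedProgram(jarFile, entryClass);
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 1);

        System.out.println("Generated JobGraph with JobID " + jobGraph.getJobID());
    }
}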
Invoking the user program's main method
/* From OptimizerPlanEnvironment.java */
public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
    ...
    /* Make ContextEnvironmentFactory hand out this OptimizerPlanEnvironment as the env */
    setAsContext();
    try {
        /* Invoke the user program's main method */
        prog.invokeInteractiveModeForExecution();
    }
    ...
}
Executing the user program's main method
// A common main() structure
public static void main(String[] args) throws Exception {
    /* What we get here is the OptimizerPlanEnvironment set by setAsContext in the previous step */
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    /* This ends up calling OptimizerPlanEnvironment's execute */
    env.execute(...);
}
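For completeness, a concrete runnable version of such a main() might look like the toy job below (illustrative only, not taken from the article's sources):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinimalStreamingJob {
    public static void main(String[] args) throws Exception {
        // Under web submission this returns the plan-capturing environment
        // installed by setAsContext(), as described above.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink", "jar", "run")
           .filter(word -> word.length() > 3)
           .print();

        // Under web submission this call ends in the environment's execute(),
        // which records the plan and aborts instead of running the job.
        env.execute("minimal-streaming-job");
    }
}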
Executing execute (very similar to a concept you may have come across: stubbing in tests)
public JobExecutionResult execute(String jobName) throws Exception {
    /* Produce the compiled FlinkPlan */
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);
    // nothing of the user program after execute() gets to run
    // do not go on with anything now!
    throw new ProgramAbortException();
}
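To make the stubbing analogy concrete, here is a tiny self-contained toy (deliberately not Flink code, all names invented): the environment's execute() records what the program built and then aborts the program by throwing, which is the same trick OptimizerPlanEnvironment plays above.

public class PlanCapturingEnvironment {

    /** Thrown to stop the user's main() once the plan has been captured. */
    static class PlanAbortException extends RuntimeException {}

    private String capturedPlan;

    /** Stand-in for OptimizerPlanEnvironment.execute(): capture, then abort. */
    public void execute(String jobName) {
        capturedPlan = "plan of " + jobName;   // stand-in for createProgramPlan + compile
        throw new PlanAbortException();        // nothing after execute() in main() will run
    }

    /** Stand-in for getOptimizedPlan(): run main(), swallow the expected abort. */
    public String captureFrom(Runnable userMain) {
        try {
            userMain.run();                    // stand-in for invokeInteractiveModeForExecution()
        } catch (PlanAbortException expected) {
            // expected: the plan was captured, nothing was actually executed
        }
        return capturedPlan;
    }

    public static void main(String[] args) {
        PlanCapturingEnvironment env = new PlanCapturingEnvironment();
        String plan = env.captureFrom(() -> env.execute("word-count"));
        System.out.println(plan);              // prints: plan of word-count
    }
}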
Submitting the JobGraph
OK, we now have the JobGraph; let's take a closer look at how it is submitted.
/* From Dispatcher.java */
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    ...
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
        return CompletableFuture.completedExceptionally(
            new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
    } else {
        // the key call is persistAndRunJob
        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

        return persistAndRunFuture.exceptionally(
            (Throwable throwable) -> {
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                log.error("Failed to submit job {}.", jobId, strippedThrowable);
                throw new CompletionException(
                    new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
            });
    }
}
Skipping a few intermediate method calls, the call order is:
1. Dispatcher.persistAndRunJob
2. Dispatcher.runJob
3. Dispatcher.createJobManagerRunner, which creates the JobMaster
4. JobMaster.createAndRestoreExecutionGraph
At last we reach the ExecutionGraph.
The ExecutionGraph deploy process
Method call chain:
