Airflow Scheduler Source Code Walkthrough
1. Scheduler start and stop commands
1.1 Scheduler start command
The Airflow Scheduler is usually started with a command like the following:
airflow scheduler \
--pid /data/bdetl/airflow/pids/airflow-scheduler.pid \
--stdout /data/bdetl/logs/airflow/airflow-scheduler.out \
--stderr /data/bdetl/logs/airflow/airflow-scheduler.out \
-l /data/bdetl/logs/airflow/airflow-scheduler.log \
-D
The other relevant options are listed below:
-sd, --subdir: Look for DAG files under the given path. Defaults to '[AIRFLOW_HOME]/dags', where [AIRFLOW_HOME] is the value configured for 'AIRFLOW_HOME' in 'airflow.cfg'.
-r, --run-duration: How long (in seconds) the scheduler's execute loop runs before the program exits.
-n, --num_runs: How many times every DAG file is parsed before the scheduler exits.
-p, --do_pickle: Whether to send DAG objects to the worker nodes in pickled (serialized) form for execution.

1.2 Scheduler stop command
cat /data/bdetl/airflow/pids/airflow-scheduler.pid | xargs kill -15
Running the command above kills the scheduler process and removes the airflow-scheduler.pid file.
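For reference, the stop command just reads the pid file and sends SIGTERM; a minimal Python sketch of the same thing (the pid-file path is the one from the start command above, not something Airflow exposes as an API):

import os
import signal

# pid file written by the start command above; adjust to your deployment
PID_FILE = "/data/bdetl/airflow/pids/airflow-scheduler.pid"

with open(PID_FILE) as f:
    pid = int(f.read().strip())

# same effect as `kill -15 <pid>`; the scheduler catches SIGTERM and shuts down cleanly
os.kill(pid, signal.SIGTERM)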
2. Scheduler source code
In the rest of this article:
ti stands for task_instance, i.e. a task instance;
tis stands for task_instances;
the code is based on Airflow 1.10.11.
2.1 cli.scheduler(): handles the airflow scheduler command from the CLI
Based on the arguments passed in, it builds a SchedulerJob and then calls the job's run() method.
@cli_utils.action_logging
def scheduler(args):
    py2_deprecation_warning()
    print(settings.HEADER)
    # Build a SchedulerJob from the CLI arguments
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,
        subdir=process_subdir(args.subdir),
        run_duration=args.run_duration,
        num_runs=args.num_runs,
        do_pickle=args.do_pickle)
    # Daemon mode
    if args.daemon:
        # Resolve the pid file and the log paths
        pid, stdout, stderr, log_file = setup_locations("scheduler",
                                                        args.pid,
                                                        args.stdout,
                                                        args.stderr,
                                                        args.log_file)
        handle = setup_logging(log_file)
        stdout = open(stdout, 'w+')
        stderr = open(stderr, 'w+')
        ctx = daemon.DaemonContext(
            pidfile=TimeoutPIDLockFile(pid, -1),
            files_preserve=[handle],
            stdout=stdout,
            stderr=stderr,
        )
        # Run the SchedulerJob inside the daemon context
        with ctx:
            job.run()
        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        signal.signal(signal.SIGQUIT, sigquit_handler)
        job.run()
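The daemon branch above relies on the python-daemon package. The standalone sketch below shows the same pattern of detaching the process and writing a pid file; the paths and the do_work() function are placeholders standing in for job.run(), not Airflow code:

import daemon
from daemon.pidfile import TimeoutPIDLockFile

def do_work():
    # placeholder for job.run()
    pass

out = open('/tmp/my-job.out', 'w+')
ctx = daemon.DaemonContext(
    pidfile=TimeoutPIDLockFile('/tmp/my-job.pid', -1),  # -1: give up immediately if the lock is already held
    stdout=out,
    stderr=out,
)
with ctx:  # forks, detaches from the terminal and writes the pid file
    do_work()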
2.2 BaseJob.run(): insert a SchedulerJob record into the job table and call the subclass's logic
After the job.run() call above, the run() method of BaseJob, the parent class of SchedulerJob, is executed:
def run(self):
    Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
    # Adding an entry in the DB
    with create_session() as session:
        self.state = State.RUNNING
        # Insert a running SchedulerJob record into the db
        session.add(self)
        session.commit()
        id_ = self.id
        make_transient(self)
        self.id = id_
        try:
            # Run the _execute() method implemented by the subclass
            self._execute()
            # In case of max runs or max duration
            self.state = State.SUCCESS
        except SystemExit:
            # In case of ^C or SIGTERM
            self.state = State.SUCCESS
        except Exception:
            self.state = State.FAILED
            raise
        finally:
            # After the job finishes, fill in end_date and update the record
            self.end_date = timezone.utcnow()
            session.merge(self)
            session.commit()
    Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)
As the code shows, this method inserts a new scheduler job record into the job table, and then:
if the _execute() method (which contains a while loop) finishes normally, the SchedulerJob's state is SUCCESS;
if the program is stopped manually while _execute() is running (Ctrl-C or kill -15 pid), the SchedulerJob's state is SUCCESS;
if an exception is raised inside _execute(), the SchedulerJob's state is FAILED;
finally, the SchedulerJob's end_date is filled in and the record in the db is updated.
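To inspect these records yourself, you can query the job table through Airflow's session helper. A minimal sketch, assuming the Airflow 1.10.x module layout (airflow.jobs.BaseJob, airflow.utils.db.create_session):

from airflow.jobs import BaseJob
from airflow.utils.db import create_session

# print the most recent SchedulerJob rows and their final states
with create_session() as session:
    recent = (session.query(BaseJob)
                     .filter(BaseJob.job_type == 'SchedulerJob')
                     .order_by(BaseJob.id.desc())
                     .limit(5)
                     .all())
    for job in recent:
        print(job.id, job.state, job.start_date, job.end_date)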
2.3 SchedulerJob._execute(): the concrete execution logic of SchedulerJob
The self._execute() call above dispatches to the following method of the subclass:
def _execute(self):
    self.log.info("Starting the scheduler")
    # DAGs can be pickled for easier remote execution by some executors
    pickle_dags = False
    if self.do_pickle and self.executor.__class__ not in \
            (executors.LocalExecutor, executors.SequentialExecutor):
        pickle_dags = True
    self.log.info("Running execute loop for %s seconds", self.run_duration)
    self.log.info("Processing each file at most %s times", self.num_runs)
    # Build up a list of Python files that could contain DAGs
    self.log.info("Searching for files in %s", self.subdir)
    # Look for dag files under the given self.subdir path
    known_file_paths = list_py_file_paths(self.subdir)
    self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)
    # When using sqlite, we do not use async_mode
    # so the scheduler job and DAG parser don't access the DB at the same time.
    async_mode = not self.using_sqlite
    # AIRFLOW SETTINGS: timeout for a DagFileProcessor while it parses a dag file;
    # if it is exceeded, the processing process is killed
    processor_timeout_seconds = conf.getint('core', 'dag_file_processor_timeout')
    processor_timeout = timedelta(seconds=processor_timeout_seconds)
    """
    Create a new file processor agent:
    dag_directory: the default dag folder, or the user-specified dags folder self.subdir
    file_paths: list of dag file paths under the dags folder
    max_runs: how many times the scheduler parses the dag files; defaults to -1, i.e. parse forever
    processor_factory: used to create DagFileProcessor processes (subclasses of AbstractDagFileProcessor)
    processor_timeout: timeout for a DagFileProcessor process
    async_mode: whether to start the DagFileProcessorManager in async mode; if the db is not sqlite, async mode is used by default
    """
    self.processor_agent = DagFileProcessorAgent(self.subdir,
                                                 known_file_paths,
                                                 self.num_runs,
                                                 type(self)._create_dag_file_processor,
                                                 processor_timeout,
                                                 self.dag_ids,
                                                 pickle_dags,
                                                 async_mode)
    try:
        self._execute_helper()
    except Exception:
        self.log.exception("Exception when executing execute_helper")
    finally:
        self.processor_agent.end()
        self.log.info("Exited execute loop")
_execute() is the scheduler's main method. It runs the main logic of the scheduling system and consists of the following parts:
2.3.1 list_py_file_paths(self.subdir): find the DAG files under the given path
# Build up a list of Python files that could contain DAGs
self.log.info("Searching for files in %s", self.subdir)
# Look for dag files under the given self.subdir path
known_file_paths = list_py_file_paths(self.subdir)
self.log.info("There are %s files in %s", len(known_file_paths), self.subdir)