Apache Airflow 是一个提供基于 DAG 来编排工作流的、可视化的分布式任务调度平台。Airflow 提供了通过 Python 代码实现可编程方式定义 DAG 工作流。当工作流通过代码来定义时,它们变得更加可维护、可版本化、可测试和协作。

An example Airflow DAG, rendered in Graph
An example Airflow DAG, rendered in Graph

在数据处理领域,各个任务之间的依赖关系往往比较复杂,无法通过 crontab 等基础工具进行调度,因此引入了 Airflow 这种调度平台来编排和管理:

  • 时间依赖:任务需要等待某一个时间点触发
  • 外部系统依赖:任务依赖外部系统需要调用接口去访问
  • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响
  • 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行

Architecture

Airflow的架构图如下:

  • Metadata Database:Airflow的元数据库,用于Webserver、Executor及Scheduler存储各种状态数据,通常是MySQL或PostgreSQL
  • User Interface:用户界面,即前端web界面,提供了工作流节点的运行监控,可以查看每个节点的运行状态、运行耗时、执行日志等。也可以在界面上对节点的状态进行操作,如:标记为成功、标记为失败以及重新运行等。
  • Webserver:web服务器,用于提供用户界面的操作接口
  • Scheduler:调度器,负责处理触发调度的工作流,并将工作流中的任务提交给执行器处理
  • Executor:执行器,负责处理任务实例。在本地模式下会运行在调度器中,并负责所有任务实例的处理。但是大多数适合于生产的执行器实际上是一个消息队列(RabbitMQ、Redis),负责将任务实例推送给工作节点执行
  • Workers:工作节点,真正负责调起任务进程、执行任务的节点,worker可以有多个,是独立的进程
  • DAG Directory:存放DAG任务图定义的Python代码的目录,代表一个Airflow的处理流程。代码文件所在的位置通过Airflow配置 dags_folder 指定,需要保证执行器、调度器以及工作节点都能够访问到

Concept

DAG

Airflow 的 DAG 工作流编排 Python 代码本质上是一个描述 DAG 结构的配置文件,真正的 Task 会在 python script context 之外的 context 中执行。

在具体的编写中,我们需要创建一个 DAG 对象来实现对于工作流的定义。

可以通过参数 default_args 给 DAG 传递默认参数,这是一个 dict 类型的数据,所有的参数都会被传递给 DAG 中的每一个 Operator。

1
2
args = {  
    'owner': 'houmin',    'email': ['houmin@cosmos.org'],    'email_on_failure': True,    'email_on_retry': False,    'start_date': datetime(2022, month=6, day=7, hour=0, minute=0),    'depends_on_past': True,}  

对 DAG 而言,dag_id 作为是其唯一的标志符,我们通过 schedule_iteraval 参数可以定义其执行周期,这是一个 crontab 格式的字符串。

1
2
dag = DAG(  
    dag_id='sync_data',    schedule_interval='@daily',    default_args=args,    catchup=False,    tags=['data', 'manager'],    max_active_runs=1,)  

一个 DAG 的运行实例被称作 DAG Run

Operators

Operator 定义了在 Airflow 中需要完成的任务单元。所有的 Operator 都继承自 BaseOperator,它包含了运行任务所需要的参数。除了 BaseOperator 所包含的参数外,每个 Operator 都有自己的参数,一些实用的比较多的 Operator 包括 BashOperatorPythonOperatorKubernetesPodOperator

BashOperator1

1
2
run_this = BashOperator(  
    task_id='run_after_loop',    bash_command='echo 1',)  

PythonOperator

KubernetesPodOperator

Tasks

Task 是 Operator 的一个运行实例,它决定了如何在 DAG 的 context 中执行你的 Operator 的任务。在 Airflow 中工作流上每个 task 都是原子可重试的,一个工作流某个环节的 task 失败可自动或手动进行重试,不必从头开始跑。

在下面的例子中,我们将 BashOperator 实例化为两个独立的任务。对每个 Task 而言,它的第一个参数 task_id 作为其唯一的标志符。可以看到 t2 除了传递 BashOperator 特有的 bash_command 参数外,还传递了基本参数 retries,这个 retries 会覆盖默认参数中的值。

1
2
3
4
5
t1 = BashOperator(  
    task_id='print_date',    bash_command='date',)  
  
t2 = BashOperator(  
    task_id='sleep',    depends_on_past=False,    bash_command='sleep 5',    retries=3,)  

可以通过以下方式设置不同 Task 之间的依赖关系。Airflow 会检测你的 script 是否成环,如果违反了则会抛出异常。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
t1.set_downstream(t2)  
  
# This means that t2 will depend on t1  
# running successfully to run.  
# It is equivalent to:  
t2.set_upstream(t1)  
  
# The bit shift operator can also be  
# used to chain operations:  
t1 >> t2  
  
# And the upstream dependency with the  
# bit shift operator:  
t2 << t1  
  
# Chaining multiple dependencies becomes  
# concise with the bit shift operator:  
t1 >> t2 >> t3  
  
# A list of tasks can also be set as  
# dependencies. These operations  
# all have the same effect:  
t1.set_downstream([t2, t3])  
t1 >> [t2, t3]  
[t2, t3] << t1  

Task Instance 可能有不同的状态,以下是其生命周期:

../_images/task_lifecycle_diagram.png
../_images/task_lifecycle_diagram.png

Sensor

Sensor: task that needs to wait for an external event

Scheduler

Heartbeat

在 airflow 中,heartbeat 是相当重要的一个概念,是airflow中各个组件协同、通信、交互、工作的一种方式。在 shceduler 中,有下面几种重要的heatbeat。

heartbeat 作用
scheduler -> executor 触发运行的task,并更新task状态
scheduler -> DagFileProcessorAgent 检测DagFileProcessorManager的存活状态,并处理manager返回的消息
scheduler -> scheduler 检查scheduler是否存在问题,更新schedulerjob数据库的信息

功能逻辑

调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 job 表中,并维护状态和心跳,预期能够正常结束,将结束时间也更新到表中。 但是实际上往往因为异常中断,导致结束时间为空。不管是如何进行的退出,SchedulerJob 退出时会关闭所有子进程。

这里简单介绍下 Scheduler 的功能逻辑:

  • 遍历 dags 路径下的所有 dag 文件, 启动一定数量的进程(进程池),并且给每个进程指派一个 dag 文件。 每个 DagFileProcessor 解析分配给它的dag文件,并根据解析结果在 DB中创建 DagRuns 和 TaskInstance。
  • 在 scheduler_loop 中,检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识需要被执行的 TaskInstance,然后将它们添加至 executor 队列,将新排列的 TaskInstance 状态更新为QUEUED状态。
  • 每个可用的 executor 从队列中取一个 TaskInstance,然后开始执行它,将此 TaskInstance 的数据库记录更新为SCHEDULED。
  • 当一个 TaskInstance 完成运行,关联的 executor 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
  • 一旦所有的dag处理完毕后,就会进行下一轮循环处理。这里还有一个细节就是上一轮的某个dag的处理时间可能很长,导致到下一轮处理的时候这个dag还没有处理完成。 Airflow 的处理逻辑是在这一轮不为这个dag创建进程,这样就不会阻塞进程去处理其余dag

Executor

Local Executor

Celery Executor

Celery 是一个借助队列机制实现的分布式任务调度框架,它本身无队列功能,需要借助第三方组件,比如 Redis 或者 RabbitMQ。

Celery 的任务队列包含两个重要的组件

  • Broker: 存储要执行的命令列表,需要借助第三方的用于收发消息的消息中间件(Message Broker),如 RabbitMQ、Redis
  • Result Backend: 存储已完成命令的状态,一般存储到 Database

当调度器 executor = CeleryExecutor 时,包含两个重要的守护进程:

  1. Celery Worker: 守护进程,通过airflow worker -D启动一个或多个 Celery 的任务队列,负责执行具体的 DAG 任务,默认队列名为default
  2. Celery Flower: 守护进程,通过airflow flower -D启动,消息队列监控工具,用于监控 Celery 消息队列,默认的端口为5555,可以在浏览器地址栏中输入http://127.0.0.1:5555来访问

CerleryKuberneter Executor

Kubernetes Executor

Kubernetes Executor 把每个 Task Instance 执行为 K8s 集群中的一个 Pod,它是 Airflow Scheduler 里面的一个进程。Airflow Scheduler 并不一定需要运行在 Kubernetes 中,只需要能够访问 Kubernetes 集群即可。

当 DAG 提交了一个 Task,Kubernetes Executor 会在 Kubernetes 集群中创建一个 worker Pod,由 Pod 执行任务并且上报结果。

下图是 Airflow 在 Kuberntes 集群中部署的一种示例:

下图是具体创建一个 Task 时候对应的时序图:

Get Hands dirty

Enviroment Setup

Airflow 官方文档提供了通过 docker 和 docker-compose 在本地安装 airflow 的教程2,对于在生产环境使用 Airflow,建议还是通过 Kubernetes 和 Helm Chart 部署3

Command Lines

1
2
3
4
5
6
# initialize the database tables  
airflow db init  
  
# print the list of active DAGs  
airflow dags list  
  

airflow tasks

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# prints the list of tasks in the "tutorial" DAG  
airflow tasks list tutorial  
  
# prints the hierarchy of tasks in the "tutorial" DAG  
airflow tasks list tutorial --tree  
  
# command layout: command subcommand [dag_id] [task_id] [(optional) date]  
# testing print_date  
airflow tasks test tutorial print_date 2015-06-01  
  
# testing sleep  
airflow tasks test tutorial sleep 2015-06-01  
  

Reference