Airflow
Apache Airflow 是一个提供基于 DAG 来编排工作流的、可视化的分布式任务调度平台。Airflow 提供了通过 Python 代码实现可编程方式定义 DAG 工作流。当工作流通过代码来定义时,它们变得更加可维护、可版本化、可测试和协作。
在数据处理领域,各个任务之间的依赖关系往往比较复杂,无法通过 crontab 等基础工具进行调度,因此引入了 Airflow 这种调度平台来编排和管理:
- 时间依赖:任务需要等待某一个时间点触发
- 外部系统依赖:任务依赖外部系统需要调用接口去访问
- 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响
- 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行
Architecture
Airflow的架构图如下:
- Metadata Database:Airflow的元数据库,用于Webserver、Executor及Scheduler存储各种状态数据,通常是MySQL或PostgreSQL
- User Interface:用户界面,即前端web界面,提供了工作流节点的运行监控,可以查看每个节点的运行状态、运行耗时、执行日志等。也可以在界面上对节点的状态进行操作,如:标记为成功、标记为失败以及重新运行等。
- Webserver:web服务器,用于提供用户界面的操作接口
- Scheduler:调度器,负责处理触发调度的工作流,并将工作流中的任务提交给执行器处理
- Executor:执行器,负责处理任务实例。在本地模式下会运行在调度器中,并负责所有任务实例的处理。但是大多数适合于生产的执行器实际上是一个消息队列(RabbitMQ、Redis),负责将任务实例推送给工作节点执行
- Workers:工作节点,真正负责调起任务进程、执行任务的节点,worker可以有多个,是独立的进程
- DAG Directory:存放DAG任务图定义的Python代码的目录,代表一个Airflow的处理流程。代码文件所在的位置通过Airflow配置 dags_folder 指定,需要保证执行器、调度器以及工作节点都能够访问到
Concept
DAG
Airflow 的 DAG 工作流编排 Python 代码本质上是一个描述 DAG 结构的配置文件,真正的 Task 会在 python script context 之外的 context 中执行。
在具体的编写中,我们需要创建一个 DAG 对象来实现对于工作流的定义。
可以通过参数 default_args 给 DAG 传递默认参数,这是一个 dict 类型的数据,所有的参数都会被传递给 DAG 中的每一个 Operator。
|
|
对 DAG 而言,dag_id 作为是其唯一的标志符,我们通过 schedule_iteraval 参数可以定义其执行周期,这是一个 crontab 格式的字符串。
|
|
一个 DAG 的运行实例被称作 DAG Run。
Operators
Operator 定义了在 Airflow 中需要完成的任务单元。所有的 Operator 都继承自 BaseOperator,它包含了运行任务所需要的参数。除了 BaseOperator 所包含的参数外,每个 Operator 都有自己的参数,一些实用的比较多的 Operator 包括 BashOperator、PythonOperator、KubernetesPodOperator。
BashOperator1
|
|
PythonOperator
KubernetesPodOperator
Tasks
Task 是 Operator 的一个运行实例,它决定了如何在 DAG 的 context 中执行你的 Operator 的任务。在 Airflow 中工作流上每个 task 都是原子可重试的,一个工作流某个环节的 task 失败可自动或手动进行重试,不必从头开始跑。
在下面的例子中,我们将 BashOperator 实例化为两个独立的任务。对每个 Task 而言,它的第一个参数 task_id 作为其唯一的标志符。可以看到 t2 除了传递 BashOperator 特有的 bash_command 参数外,还传递了基本参数 retries,这个 retries 会覆盖默认参数中的值。
|
|
可以通过以下方式设置不同 Task 之间的依赖关系。Airflow 会检测你的 script 是否成环,如果违反了则会抛出异常。
|
|
Task Instance 可能有不同的状态,以下是其生命周期:
Sensor
Sensor: task that needs to wait for an external event
Scheduler
Heartbeat
在 airflow 中,heartbeat 是相当重要的一个概念,是airflow中各个组件协同、通信、交互、工作的一种方式。在 shceduler 中,有下面几种重要的heatbeat。
| heartbeat | 作用 |
|---|---|
| scheduler -> executor | 触发运行的task,并更新task状态 |
| scheduler -> DagFileProcessorAgent | 检测DagFileProcessorManager的存活状态,并处理manager返回的消息 |
| scheduler -> scheduler | 检查scheduler是否存在问题,更新schedulerjob数据库的信息 |
功能逻辑
调度器实际上就是一个 airflow.jobs.SchedulerJob 实例 job 持续运行 run 方法。job.run() 在开始时将自身的信息加入到 job 表中,并维护状态和心跳,预期能够正常结束,将结束时间也更新到表中。 但是实际上往往因为异常中断,导致结束时间为空。不管是如何进行的退出,SchedulerJob 退出时会关闭所有子进程。
这里简单介绍下 Scheduler 的功能逻辑:
- 遍历 dags 路径下的所有 dag 文件, 启动一定数量的进程(进程池),并且给每个进程指派一个 dag 文件。 每个 DagFileProcessor 解析分配给它的dag文件,并根据解析结果在 DB中创建 DagRuns 和 TaskInstance。
- 在 scheduler_loop 中,检查与活动 DagRun 关联的 TaskInstance 的状态,解析 TaskInstance 之间的任何依赖,标识需要被执行的 TaskInstance,然后将它们添加至 executor 队列,将新排列的 TaskInstance 状态更新为QUEUED状态。
- 每个可用的 executor 从队列中取一个 TaskInstance,然后开始执行它,将此 TaskInstance 的数据库记录更新为SCHEDULED。
- 当一个 TaskInstance 完成运行,关联的 executor 就会报告到队列并更新数据库中的 TaskInstance 的状态(例如“完成”、“失败”等)。
- 一旦所有的dag处理完毕后,就会进行下一轮循环处理。这里还有一个细节就是上一轮的某个dag的处理时间可能很长,导致到下一轮处理的时候这个dag还没有处理完成。 Airflow 的处理逻辑是在这一轮不为这个dag创建进程,这样就不会阻塞进程去处理其余dag
Executor
Local Executor
Celery Executor
Celery 是一个借助队列机制实现的分布式任务调度框架,它本身无队列功能,需要借助第三方组件,比如 Redis 或者 RabbitMQ。
Celery 的任务队列包含两个重要的组件
- Broker: 存储要执行的命令列表,需要借助第三方的用于收发消息的消息中间件(Message Broker),如 RabbitMQ、Redis
- Result Backend: 存储已完成命令的状态,一般存储到 Database
当调度器 executor = CeleryExecutor 时,包含两个重要的守护进程:
- Celery Worker: 守护进程,通过airflow worker -D启动一个或多个 Celery 的任务队列,负责执行具体的 DAG 任务,默认队列名为default
- Celery Flower: 守护进程,通过airflow flower -D启动,消息队列监控工具,用于监控 Celery 消息队列,默认的端口为5555,可以在浏览器地址栏中输入http://127.0.0.1:5555来访问
CerleryKuberneter Executor
Kubernetes Executor
Kubernetes Executor 把每个 Task Instance 执行为 K8s 集群中的一个 Pod,它是 Airflow Scheduler 里面的一个进程。Airflow Scheduler 并不一定需要运行在 Kubernetes 中,只需要能够访问 Kubernetes 集群即可。
当 DAG 提交了一个 Task,Kubernetes Executor 会在 Kubernetes 集群中创建一个 worker Pod,由 Pod 执行任务并且上报结果。
下图是 Airflow 在 Kuberntes 集群中部署的一种示例:
下图是具体创建一个 Task 时候对应的时序图:
Get Hands dirty
Enviroment Setup
Airflow 官方文档提供了通过 docker 和 docker-compose 在本地安装 airflow 的教程2,对于在生产环境使用 Airflow,建议还是通过 Kubernetes 和 Helm Chart 部署3。
Command Lines
|
|
airflow tasks
|
|