跳至主要內容

DAG


DAG

介绍

在Airflow中,DAG代表“有向无环图”(Directed Acyclic Graph),是Airflow中最重要的概念之一。DAG用于定义工作流程,即一系列任务的执行顺序和依赖关系,以及它们的调度和监控。

在Airflow中,每个DAG都由一组任务(Task)组成,这些任务可以串行、并行或混合执行。DAG中的任务使用Operator来定义,每个Operator代表DAG中的一个单独的任务,并定义了任务的执行逻辑。例如,BashOperator表示在本地或远程系统上执行Shell命令,PythonOperator表示执行Python函数,而EmailOperator表示发送电子邮件等。

DAG可以通过Python脚本来定义,其中定义了DAG的名称、描述、调度计划和任务之间的依赖关系。Airflow通过解析这些Python脚本来生成DAG对象,然后将它们添加到Airflow的元数据库中。DAG对象可以在Airflow Web UI中进行可视化展示,并且可以在Web UI中管理DAG的调度和监控。

DAG中的任务可以通过设置依赖关系来实现任务之间的调度顺序,例如,一个任务可以在另一个任务完成后执行,或者多个任务可以同时执行。Airflow会根据任务之间的依赖关系,计算任务的执行顺序,并将任务提交给执行器(Executor)进行执行。

举例

好的,以下是一个简单的DAG例子,它包括三个任务:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 24),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'my_dag',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
)

task1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Hello World from Task 1"',
    dag=dag,
)

task2 = BashOperator(
    task_id='task_2',
    bash_command='echo "Hello World from Task 2"',
    dag=dag,
)

task3 = BashOperator(
    task_id='task_3',
    bash_command='echo "Hello World from Task 3"',
    dag=dag,
)

task1 >> task2 >> task3

在这个DAG中,我们定义了三个BashOperator类型的任务,分别是task1、task2和task3。它们的命令分别是输出"Hello World from Task 1"、"Hello World from Task 2"和"Hello World from Task 3"。这三个任务之间的依赖关系是task1在执行完成后,task2才会开始执行;task2在执行完成后,task3才会开始执行。这些依赖关系通过操作符(>>)来定义。

此外,我们还定义了一些DAG的元数据,例如默认参数、DAG的名称和描述、以及调度计划。在这个例子中,DAG的调度计划是每天执行一次。

Schedule

在Airflow中,Schedule(调度)是指任务在何时运行的计划。Airflow使用一种称为DAG(Directed Acyclic Graph,有向无环图)的结构来组织任务,并使用Schedule来指定每个任务在何时运行。

Airflow的Schedule是由以下三个元素组成:

  1. Start Date(开始日期):任务的第一个实例应该在何时运行,通常是指定为一个具体的日期。
  2. Interval(间隔):任务的运行周期,可以是hourly(每小时一次)、daily(每天一次)、weekly(每周一次)等等。
  3. End Date(结束日期):任务的最后一个实例应该在何时运行,通常也是指定为一个具体的日期。

在Airflow中,可以使用Python的datetime库来指定开始日期和结束日期,同时也可以使用timedelta来指定任务的运行间隔。

例如,以下代码定义了一个任务的Schedule,该任务每天凌晨1点运行一次,运行时间从2023年6月24日开始,一直到2023年6月30日结束:

from datetime import datetime, timedelta
from airflow import DAG

dag = DAG(
    dag_id='my_dag',
    start_date=datetime(2023, 6, 24),
    end_date=datetime(2023, 6, 30),
    schedule_interval=timedelta(days=1),
)

这个Schedule会在每天凌晨1点运行一次,直到2023年6月30日结束。

上次编辑于:
贡献者: Neil