跳至主要內容

SubDAG


SubDAG

介绍

在Airflow中,SubDag通常用于将大型工作流程分解为更小的、可重用的模块。SubDag是一种特殊的操作符,可以将一个DAG分解为一个或多个子DAG。这些子DAG可以在同一个DAG中或不同的DAG中重复使用。

SubDag通常用于将一个大型DAG分解为多个小的DAG,其中每个小DAG可以单独测试和调试,也可以在多个DAG中重复使用,从而减少了重复的代码量。SubDag可以递归使用,即可以在子DAG中包含另一个SubDag。

在SubDag操作符中,需要定义子DAG的DAG ID和任务 ID 前缀。在子DAG中,任务ID需要加上任务ID前缀,以避免与主DAG的任务ID重复。子DAG可以在主DAG中定义为一个任务,也可以嵌套在其他子DAG中。

需要注意的是,SubDag操作符可以在主DAG的任何位置使用,但是它会在运行时创建一个新的DAG运行实例,这可能会导致性能问题。因此,在使用SubDag时,需要特别关注性能和调度问题。

举例

好的,以下是一个使用SubDag的简单例子:

假设我们有一个主DAG,其中包含三个任务:TaskA、TaskB和TaskC。我们希望将TaskB和TaskC作为一个子DAG,以便可以在其他DAG中重复使用它们。我们可以使用SubDag操作符来实现这个目标。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from datetime import datetime

def subdag(parent_dag_name, child_dag_name, args):
    dag_subdag = DAG(
        dag_id=f"{parent_dag_name}.{child_dag_name}",
        default_args=args,
        schedule_interval="@daily",
    )

    with dag_subdag:
        task_b = DummyOperator(task_id="task_b")
        task_c = DummyOperator(task_id="task_c")
  
        task_b >> task_c

    return dag_subdag

args = {
    "owner": "airflow",
    "start_date": datetime(2023, 6, 23)
}

with DAG(dag_id="example_subdag", default_args=args, schedule_interval="@daily") as dag:
    task_a = DummyOperator(task_id="task_a")

    subdag_task = SubDagOperator(
        task_id="subdag_task",
        subdag=subdag("example_subdag", "subdag", args),
        dag=dag
    )

    task_d = DummyOperator(task_id="task_d")

    task_a >> subdag_task >> task_d

在这个例子中,我们使用了一个名为subdag的函数来定义一个子DAG。该子DAG包含了两个任务:TaskB和TaskC,它们之间使用了一个简单的依赖关系。

然后,我们在主DAG中使用SubDagOperator来引用这个子DAG。在这个例子中,我们将SubDagOperator作为TaskA和TaskD之间的一个任务,以便我们可以在主DAG中重复使用子DAG。

请注意,在SubDagOperator中,我们将子DAG作为参数传递,并指定了子DAG的任务ID和DAG ID。这些ID需要与子DAG中定义的ID相匹配。

这是一个简单的例子,但它演示了如何使用SubDag来将一个大型DAG分解为小的重复使用的模块。

上次编辑于:
贡献者: Neil