跳至主要內容

XCom


XCom

介绍

在Airflow中,可以使用XCom来实现跨任务通信。XCom是Airflow中的一种特殊的数据交换机制,允许在不同的任务之间传递数据。通常在一个DAG中的任务之间需要共享一些数据,例如上一个任务的输出结果需要作为下一个任务的输入,或者多个任务需要共享一些状态信息。这时候就可以使用XCom来实现任务之间的数据交换。

XCom允许任务之间传递任意类型的数据,并且支持任务之间的同步和异步传递。在Airflow中,每个任务都有一个对应的XCom表,可以通过调用xcom_push方法将数据写入XCom表,然后在另一个任务中通过调用xcom_pull方法来读取数据。

具体来说,可以在一个任务中使用ti(Task Instance)对象来访问XCom表。

举例

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def push_data(**context):
    # push data to XCom
    context['ti'].xcom_push(key='my_key', value='my_value')

def pull_data(**context):
    # pull data from XCom
    my_value = context['ti'].xcom_pull(key='my_key')
    print(my_value)

dag = DAG('my_dag', description='My DAG')

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_data,
    provide_context=True,
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_data,
    provide_context=True,
    dag=dag,
)

push_task >> pull_task

在上面的例子中,push_data任务将数据my_value写入到XCom表中,并通过ti对象提供的xcom_push方法实现。然后,pull_data任务通过ti对象提供的xcom_pull方法从XCom表中读取数据,并打印出来。

这样,通过XCom机制,pull_task就可以在push_task执行后获取到my_value这个数据。

上次编辑于:
贡献者: Neil