跳至主要內容

Sensor


Sensor

介绍

在 Airflow 中,Sensor 是一种特殊类型的 Operator,主要用于检测某些条件是否满足,如果满足就让任务继续执行,否则等待一段时间再重新检测条件。Sensor 可以用于等待文件的出现、某个服务的可用性、数据库中数据的更新等场景。

Airflow 中常用的 Sensor 有以下几种:

  1. FileSensor:用于等待指定路径下的文件出现。
  2. TimeSensor:用于等待指定的时间。
  3. ExternalTaskSensor:用于等待另一个 DAG 中的任务完成。
  4. HttpSensor:用于等待 HTTP API 的响应。
  5. SqlSensor:用于等待数据库中的数据更新。
  6. HdfsSensor:用于等待 HDFS 上的文件出现。

Sensor 通常会周期性地检测条件是否满足,直到满足条件或达到最大尝试次数为止。Sensor 运行的频率可以通过 poke_interval 参数进行配置,可以通过 mode 参数指定是等待条件满足后继续执行任务(mode=reschedule),还是立即继续执行任务但记录下等待的时间(mode=poke)。

Sensor 可以与其他 Operator 结合使用,例如可以在文件出现后使用 BashOperator 进行文件的处理,或者在数据库数据更新后使用 PythonOperator 进行数据分析等操作。Sensor 的使用可以提高任务的可靠性和稳定性,确保任务在条件满足后再执行。

例子

假设我们有一个 DAG,需要等待某个文件出现后再进行下一步操作。我们可以使用 Airflow 中的 FileSensor 来等待文件的出现。

以下是一个使用 FileSensor 的 DAG 示例:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import FileSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 25),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('wait_for_file', default_args=default_args, schedule_interval=None)

# 等待文件出现
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/file',
    fs_conn_id='fs_default',
    poke_interval=10,
    dag=dag
)

# 处理文件的任务
process_file = BashOperator(
    task_id='process_file',
    bash_command='echo "Processing file..."',
    dag=dag
)

# 定义任务依赖关系
wait_for_file >> process_file

在上述示例中,我们首先定义了一个 FileSensor,它会等待 /path/to/file 文件出现。然后我们定义了一个 BashOperator,用于处理文件。最后将两个任务进行依赖关系的定义,使得等待文件任务完成后再执行处理文件任务。

在 DAG 运行时,FileSensor 会在每个 poke_interval 指定的时间间隔内检查文件是否存在,如果文件存在则结束等待,任务继续执行。如果文件不存在,则等待下一个周期再次检查,直到文件存在或达到最大尝试次数为止。

上次编辑于:
贡献者: Neil