Sensor
Sensor
介绍
在 Airflow 中,Sensor 是一种特殊类型的 Operator,主要用于检测某些条件是否满足,如果满足就让任务继续执行,否则等待一段时间再重新检测条件。Sensor 可以用于等待文件的出现、某个服务的可用性、数据库中数据的更新等场景。
Airflow 中常用的 Sensor 有以下几种:
- FileSensor:用于等待指定路径下的文件出现。
- TimeSensor:用于等待指定的时间。
- ExternalTaskSensor:用于等待另一个 DAG 中的任务完成。
- HttpSensor:用于等待 HTTP API 的响应。
- SqlSensor:用于等待数据库中的数据更新。
- HdfsSensor:用于等待 HDFS 上的文件出现。
Sensor 通常会周期性地检测条件是否满足,直到满足条件或达到最大尝试次数为止。Sensor 运行的频率可以通过 poke_interval
参数进行配置,可以通过 mode
参数指定是等待条件满足后继续执行任务(mode=reschedule
),还是立即继续执行任务但记录下等待的时间(mode=poke
)。
Sensor 可以与其他 Operator 结合使用,例如可以在文件出现后使用 BashOperator 进行文件的处理,或者在数据库数据更新后使用 PythonOperator 进行数据分析等操作。Sensor 的使用可以提高任务的可靠性和稳定性,确保任务在条件满足后再执行。
例子
假设我们有一个 DAG,需要等待某个文件出现后再进行下一步操作。我们可以使用 Airflow 中的 FileSensor 来等待文件的出现。
以下是一个使用 FileSensor 的 DAG 示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.sensors import FileSensor
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 6, 25),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('wait_for_file', default_args=default_args, schedule_interval=None)
# 等待文件出现
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/path/to/file',
fs_conn_id='fs_default',
poke_interval=10,
dag=dag
)
# 处理文件的任务
process_file = BashOperator(
task_id='process_file',
bash_command='echo "Processing file..."',
dag=dag
)
# 定义任务依赖关系
wait_for_file >> process_file
在上述示例中,我们首先定义了一个 FileSensor,它会等待 /path/to/file
文件出现。然后我们定义了一个 BashOperator,用于处理文件。最后将两个任务进行依赖关系的定义,使得等待文件任务完成后再执行处理文件任务。
在 DAG 运行时,FileSensor 会在每个 poke_interval
指定的时间间隔内检查文件是否存在,如果文件存在则结束等待,任务继续执行。如果文件不存在,则等待下一个周期再次检查,直到文件存在或达到最大尝试次数为止。