Sensors
Sensors are a special type of Operator that waits for a certain condition to be met. This is useful for triggering tasks only when external systems are ready or when specific data becomes available.
When to Use Sensors
Sensors are typically used in scenarios where a task in your DAG needs to wait for an external event, such as:
- A file to appear in a specific location (e.g., FileSensor).
- A record to be inserted into a database table (e.g., SqlSensor).
- A specific condition to be met in an external API.
- A message to arrive in a queue.
- An external process to complete.
How Sensors Work
When a Sensor task runs, it repeatedly polls an external system or checks a condition. If the condition is met, the Sensor succeeds, and the DAG can proceed to the next task. If the condition is not met, the Sensor continues to poll. This polling can be configured with parameters like poke_interval (how often to check) and timeout (maximum time to wait).
Sensors are designed to be lightweight. Instead of running a full task and retrying on failure, each check is a single "poke" of the external system, which consumes few resources per attempt.
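To make the polling loop concrete, here is a minimal sketch using the generic PythonSensor, whose python_callable is invoked on every poke and returns True when the condition is met or False to keep waiting. The callable body and file path are illustrative placeholders:
from airflow.sensors.python import PythonSensor
import os

def _data_is_ready() -> bool:
    # Illustrative condition; replace with your real check.
    return os.path.exists('/tmp/data_ready.flag')

wait_for_condition = PythonSensor(
    task_id='wait_for_condition',
    python_callable=_data_is_ready,  # Called once per poke; True ends the wait
    poke_interval=30,                # Seconds between checks
    timeout=60 * 10                  # Fail if still False after 10 minutes
)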
Common Sensor Types
Airflow provides a variety of built-in sensors, and you can also create custom sensors. Here are a few common ones:
FileSensor
Waits for a file to exist in a specified location (local filesystem or cloud storage).
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_specific_file',
    filepath='/path/to/your/file.csv',
    poke_interval=5,   # Check every 5 seconds
    timeout=60 * 5,    # Time out after 5 minutes
    mode='poke'        # 'poke' is the default mode
)
SqlSensor
Runs a SQL query repeatedly and waits until the result satisfies its success criteria (by default, a non-empty, truthy first cell).
from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_data = SqlSensor(
    task_id='wait_for_new_records',
    conn_id='my_database_conn',
    sql="SELECT COUNT(*) FROM my_table WHERE processed = false;",
    success=lambda count: count == 0,  # Succeed once no unprocessed records remain
    poke_interval=10,
    timeout=60 * 10
)
HttpSensor
Waits for a specific HTTP response from a web server (e.g., a 200 OK status code).
from airflow.providers.http.sensors.http import HttpSensor

wait_for_api = HttpSensor(
    task_id='wait_for_api_endpoint',
    http_conn_id='my_http_conn',
    endpoint='/api/v1/status',
    request_params={"service": "data_processing"},
    response_check=lambda response: response.json().get("status") == "ready",
    poke_interval=15,
    timeout=60 * 15
)
ExternalTaskSensor
Waits for a task to complete in a different DAG. By default it matches on the same logical (execution) date, so the two DAGs' schedules must be aligned, or you can shift the match with execution_delta or execution_date_fn.
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_upstream_dag = ExternalTaskSensor(
    task_id='wait_for_data_ingestion_dag',
    external_dag_id='data_ingestion_dag',
    external_task_id='load_data',
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
    poke_interval=30
)
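If the upstream DAG runs on a different schedule, the match can be shifted with execution_delta. A minimal sketch, assuming the upstream run starts one hour before this DAG's run:
from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

wait_shifted = ExternalTaskSensor(
    task_id='wait_for_hourly_upstream',
    external_dag_id='data_ingestion_dag',
    external_task_id='load_data',
    execution_delta=timedelta(hours=1),  # Match the run one hour before this one
    poke_interval=30
)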
Sensor Modes
Sensors can operate in two primary modes:
- poke (default): The sensor occupies a worker slot for its entire runtime, sleeping between checks. This is appropriate for short waits or frequent checks, since there is no rescheduling overhead, but it ties up the slot the whole time the sensor waits.
- reschedule: The sensor gives up its worker slot between checks and is rescheduled to run again after the poke_interval. This frees the slot for other tasks while waiting and is usually the preferred mode for long-running sensors.
from airflow.sensors.filesystem import FileSensor

wait_for_file_reschedule = FileSensor(
    task_id='wait_for_file_reschedule',
    filepath='/path/to/another/file.txt',
    poke_interval=60,    # Check every minute
    timeout=60 * 60,     # Time out after an hour
    mode='reschedule'    # Frees the worker slot between checks
)
Custom Sensors
If the built-in sensors don't meet your needs, you can create your own by inheriting from the BaseSensorOperator class and implementing the poke method, which returns True when the condition is met and False otherwise.
from airflow.sensors.base import BaseSensorOperator
import requests

class MyApiSensor(BaseSensorOperator):
    def __init__(self, api_endpoint, **kwargs):
        super().__init__(**kwargs)
        self.api_endpoint = api_endpoint

    def poke(self, context):
        # Return True to end the wait; False keeps the sensor poking.
        try:
            response = requests.get(self.api_endpoint, timeout=10)
            response.raise_for_status()  # Raise an exception for bad status codes
            data = response.json()
            return data.get("is_ready", False)
        except requests.exceptions.RequestException:
            return False
# Example usage in a DAG:
# wait_for_my_api = MyApiSensor(
#     task_id='wait_for_custom_api',
#     api_endpoint='http://my-external-service.com/health',
#     poke_interval=10,
#     timeout=60 * 5
# )
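In recent Airflow versions (2.5+), a TaskFlow-style alternative is available for one-off checks: the @task.sensor decorator turns a plain function into a sensor, with the function body playing the role of poke. A minimal sketch, with the service URL as an illustrative placeholder:
from airflow.decorators import task
from airflow.sensors.base import PokeReturnValue
import requests

@task.sensor(poke_interval=30, timeout=60 * 5, mode='reschedule')
def wait_for_service() -> PokeReturnValue:
    # is_done=True ends the wait; otherwise the sensor pokes again later.
    response = requests.get('http://my-external-service.com/health', timeout=10)
    ready = response.ok and response.json().get("is_ready", False)
    return PokeReturnValue(is_done=ready)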
Important Considerations
- Avoid Infinite Loops: Always set a
timeoutfor your sensors to prevent DAGs from running indefinitely if the condition is never met. - Resource Usage: Choose the appropriate
mode(pokevs.reschedule) based on your needs for resource efficiency.rescheduleis generally preferred for long-waiting sensors. - Idempotency: Ensure that your sensor's checking logic doesn't have side effects on the external system.
- Clear Logic: Make the condition checked by the sensor as precise and unambiguous as possible.
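Putting these guardrails together, a long-waiting sensor might combine a hard timeout, reschedule mode, and soft_fail, which marks the task as skipped rather than failed when the timeout is reached. The file path below is illustrative:
from airflow.sensors.filesystem import FileSensor

guarded_wait = FileSensor(
    task_id='guarded_wait_for_file',
    filepath='/path/to/optional/file.csv',
    mode='reschedule',    # Free the worker slot between checks
    poke_interval=120,    # Check every two minutes
    timeout=60 * 60 * 4,  # Give up after four hours
    soft_fail=True        # Mark the task skipped instead of failed on timeout
)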
Sensors are a powerful tool for managing dependencies on external systems and events within your Airflow workflows.