Sensors

Sensors are a special type of Operator that waits for a certain condition to be met. This is useful for triggering tasks only when external systems are ready or when specific data becomes available.

Key Idea: Sensors are designed to be idempotent: they repeatedly check a condition until it is met (or a timeout is reached), and they should have no side effects beyond performing that check.

When to Use Sensors

Sensors are typically used in scenarios where a task in your DAG needs to wait for an external event, such as:

- A file landing on a local filesystem or in cloud storage
- New rows appearing in a database table
- An external API or service reporting that it is ready
- A task in another DAG finishing

How Sensors Work

When a Sensor task runs, it repeatedly polls an external system or checks a condition. If the condition is met, the Sensor succeeds, and the DAG can proceed to the next task. If the condition is not met, the Sensor continues to poll. This polling can be configured with parameters like poke_interval (how often to check) and timeout (maximum time to wait).

Sensors are designed to be lightweight. Instead of executing a full task and then retrying, they simply check the condition. If the condition is not met, they "poke" the external system without consuming significant resources.
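The poke loop described above can be sketched in plain Python without Airflow; `run_sensor`, `check_condition`, and the parameter names here are stand-ins mirroring the sensor's `poke_interval` and `timeout` semantics, not the actual Airflow implementation:

```python
import time

def run_sensor(check_condition, poke_interval, timeout):
    """Minimal sketch of a poke-mode sensor loop: call the check
    repeatedly, sleeping poke_interval seconds between attempts,
    and fail once the total wait exceeds timeout."""
    started = time.monotonic()
    while True:
        if check_condition():          # the sensor's poke()
            return True                # condition met -> task succeeds
        if time.monotonic() - started > timeout:
            raise TimeoutError("sensor timed out")  # task fails
        time.sleep(poke_interval)      # wait before the next poke

# Example: the condition becomes true on the third check
attempts = {"n": 0}
def ready():
    attempts["n"] += 1
    return attempts["n"] >= 3

print(run_sensor(ready, poke_interval=0.01, timeout=5))  # True
```

Airflow's real loop adds details such as exponential backoff and mode handling, but the success/timeout decision follows this shape.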

Common Sensor Types

Airflow provides a variety of built-in sensors, and you can also create custom sensors. Here are a few common ones:

FileSensor

Waits for a file to exist in a specified location (local filesystem or cloud storage).

    from airflow.sensors.filesystem import FileSensor

    wait_for_file = FileSensor(
        task_id='wait_for_specific_file',
        filepath='/path/to/your/file.csv',
        poke_interval=5,  # Check every 5 seconds
        timeout=60*5,     # Timeout after 5 minutes
        mode='poke'       # 'poke' is the default mode
    )

SqlSensor

Waits for a SQL query to return a result. By default the first cell of the first row must be truthy; you can pass a callable via the success parameter to decide based on that cell's value.

    from airflow.providers.common.sql.sensors.sql import SqlSensor

    wait_for_data = SqlSensor(
        task_id='wait_for_new_records',
        conn_id='my_database_conn',
        sql="SELECT COUNT(*) FROM my_table WHERE processed = false;",
        # success takes a callable applied to the first cell of the result;
        # here the sensor succeeds once unprocessed records exist
        success=lambda cnt: cnt is not None and cnt > 0,
        poke_interval=10,
        timeout=60*10
    )
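The way the success callable is applied can be illustrated without a database. This is a simplified, hypothetical stand-in for the sensor's decision logic, assuming the query result arrives as a list of row tuples:

```python
def sql_condition_met(rows, success=None):
    """Simplified sketch of a SqlSensor-style check: look at the first
    cell of the first row; with no callable, any truthy cell succeeds,
    otherwise the callable decides."""
    if not rows:
        return False          # no rows -> keep waiting
    first_cell = rows[0][0]
    if success is not None:
        return bool(success(first_cell))
    return bool(first_cell)   # default: truthy first cell succeeds

# Default behaviour: a non-zero count means the condition is met
print(sql_condition_met([(5,)]))                               # True
# With a callable: wait until the count of unprocessed rows is positive
print(sql_condition_met([(0,)], success=lambda cnt: cnt > 0))  # False
```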

HttpSensor

Waits for a specific HTTP response from a web server (e.g., a 200 OK status code).

    from airflow.providers.http.sensors.http import HttpSensor

    wait_for_api = HttpSensor(
        task_id='wait_for_api_endpoint',
        http_conn_id='my_http_conn',
        endpoint='/api/v1/status',
        request_params={"service": "data_processing"},
        response_check=lambda response: response.json().get("status") == "ready",
        poke_interval=15,
        timeout=60*15
    )
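The response_check callable simply receives the HTTP response and returns True or False. Its behaviour can be exercised with a minimal stand-in response object (a hypothetical stub, not the real requests.Response):

```python
class FakeResponse:
    """Minimal stand-in for an HTTP response; only .json() is needed
    to exercise the response_check callable."""
    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload

# The same callable used in the HttpSensor example above
response_check = lambda response: response.json().get("status") == "ready"

print(response_check(FakeResponse({"status": "ready"})))     # True
print(response_check(FakeResponse({"status": "starting"})))  # False
```

Returning False keeps the sensor poking; returning True lets the task succeed.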

ExternalTaskSensor

Waits for a task to complete in a different DAG. By default it matches runs with the same logical (execution) date, so if the two DAGs run on different schedules you must supply execution_delta or execution_date_fn.

    from airflow.sensors.external_task import ExternalTaskSensor

    wait_for_upstream_dag = ExternalTaskSensor(
        task_id='wait_for_data_ingestion_dag',
        external_dag_id='data_ingestion_dag',
        external_task_id='load_data',
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        poke_interval=30
    )

Sensor Modes

Sensors can operate in two primary modes:

- poke (default): the sensor occupies a worker slot for its entire runtime, sleeping between checks. Suited to short waits with frequent checks.
- reschedule: the sensor releases its worker slot between checks and is rescheduled for the next poke. Suited to long waits, since it frees resources while idle.

    from airflow.sensors.filesystem import FileSensor

    wait_for_file_reschedule = FileSensor(
        task_id='wait_for_file_reschedule',
        filepath='/path/to/another/file.txt',
        poke_interval=60,  # Check every minute
        timeout=60*60,     # Timeout after an hour
        mode='reschedule'  # Use reschedule mode for better resource utilization
    )

Custom Sensors

If the built-in sensors don't meet your needs, you can create your own by inheriting from the BaseSensorOperator class and implementing the poke method, which should return True when the condition is met and False otherwise.

    from airflow.sensors.base import BaseSensorOperator
    import requests

    class MyApiSensor(BaseSensorOperator):

        def __init__(self, api_endpoint, **kwargs):
            super().__init__(**kwargs)
            self.api_endpoint = api_endpoint

        def poke(self, context):
            try:
                response = requests.get(self.api_endpoint)
                response.raise_for_status()  # Raise for 4xx/5xx status codes
                data = response.json()
                return data.get("is_ready", False)
            except requests.exceptions.RequestException:
                return False

    # Example usage in a DAG:
    # wait_for_my_api = MyApiSensor(
    #     task_id='wait_for_custom_api',
    #     api_endpoint='http://my-external-service.com/health',
    #     poke_interval=10,
    #     timeout=60*5
    # )

Important Considerations

When using sensors, keep a few points in mind:

- Always set a timeout; without one, a sensor whose condition never becomes true can wait indefinitely and occupy resources.
- In poke mode a sensor holds a worker slot while it waits, so many long-running poke-mode sensors can exhaust the worker pool. Prefer reschedule mode for long waits.
- Keep the poke method cheap and free of side effects; it may run many times before the condition is met.

Sensors are a powerful tool for managing dependencies on external systems and events within your Airflow workflows.