How-To Guide: Creating Custom Sensors

Sensors are a type of operator in Apache Airflow that are designed to wait for a certain condition to be met before allowing downstream tasks to execute. While Airflow provides a rich set of built-in sensors for common use cases (e.g., S3KeySensor, HttpSensor, SqlSensor), there will be times when you need to monitor a condition specific to your environment or application. This guide will walk you through the process of creating your own custom sensor.

Why Create a Custom Sensor?

You might need a custom sensor when:

- No built-in or provider-supplied sensor exists for the system or condition you need to monitor.
- You are waiting on an internal or proprietary service, such as an in-house API or job scheduler.
- The readiness check involves business logic that a generic sensor's parameters cannot express.

The Basics of a Custom Sensor

A custom sensor in Airflow is essentially a Python class that inherits from airflow.sensors.base.BaseSensorOperator. The core of your custom sensor's logic resides in its poke() method.

The poke() Method

The poke() method is called repeatedly at a specified interval (defined by the operator's poke_interval parameter). It should return True if the condition is met, and False otherwise. If poke() returns True, the sensor succeeds and downstream tasks are unblocked. If it returns False, poke() is called again after poke_interval has elapsed; in the default poke mode the task holds its worker slot and sleeps between checks, while in reschedule mode it is rescheduled (see Advanced Considerations below).
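
In outline, a sensor can be as small as the sketch below. The CallableConditionSensor name and the idea of passing the check in as a callable are just for illustration (Airflow's built-in PythonSensor follows a similar pattern); the point is simply the True/False contract of poke().

from typing import Callable

from airflow.sensors.base import BaseSensorOperator


class CallableConditionSensor(BaseSensorOperator):
    """Minimal sketch: succeeds once a user-supplied callable returns True."""

    def __init__(self, *, condition: Callable[[], bool], **kwargs):
        super().__init__(**kwargs)
        self.condition = condition

    def poke(self, context) -> bool:
        # True -> the sensor succeeds; False -> Airflow pokes again after poke_interval.
        return bool(self.condition())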

Essential Parameters

When creating a custom sensor, you'll typically rely on the following parameters inherited from BaseSensorOperator:

- poke_interval: the number of seconds to wait between checks (pokes).
- timeout: the maximum number of seconds to keep checking before the task fails (or is skipped, if soft_fail=True).
- mode: 'poke' (the default) keeps the worker slot occupied between checks; 'reschedule' frees the slot and lets the scheduler re-run the task later.
- soft_fail: if True, the sensor is marked as skipped instead of failed when the timeout is reached.

Example: A Simple Custom Sensor

Let's create a custom sensor that checks for the existence of a specific file in a local directory.

1. Create a Python File for Your Sensor

Place this file in your Airflow plugins directory (e.g., $AIRFLOW_HOME/plugins/sensors/file_sensor.py). If you don't have a plugins directory, create one. Airflow adds the plugins folder to sys.path, so modules placed there can be imported directly from your DAG files.

import os
from airflow.sensors.base import BaseSensorOperator

class FileExistSensor(BaseSensorOperator):
    """
    Checks if a file exists at a given path.

    :param filepath: The path to the file to check for.
    :type filepath: str
    """

    template_fields = ('filepath',)  # Allows Jinja templating for filepath

    def __init__(
        self,
        *,
        filepath: str,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context):
        """
        Pokes the file system to check for the file's existence.
        """
        self.log.info("Checking for file: %s", self.filepath)
        if os.path.exists(self.filepath):
            self.log.info("File %s found!", self.filepath)
            return True
        else:
            self.log.info("File %s not found yet.", self.filepath)
            return False

2. Integrate into a DAG

Now you can use your custom sensor in a DAG like any other operator.

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

from sensors.file_sensor import FileExistSensor  # Import your custom sensor

with DAG(
    dag_id="custom_sensor_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "sensor"],
) as dag:
    wait_for_my_file = FileExistSensor(
        task_id="wait_for_trigger_file",
        filepath="/path/to/your/trigger/file.txt",  # Update this path
        poke_interval=10,  # Check every 10 seconds
        timeout=600,       # Timeout after 10 minutes
        mode="poke",
        # soft_fail=True would mark this task as skipped (instead of failed) if the file never appears before the timeout
    )

    process_data = BashOperator(
        task_id="process_data_after_file_exists",
        bash_command="echo 'File found, processing data...'",
    )

    wait_for_my_file >> process_data

Tip: Make sure to replace /path/to/your/trigger/file.txt with an actual path on your Airflow worker's filesystem. You can also use Jinja templating for the filepath parameter if the path is dynamic.
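
For example, inside the same DAG as above, a templated path for a date-partitioned drop directory (the /data/incoming layout is made up for this sketch) could look like this:

    wait_for_daily_file = FileExistSensor(
        task_id="wait_for_daily_trigger_file",
        filepath="/data/incoming/{{ ds }}/trigger.txt",  # {{ ds }} renders to the run's logical date (YYYY-MM-DD)
        poke_interval=60,
        timeout=3600,
    )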

Advanced Considerations

Using reschedule Mode

For sensors that may need to wait for extended periods, use mode='reschedule' so the task does not hold a worker slot between checks. In this mode, the worker slot is released after each poke and the scheduler reschedules the task to poke again once poke_interval has elapsed. This is far more efficient than poke mode for long waits, at the cost of a little scheduling latency between checks.

    long_running_wait = FileExistSensor(
        task_id="long_wait_for_file",
        filepath="/path/to/some/large/file.dat",
        poke_interval=60,  # Check once a minute
        timeout=3600,      # Timeout after 1 hour
        mode="reschedule",
    )

Handling External Services

When integrating with external services, handle potential network issues, authentication failures, and API rate limits gracefully inside your poke() method; a transient error is usually better treated as "condition not met yet" than as a hard task failure. You can also use Airflow's template variables (e.g., {{ ds }}, {{ ts }}) in templated parameters to build dynamic queries or file paths.
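
As a rough sketch of that idea (the reporting endpoint, its "ready" response field, and the ReportReadySensor name are all hypothetical, and the requests library is assumed to be installed), a poke() that treats transient errors as "not ready yet" might look like this:

import requests

from airflow.sensors.base import BaseSensorOperator


class ReportReadySensor(BaseSensorOperator):
    """Waits until a (hypothetical) reporting API marks a report as ready."""

    template_fields = ("report_date",)  # lets the DAG pass report_date="{{ ds }}"

    def __init__(self, *, endpoint: str, report_date: str, **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.report_date = report_date

    def poke(self, context) -> bool:
        try:
            # Use a short request timeout so a hung connection cannot block the poke indefinitely.
            response = requests.get(self.endpoint, params={"date": self.report_date}, timeout=10)
            response.raise_for_status()
        except requests.RequestException as exc:
            # Network problems and non-2xx responses count as "not ready yet" rather than failing the task.
            self.log.info("Report API not ready (%s); will check again.", exc)
            return False
        return bool(response.json().get("ready", False))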

Testing Your Custom Sensor

Thoroughly test your custom sensor by simulating different scenarios:

- The condition is already met when the sensor starts (poke() should return True on the first call).
- The condition only becomes true after some delay (the sensor should succeed on a later poke).
- The condition is never met (the sensor should fail, or skip if soft_fail=True, once the timeout is reached).
- An external dependency is unreachable or returns an error (poke() should handle it gracefully).
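
One way to cover the first two scenarios is to call poke() directly in a unit test, without running a full DAG. The sketch below uses pytest's tmp_path fixture; adjust the import to wherever your sensor module actually lives.

from sensors.file_sensor import FileExistSensor


def test_poke_reflects_file_existence(tmp_path):
    target = tmp_path / "trigger.txt"
    sensor = FileExistSensor(task_id="wait_for_file", filepath=str(target))

    # The file does not exist yet, so the condition is not met.
    assert sensor.poke(context={}) is False

    # Once the file appears, poke() should report success.
    target.write_text("ready")
    assert sensor.poke(context={}) is True
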
Note: Ensure your sensor's dependencies are installed in your Airflow environment if it relies on external Python libraries.

Customizing the Operator

You can extend your custom sensor further by:

- Accepting an Airflow connection ID and using a hook inside poke() instead of hard-coding hosts or credentials.
- Adding more entries to template_fields so additional parameters accept Jinja templates.
- Baking in sensible defaults for poke_interval, timeout, or mode so DAG authors don't have to repeat them.
- Making the sensor deferrable (backed by a trigger) so long waits don't consume worker slots at all.

The sketch after this list illustrates the templating and defaults points.
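
As a small illustration, the sketch below subclasses the FileExistSensor defined earlier; the DailyFileSensor name and the /data/incoming path are made up for the example.

from sensors.file_sensor import FileExistSensor  # the sensor defined earlier in this guide


class DailyFileSensor(FileExistSensor):
    """FileExistSensor preconfigured for a (hypothetical) daily trigger file."""

    def __init__(self, **kwargs):
        # Default to a templated, per-day path and to reschedule mode with a long wait;
        # DAG authors can still override any of these via keyword arguments.
        kwargs.setdefault("filepath", "/data/incoming/{{ ds }}/trigger.txt")
        kwargs.setdefault("mode", "reschedule")
        kwargs.setdefault("poke_interval", 300)
        kwargs.setdefault("timeout", 6 * 60 * 60)
        super().__init__(**kwargs)
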
Conclusion

Creating custom sensors in Airflow lets you build robust data pipelines that can wait on conditions specific to your environment and applications. By extending BaseSensorOperator and implementing the poke() method effectively, you can create monitoring tasks tailored to your exact needs.