How-To Guide: Creating Custom Sensors
Sensors are a type of operator in Apache Airflow that are designed to wait for a certain condition to be met before allowing downstream tasks to execute. While Airflow provides a rich set of built-in sensors for common use cases (e.g., S3KeySensor, HttpSensor, SqlSensor), there will be times when you need to monitor a condition specific to your environment or application. This guide will walk you through the process of creating your own custom sensor.
Why Create a Custom Sensor?
You might need a custom sensor when:
- There isn't a pre-built sensor for the condition you need to check.
- You need to integrate with a proprietary system or API.
- You want to implement complex logic for condition checking that goes beyond simple existence or state checks.
- You need to optimize polling frequency or timeout based on specific requirements.
The Basics of a Custom Sensor
A custom sensor in Airflow is essentially a Python class that inherits from the base airflow.sensors.base.BaseSensorOperator class. The core of your custom sensor's logic will reside in the poke() method.
The poke() Method
The poke() method is called repeatedly while the sensor task runs, at the interval defined by the operator's poke_interval parameter. It should return True if the condition is met, and False otherwise. If poke() returns True, the sensor succeeds and downstream tasks are unblocked. If it returns False, poke() is called again after poke_interval has elapsed, until the condition is met or the timeout is reached.
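To make that contract concrete, here is a toy simulation of the poke-mode loop. This is an illustration only, not Airflow's actual implementation (which lives in BaseSensorOperator.execute()); the run_sensor helper is hypothetical.

```python
import time


def run_sensor(poke, poke_interval: float, timeout: float) -> bool:
    """Toy simulation of poke mode: call poke() until it returns True
    (sensor succeeds) or the timeout elapses (sensor fails)."""
    started = time.monotonic()
    while True:
        if poke():
            return True  # condition met: downstream tasks unblock
        if time.monotonic() - started >= timeout:
            return False  # timed out: sensor fails
        time.sleep(poke_interval)


# Condition becomes true on the third check, so the "sensor" succeeds.
attempts = iter([False, False, True])
succeeded = run_sensor(lambda: next(attempts), poke_interval=0.01, timeout=1.0)

# A condition that never becomes true hits the timeout and fails.
timed_out = run_sensor(lambda: False, poke_interval=0.01, timeout=0.05)
```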
Essential Parameters
When creating a custom sensor, you'll typically need to override or utilize the following parameters from the base class:
- task_id: A unique identifier for the sensor task.
- poke_interval: The time in seconds to wait between calls to poke(). Defaults to 60 seconds.
- timeout: The maximum time in seconds to wait for the sensor to succeed. If the timeout is reached, the sensor fails. Defaults to 7 days (604,800 seconds).
- mode: Specifies how the sensor should operate. Common options are 'poke' (default; the worker holds its slot and repeatedly calls poke()) and 'reschedule' (releases the worker slot between checks, useful for long-running waits).
- soft_fail: If set to True, the sensor is marked as skipped instead of failed when it times out, allowing the DAG to continue. Defaults to False.
Example: A Simple Custom Sensor
Let's create a custom sensor that checks for the existence of a specific file in a local directory.
1. Create a Python File for Your Sensor
Place this file in your Airflow plugins directory (e.g., $AIRFLOW_HOME/plugins/sensors/file_sensor.py). If you don't have a plugins directory, create one.
```python
import os

from airflow.sensors.base import BaseSensorOperator


class FileExistSensor(BaseSensorOperator):
    """
    Checks if a file exists at a given path.

    :param filepath: The path to the file to check for.
    """

    template_fields = ("filepath",)  # Allows Jinja templating for filepath

    def __init__(self, *, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context):
        """Pokes the file system to check for the file's existence."""
        self.log.info("Checking for file: %s", self.filepath)
        if os.path.exists(self.filepath):
            self.log.info("File %s found!", self.filepath)
            return True
        self.log.info("File %s not found yet.", self.filepath)
        return False
```

Note that BaseSensorOperator already provides the self.log logger, so there is no need to inherit from LoggingMixin separately.
2. Integrate into a DAG
Now you can use your custom sensor in a DAG like any other operator.
```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from sensors.file_sensor import FileExistSensor  # Import your custom sensor

with DAG(
    dag_id="custom_sensor_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "sensor"],
) as dag:
    wait_for_my_file = FileExistSensor(
        task_id="wait_for_trigger_file",
        filepath="/path/to/your/trigger/file.txt",  # Update this path
        poke_interval=10,  # Check every 10 seconds
        timeout=600,  # Time out after 10 minutes
        mode="poke",
        # soft_fail=True would let the DAG continue even if the file
        # never appears within the timeout
    )

    process_data = BashOperator(
        task_id="process_data_after_file_exists",
        bash_command="echo 'File found, processing data...'",
    )

    wait_for_my_file >> process_data
```
Replace /path/to/your/trigger/file.txt with an actual path on your Airflow worker's filesystem. You can also use Jinja templating for the filepath parameter if the path is dynamic.
Advanced Considerations
Using reschedule Mode
For sensors that might need to wait for extended periods without consuming a worker slot, use mode='reschedule'. In this mode, the sensor task will be released back to the scheduler queue after each poke, and rescheduled to run again later. This is more efficient than poke mode for long waits.
```python
long_running_wait = FileExistSensor(
    task_id="long_wait_for_file",
    filepath="/path/to/some/large/file.dat",
    poke_interval=60,  # Check once a minute
    timeout=3600,  # Time out after 1 hour
    mode="reschedule",
)
```
Handling External Services
When integrating with external services, ensure you handle potential network issues, authentication, and API rate limits gracefully within your poke() method. Consider using context variables (e.g., {{ ds }}, {{ ts }}) for dynamic queries or file paths.
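As a sketch of this kind of defensive condition check, the function below probes an HTTP endpoint and treats transient network failures as "condition not met yet" rather than task failures. The function name and URL are illustrative; in a real sensor this logic would live inside the poke() method.

```python
import urllib.error
import urllib.request


def poke_http_endpoint(url: str, timeout: float = 10.0) -> bool:
    """Return True once the endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError, HTTPError, timeouts, connection refused: treat these
        # as "not met yet" so the sensor simply pokes again later.
        return False
```

Raising an exception from poke() fails the task immediately, so swallow only the errors you genuinely consider transient.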
Testing Your Custom Sensor
Thoroughly test your custom sensor by simulating different scenarios:
- The condition is met immediately.
- The condition is met after several pokes.
- The condition is never met, and the timeout is reached.
- Test with soft_fail=True and soft_fail=False.
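One lightweight way to exercise the condition logic is to test it in isolation. The _FilePoker class below is a hypothetical stand-in that mirrors FileExistSensor's poke() body, so the test needs no Airflow installation or running scheduler.

```python
import os
import tempfile


class _FilePoker:
    """Stand-in mirroring FileExistSensor's poke() logic for unit tests."""

    def __init__(self, filepath: str):
        self.filepath = filepath

    def poke(self) -> bool:
        return os.path.exists(self.filepath)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "trigger.txt")
    sensor = _FilePoker(filepath=target)
    met_before = sensor.poke()  # file does not exist yet
    open(target, "w").close()   # simulate the trigger file arriving
    met_after = sensor.poke()   # condition is now met
```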
Customizing the Operator
You can extend your custom sensor further by:
- Adding more constructor parameters to customize its behavior.
- Extending the template_fields attribute to allow Jinja templating for your custom parameters.
- Overriding other methods like pre_execute() or post_execute() if needed.
Conclusion
Creating custom sensors in Airflow empowers you to build robust data pipelines that can adapt to any condition. By extending BaseSensorOperator and implementing the poke() method effectively, you can create powerful monitoring tools tailored to your specific needs.