Apache Airflow Cookbook

Recipes for common Airflow tasks

Monitoring and Alerting

Effective monitoring and alerting are crucial for maintaining the health and reliability of your Apache Airflow deployments. This section provides practical examples and strategies for keeping your pipelines running smoothly and getting notified when issues arise.

1. Basic Task Monitoring with Callbacks

Airflow provides built-in callbacks that can be triggered on task success, failure, or retry. These are a fundamental way to get notified about individual task events.

Task Failure Callback

You can define a Python function to be executed when a task fails. This function can send an email, log an error, or trigger an external alert system.


from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def task_failure_callback(context):
    ti = context['ti']
    print(f"Task {ti.task_id} failed in DAG {ti.dag_id} at {ti.execution_date}")
    # Add your alerting logic here (e.g., send email, trigger Slack notification)

with DAG(
    dag_id='monitoring_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['monitoring', 'example'],
) as dag:
    run_this_task = PythonOperator(
        task_id='run_this_task',
        python_callable=lambda: 1 / 0,  # This will intentionally fail
        on_failure_callback=task_failure_callback,  # fires when this task fails
    )
                

Task Success/Retry Callbacks

Similarly, you can define callbacks for success and retry events to track the lifecycle of your tasks.


def task_success_callback(context):
    ti = context['ti']
    print(f"Task {ti.task_id} succeeded in DAG {ti.dag_id} at {ti.execution_date}")

def task_retry_callback(context):
    ti = context['ti']
    print(f"Task {ti.task_id} retried in DAG {ti.dag_id} at {ti.execution_date}")

# Attach these to your operators (or via default_args), e.g.:
# on_success_callback=task_success_callback,
# on_retry_callback=task_retry_callback,
                
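
For example, attached to the task from the first recipe (note that on_retry_callback only fires when retries is greater than zero):


    run_this_task = PythonOperator(
        task_id='run_this_task',
        python_callable=lambda: 1 / 0,  # fails, retries once, then calls the failure callback
        retries=1,
        on_failure_callback=task_failure_callback,
        on_success_callback=task_success_callback,
        on_retry_callback=task_retry_callback,
    )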

2. DAG-Level Monitoring

You can set callbacks at the DAG level to monitor the overall status of a DAG run.


from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def dag_failure_callback(context):
    dag_run = context['dag_run']
    print(f"DAG Run {dag_run.run_id} failed.")
    # Add your alerting logic for the entire DAG run

def dag_success_callback(context):
    dag_run = context['dag_run']
    print(f"DAG Run {dag_run.run_id} succeeded.")

with DAG(
    dag_id='dag_level_monitoring',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    on_failure_callback=dag_failure_callback,
    on_success_callback=dag_success_callback,
    tags=['monitoring', 'dag-level'],
) as dag:
    start = PythonOperator(
        task_id='start',
        python_callable=lambda: print("DAG started"),
    )
    # ... other tasks ...
    end = PythonOperator(
        task_id='end',
        python_callable=lambda: print("DAG finished"),
    )
    start >> end
                

3. Integrating with External Alerting Systems

Airflow's flexibility allows integration with popular alerting tools like Slack, PagerDuty, and email services. The key is to use operators or custom callbacks that interface with these services.

Slack Notifications

The SlackWebhookOperator (from the Slack provider package) is useful for sending messages to a Slack channel via an incoming webhook. You can use it as a regular task or invoke it from a callback.


from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

def send_slack_alert(context):
    ti = context['ti']
    slack_message = f"""
    :red_circle: Task Failed.
    *Task*: {ti.task_id}
    *DAG*: {ti.dag_id}
    *Execution Time*: {ti.execution_date}
    *Log URL*: {ti.log_url}
    """
    slack_notification = SlackWebhookOperator(
        task_id='slack_notification',
        slack_webhook_conn_id='slack_default',  # Requires a configured Slack webhook connection
        message=slack_message,
    )
    return slack_notification.execute(context=context)

with DAG(
    dag_id='slack_alert_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['monitoring', 'slack'],
) as dag:
    fail_task = PythonOperator(
        task_id='this_will_fail',
        python_callable=lambda: 1 / 0,
        on_failure_callback=send_slack_alert,  # sends the Slack message when this task fails
    )
                
Note: Ensure you have configured a Slack Incoming Webhook connection in Airflow's UI (Admin -> Connections) under the connection ID used above (slack_default), containing your webhook URL or token.
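
For email alerts, the simplest route is to set email and email_on_failure in your default_args (with SMTP configured in airflow.cfg). If you need a custom message, you can also call Airflow's send_email helper from a failure callback; a minimal sketch, with the recipient address below as a placeholder:


from airflow.utils.email import send_email

def email_failure_alert(context):
    ti = context['ti']
    subject = f"Airflow alert: {ti.dag_id}.{ti.task_id} failed"
    html_content = (
        f"Task <b>{ti.task_id}</b> in DAG <b>{ti.dag_id}</b> failed at {ti.execution_date}.<br>"
        f"Logs: <a href='{ti.log_url}'>{ti.log_url}</a>"
    )
    # 'oncall@example.com' is a placeholder recipient; adjust for your environment.
    send_email(to=['oncall@example.com'], subject=subject, html_content=html_content)

# Attach it like any other callback:
# PythonOperator(..., on_failure_callback=email_failure_alert)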

4. Airflow's Built-in Metrics and UI

The Airflow UI itself is a powerful monitoring tool. Familiarize yourself with:

  • DAGs View: Monitor the status of your DAGs (running, success, failed, skipped).
  • Graph View: Visualize task dependencies and their statuses.
  • Task Logs: Access detailed logs for each task instance.
  • Browse -> Task Instances: Filter and inspect individual task runs.

Airflow also emits metrics over StatsD (and, in recent versions, OpenTelemetry), which tools like Prometheus can consume via a StatsD exporter. You enable this in your airflow.cfg file under the `[metrics]` section (the statsd client library must be installed, e.g. via the apache-airflow[statsd] extra).


[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
                
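
You can also emit custom metrics from your own task code through Airflow's internal StatsD wrapper. A minimal sketch (the metric names and row count below are illustrative placeholders):


from airflow.stats import Stats

def process_and_record_metrics():
    # ... do the actual work ...
    rows_processed = 42  # placeholder value for illustration

    # These only reach your metrics backend when statsd_on = True is set in [metrics].
    Stats.incr('custom.data_pipeline.runs')                             # counter
    Stats.gauge('custom.data_pipeline.rows_processed', rows_processed)  # gauge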

5. Advanced Alerting Strategies

  • Alerting on No Data/Staleness: Implement tasks that check for data freshness or expected file arrivals and fail if data has not arrived by a certain time, so your normal failure alerts fire (see the sketch after this list).
  • Resource Monitoring: Monitor CPU, memory, and disk usage of your Airflow workers and scheduler. Integrate with system monitoring tools.
  • Heartbeat Checks: Create a DAG that runs periodically and alerts if it fails to complete, indicating a potential scheduler or worker issue.
  • Custom Alerting DAGs: Build dedicated DAGs whose sole purpose is to query Airflow's metadata database or external services to check for anomalies and trigger alerts.

Tip: For critical alerts, consider using a combination of notification channels (e.g., email and Slack) to ensure visibility.
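
As a concrete example of the staleness check above, a small task can raise an exception when an expected file is missing or too old, so the usual failure callbacks and alerts fire. A minimal sketch, where the path and two-hour window are placeholder assumptions:


import os
import time

from airflow.exceptions import AirflowException

def check_data_freshness(path='/data/incoming/latest.csv', max_age_seconds=2 * 60 * 60):
    """Fail the task (and trigger failure alerts) if the file is missing or stale."""
    if not os.path.exists(path):
        raise AirflowException(f"Expected data file {path} has not arrived")
    age = time.time() - os.path.getmtime(path)
    if age > max_age_seconds:
        raise AirflowException(f"Data file {path} is stale ({age / 3600:.1f} hours old)")

# Wire it up like any other task, e.g.:
# PythonOperator(task_id='check_data_freshness', python_callable=check_data_freshness)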

6. Using the `TriggerDagRunOperator` for Complex Alerting

You can use the TriggerDagRunOperator to launch a separate "alerting DAG" when a condition is met in your primary DAG.


from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def check_condition(**context):
    # Your logic to check if an alert condition is met
    # Returns True if condition is met, False otherwise
    return True # Simulate condition met

with DAG(
    dag_id='main_data_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['data-pipeline', 'monitoring'],
) as main_dag:
    data_processing_task = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    conditional_alert_trigger = TriggerDagRunOperator(
        task_id='trigger_alert_dag_if_needed',
        trigger_dag_id='alert_notification_dag',
        conf={"message": "Data pipeline encountered an issue."},
        wait_for_completion=False,
        trigger_rule='one_failed',  # Runs only if an upstream task fails
        # TriggerDagRunOperator has no python_callable; to trigger on a custom
        # condition instead of on failure, gate it behind a ShortCircuitOperator
        # running check_condition (see the sketch at the end of this section).
    )
    data_processing_task >> conditional_alert_trigger

with DAG(
    dag_id='alert_notification_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None, # This DAG is triggered externally
    catchup=False,
    tags=['alerting'],
) as alert_dag:
    send_alert = PythonOperator(
        task_id='send_alert_message',
        # The triggering DAG's payload is available on the dag_run object.
        python_callable=lambda dag_run: print(
            f"Sending alert: {(dag_run.conf or {}).get('message', 'No message provided')}"
        ),
    )
                

This approach keeps your main pipeline focused on its core task while delegating alert handling to a separate, specialized DAG.
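
If you would rather launch the alerting DAG when check_condition is met instead of when a task fails, one option is to gate the trigger behind a ShortCircuitOperator, which skips downstream tasks whenever its callable returns a falsy value. A self-contained sketch (the dag_id conditional_alert_pipeline is just an example name):


from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def check_condition(**context):
    # Placeholder condition; replace with your real check.
    return True

with DAG(
    dag_id='conditional_alert_pipeline',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['data-pipeline', 'monitoring'],
) as dag:
    process_data = PythonOperator(
        task_id='process_data',
        python_callable=lambda: print("Processing data..."),
    )

    check_alert_condition = ShortCircuitOperator(
        task_id='check_alert_condition',
        python_callable=check_condition,  # downstream tasks are skipped when this returns False
    )

    trigger_alert_dag = TriggerDagRunOperator(
        task_id='trigger_alert_dag',
        trigger_dag_id='alert_notification_dag',
        conf={"message": "Alert condition met."},
        wait_for_completion=False,
    )

    process_data >> check_alert_condition >> trigger_alert_dag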