How To: Tasks

This guide provides practical instructions and examples on how to define, configure, and manage tasks within Apache Airflow DAGs.

What is a Task?

A task is the smallest unit of work in an Airflow DAG. It represents a single operation that needs to be performed, such as running a Python script, executing a SQL query, or sending an email.

Defining a Task

Tasks are defined by instantiating Operators. An Operator is a pre-defined template that describes the work a task performs. Here are some common ways to define tasks:

Using the BashOperator

The BashOperator allows you to execute bash commands.


from airflow.operators.bash import BashOperator
from airflow.models.dag import DAG
import pendulum

with DAG(
    dag_id='bash_task_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=['example', 'bash'],
) as dag:
    run_this_last = BashOperator(
        task_id='run_this_last',
        bash_command='echo "I ran last!"',
    )

Using the PythonOperator

The PythonOperator allows you to execute Python functions.


from airflow.operators.python import PythonOperator
from airflow.models.dag import DAG
import pendulum

def my_python_function():
    print("Hello from PythonOperator!")

with DAG(
    dag_id='python_task_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=['example', 'python'],
) as dag:
    run_python_task = PythonOperator(
        task_id='run_python_task',
        python_callable=my_python_function,
    )

Using the EmptyOperator (for structure)

The EmptyOperator (formerly DummyOperator) is useful for creating task dependencies or marking the start/end of a DAG without performing any actual work.


from airflow.operators.empty import EmptyOperator
from airflow.models.dag import DAG
import pendulum

with DAG(
    dag_id='empty_task_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=['example', 'empty'],
) as dag:
    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end')

Task Dependencies

Defining dependencies between tasks is crucial for orchestrating workflows. Airflow provides intuitive ways to set these dependencies:

Using the `>>` and `<<` operators


# ... (previous task definitions; this assumes start, run_python_task,
# and run_this_last are all defined within the same DAG)

start >> run_python_task >> run_this_last
# or, equivalently:
run_this_last << run_python_task << start

Using `set_upstream` and `set_downstream` methods


# Equivalent to start >> run_python_task >> run_this_last
run_python_task.set_upstream(start)
run_python_task.set_downstream(run_this_last)

Task Configuration

Tasks can be configured with various parameters to control their behavior:

  • task_id: A unique identifier for the task within the DAG.
  • retries: The number of times to retry the task if it fails.
  • retry_delay: The time delay between retries (e.g., timedelta(minutes=5)).
  • execution_timeout: The maximum time a task is allowed to run before being marked as failed.
  • depends_on_past: If set to True, the task will only run if the same task succeeded (or was skipped) in the previous DAG run.
  • trigger_rule: Determines when a task should run based on the state of its upstream tasks (e.g., all_success, all_failed, one_success).
  • email_on_failure / email_on_retry: Whether to send an email notification on failure or retry.
  • params: A dictionary of parameters made available to the task, for example in templated fields.

Tip: Use params to pass dynamic configuration values to your tasks, which can be overridden at runtime.
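
As a concrete illustration, here is a minimal sketch that combines several of these parameters on a single task; the dag_id, task_id, and params values are hypothetical:


from datetime import timedelta

import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='configured_task_example',  # hypothetical DAG id
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:
    resilient_task = BashOperator(
        task_id='resilient_task',
        # params are available in templated fields such as bash_command
        bash_command='echo "Running for {{ params.environment }}"',
        retries=3,                                # retry up to 3 times on failure
        retry_delay=timedelta(minutes=5),         # wait 5 minutes between attempts
        execution_timeout=timedelta(minutes=30),  # fail if running longer than 30 minutes
        depends_on_past=False,                    # do not wait on the previous DAG run of this task
        trigger_rule='all_success',               # the default: run only when all upstream tasks succeed
        params={'environment': 'dev'},            # hypothetical value; overridable at trigger time
    )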

Task States

Tasks can be in various states during their lifecycle:

  • queued: The task is waiting to be picked up by a worker.
  • running: The task is currently being executed.
  • success: The task completed successfully.
  • failed: The task failed to complete.
  • skipped: The task was skipped (e.g., due to a condition).
  • upstream_failed: An upstream task failed.
  • deferred: The task has released its worker slot and is waiting on an external event in the Triggerer (used by deferrable operators).

Note: Understanding task states is crucial for debugging and monitoring your workflows.

Advanced Task Features

  • Branching: Use operators like BranchPythonOperator to dynamically decide which path to follow at runtime (see the first sketch below).
  • Sensors: Operators that wait for a certain condition to be met before succeeding, such as FileSensor or HttpSensor (see the second sketch below).
  • SubDAGs: Grouping related tasks into a separate DAG. (Note: SubDAGs are deprecated in newer Airflow versions in favor of TaskGroups.)
  • TaskGroups: The modern way to visually group tasks in the Airflow UI (see the final sketch below).
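
A minimal branching sketch might look like the following; the task names and choice logic are hypothetical. The branch callable returns the task_id (or list of task_ids) to follow, and the joining task uses a trigger_rule so it still runs even though one branch is skipped:


import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

def choose_path():
    # hypothetical condition; return the task_id of the path to follow
    return 'process_weekday'

with DAG(
    dag_id='branching_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:
    branch = BranchPythonOperator(
        task_id='branch',
        python_callable=choose_path,
    )
    process_weekday = EmptyOperator(task_id='process_weekday')
    process_weekend = EmptyOperator(task_id='process_weekend')
    # the unchosen branch is skipped, so the join must tolerate skipped upstreams
    join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

    branch >> [process_weekday, process_weekend] >> join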
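
Sensors are configured like any other task. A minimal sketch using FileSensor, with a hypothetical filepath, might look like this:


import pendulum
from airflow.models.dag import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id='sensor_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/tmp/data_ready.csv',  # hypothetical path to wait for
        poke_interval=60,                # check every 60 seconds
        timeout=60 * 60,                 # give up after one hour
    )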
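
Finally, a TaskGroup wraps related tasks so they render as one collapsible group in the UI. A minimal sketch, with hypothetical group and task names:


import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id='task_group_example',
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
) as dag:
    start = EmptyOperator(task_id='start')

    with TaskGroup(group_id='processing') as processing:
        extract = EmptyOperator(task_id='extract')
        transform = EmptyOperator(task_id='transform')
        extract >> transform

    end = EmptyOperator(task_id='end')

    # the group can be wired up as if it were a single task
    start >> processing >> end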

For more detailed information on specific operators and advanced configurations, please refer to the Operators and Hooks Reference.