DAGs (Directed Acyclic Graphs)

DAGs are the core concept in Airflow. A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. Airflow's Python SDK lets you define these workflows in Python code.

Defining a DAG

A DAG is instantiated using the DAG class. You typically define tasks within the context of a DAG object using a with statement.


from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='my_first_dag',
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example', 'beginner'],
) as dag:
    # Define tasks here
    task1 = BashOperator(
        task_id='hello_world',
        bash_command='echo "Hello, world!"',
    )

    task2 = BashOperator(
        task_id='date',
        bash_command='date',
    )

    # Define task dependencies
    task1 >> task2

Key DAG Parameters

When instantiating a DAG object, several parameters are crucial:

dag_id (required): A unique identifier for the DAG.
description (default: None): A human-readable description of the DAG.
schedule (default: None): The schedule on which the DAG runs; a cron expression, a timedelta, or None for manual runs only.
start_date (required): The date from which the DAG starts running; runs are never scheduled before this date.
end_date (default: None): The date after which the DAG stops running.
catchup (default: True): If True, the scheduler backfills all missed schedules between start_date and the current date.
tags (default: None): A list of strings for tagging the DAG so it is easier to organize and filter in the UI.
default_args (default: None): A dictionary of default parameters applied to every task within the DAG.
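
Of these, default_args deserves a short illustration: any keyword argument that tasks accept (retries, retry_delay, owner, and so on) can be set once at the DAG level instead of being repeated on every task. A minimal sketch, with illustrative values:

from datetime import timedelta

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# Applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'data-team',  # illustrative owner name
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dag_with_defaults',
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
) as dag:
    # This task inherits owner, retries, and retry_delay from default_args.
    report = BashOperator(task_id='report', bash_command='echo report')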

Task Dependencies

Dependencies between tasks define the execution order. Airflow supports various ways to set these relationships.

Bitshift Operators

The most common way is to use the bitshift operators >> and << (in the sketch below, task3 is an additional, illustrative task):
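
# task1 runs before task2:
task1 >> task2

# Equivalently, << reverses the direction:
task2 << task1

# A list fans a dependency out to several tasks at once:
task1 >> [task2, task3]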

set_upstream and set_downstream methods

Alternatively, you can use explicit methods:


task1.set_downstream(task2)  # task1 runs before task2
# or, equivalently:
task2.set_upstream(task1)    # task2 runs after task1

# A list fans out: task3 runs before both task4 and task5.
task3.set_downstream([task4, task5])
            
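For longer linear chains, Airflow also provides the chain helper in airflow.models.baseoperator, which avoids stringing many bitshift operators together:

from airflow.models.baseoperator import chain

# Equivalent to task1 >> task2 >> task3.
chain(task1, task2, task3)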

Task Groups

Task groups provide a way to visually group related tasks in the Airflow UI, making complex DAGs easier to navigate.


from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(...) as dag:
    start = EmptyOperator(task_id='start')

    with TaskGroup("processing_section", tooltip="Tasks for processing data") as processing_group:
        task_a = BashOperator(task_id='task_a', bash_command='echo 1')
        task_b = BashOperator(task_id='task_b', bash_command='echo 2')
        task_a >> task_b

    end = EmptyOperator(task_id='end')

    start >> processing_group >> end

Note: When using task groups, dependencies can be set between tasks within the group (task_a >> task_b) or between the group itself and other tasks (start >> processing_group >> end).

DAG File Structure

Airflow DAGs are Python files placed in the DAGs folder (the dags_folder setting in your Airflow configuration). Airflow parses these files periodically to discover and load your workflows.
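
For reference, the setting lives in the [core] section of airflow.cfg; the path shown here is illustrative:

[core]
# Directory that Airflow scans for DAG definition files.
dags_folder = /opt/airflow/dags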

Best Practices