DAGs (Directed Acyclic Graphs)
DAGs are the core of Airflow. A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. The Airflow Python SDK allows you to define these workflows as code.
Defining a DAG
A DAG is instantiated using the `DAG` class. You typically define tasks within the context of a DAG object using a `with` statement.
```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='my_first_dag',
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example', 'beginner'],
) as dag:
    # Define tasks here
    task1 = BashOperator(
        task_id='hello_world',
        bash_command='echo "Hello, world!"',
    )
    task2 = BashOperator(
        task_id='date',
        bash_command='date',
    )

    # Define task dependencies
    task1 >> task2
```
Key DAG Parameters
When instantiating a DAG object, several parameters are crucial:
| Parameter | Description | Default |
|---|---|---|
| `dag_id` | A unique identifier for the DAG. | Required |
| `description` | A human-readable description of the DAG. | `None` |
| `schedule` | The schedule on which the DAG runs. Can be a cron expression, a `timedelta`, or `None` for manual runs. | `None` |
| `start_date` | The date from which the DAG starts running. Timestamps before this date will not be run. | Required |
| `end_date` | The date after which the DAG will stop running. | `None` |
| `catchup` | If `True`, the DAG will run for all missed schedules between `start_date` and the current date. | `True` |
| `tags` | A list of strings to tag the DAG for better organization and filtering in the UI. | `None` |
| `default_args` | A dictionary of default parameters applied to all tasks within the DAG. | `None` |
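As an illustration of `default_args`, the sketch below shares retry settings across every task in a DAG; the owner name, retry values, and DAG id are made up for the example:

```python
from datetime import timedelta

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

# Hypothetical defaults for illustration; every task in the DAG inherits them
default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='dag_with_defaults',
    schedule='@daily',  # cron preset: run once a day
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    default_args=default_args,
) as dag:
    # This task inherits owner, retries, and retry_delay from default_args
    cleanup = BashOperator(task_id='cleanup', bash_command='echo cleanup')
```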
Task Dependencies
Dependencies between tasks define the execution order. Airflow supports various ways to set these relationships.
Bitshift Operators
The most common way is using bitshift operators:
- `task1 >> task2`: `task1` must complete successfully before `task2` can start.
- `task2 << task1`: same as above, written with the upstream operator.
- `task1 >> [task2, task3]`: `task1` must complete before both `task2` and `task3` can start.
- `[task1, task2] >> task3`: both `task1` and `task2` must complete before `task3` can start.
- `task1 >> task2 >> task3`: a chain of dependencies.
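For instance, a fan-out/fan-in pattern combines these forms; the task names, commands, and DAG id below are illustrative:

```python
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='fan_out_fan_in',  # hypothetical DAG id
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo extract')
    transform_a = BashOperator(task_id='transform_a', bash_command='echo a')
    transform_b = BashOperator(task_id='transform_b', bash_command='echo b')
    load = BashOperator(task_id='load', bash_command='echo load')

    # extract fans out to both transforms; both must succeed before load runs
    extract >> [transform_a, transform_b]
    [transform_a, transform_b] >> load
```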
set_upstream and set_downstream methods
Alternatively, you can use explicit methods:
```python
task1.set_downstream(task2)
# or, equivalently
task2.set_upstream(task1)

# Lists work here too: task3 must complete before task4 and task5
task3.set_downstream([task4, task5])
```
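Airflow also ships a `chain` helper (in `airflow.models.baseoperator`) that can be more readable when linking many tasks in sequence:

```python
from airflow.models.baseoperator import chain

# Equivalent to task1 >> task2 >> task3 >> task4
chain(task1, task2, task3, task4)
```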
Task Groups
Task groups provide a way to visually group related tasks in the Airflow UI, making complex DAGs easier to navigate.
```python
import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator in older Airflow versions
from airflow.utils.task_group import TaskGroup

with DAG(...) as dag:
    start = EmptyOperator(task_id='start')

    with TaskGroup("processing_section", tooltip="Tasks for processing data") as processing_group:
        task_a = BashOperator(task_id='task_a', bash_command='echo 1')
        task_b = BashOperator(task_id='task_b', bash_command='echo 2')
        task_a >> task_b

    end = EmptyOperator(task_id='end')

    start >> processing_group >> end
```
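Note that tasks inside a task group have their `task_id` prefixed with the group id by default, so `task_a` above appears in the UI as `processing_section.task_a`.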
DAG File Structure
Airflow DAGs are ordinary Python files placed in the `dags` folder defined in your Airflow configuration (the `dags_folder` setting). Airflow periodically parses these files to discover and load your workflows.
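The setting lives in the `[core]` section of `airflow.cfg`; the path shown below is a placeholder, not a required location:

```ini
[core]
# Directory scanned for DAG definition files; use your own path
dags_folder = /opt/airflow/dags
```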
Best Practices
- Idempotency: Ensure your tasks can be rerun without side effects.
- Modularity: Break down complex workflows into smaller, reusable tasks.
- Clear Naming: Use descriptive `dag_id` and `task_id` values.
- Tagging: Utilize tags for better organization and filtering.
- Documentation: Add descriptions to DAGs and tasks (see the sketch after this list).
- Start Date: Set a sensible `start_date` and consider `catchup=False` for most cases.
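As a sketch of the documentation practice: a DAG's `description` appears in the DAG list, while the `doc_md` attribute renders Markdown on the DAG's detail page. The DAG id and text below are illustrative:

```python
import pendulum

from airflow.models.dag import DAG

with DAG(
    dag_id='documented_dag',  # hypothetical DAG id
    description='Short summary shown in the DAG list',
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example'],
) as dag:
    # Markdown rendered in the Airflow UI
    dag.doc_md = """
    ### documented_dag
    Longer documentation: purpose, owners, and rerun instructions.
    """
```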