DAGs
A Directed Acyclic Graph (DAG) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. DAGs are the core of Airflow. They define the workflows you want to automate.
What is a DAG?
In Airflow, a DAG is defined in a Python script. This script describes the tasks in your workflow, how they should be scheduled, and the dependencies between them. "Directed" means that dependencies point in one direction, so tasks execute in a defined order; "acyclic" means the graph cannot contain cycles, so no task can depend, directly or indirectly, on itself.
Key Components of a DAG
- Tasks: The basic units of work in a DAG. Each task represents a single step in the workflow, such as running a script or moving data.
- Operators: Templates for a type of work to be performed. Operators are the building blocks of tasks. Airflow provides many built-in operators, and you can also create your own.
- Dependencies: The relationships between tasks, defining the order in which they should run.
- Schedule: How often the DAG should run.
Creating a DAG
DAGs are defined in Python files, typically placed in the Airflow DAGs folder. Here's a simple example:
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
    dag_id="my_first_dag",
    schedule="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "first"],
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    start >> end
In this example:
- dag_id is a unique identifier for the DAG.
- schedule defines how often the DAG runs (e.g., @daily, @hourly, or a cron expression).
- start_date is the date from which the DAG will start running.
- catchup=False prevents the DAG from running for past missed schedules.
- tags are used for organizing and filtering DAGs in the UI.
- EmptyOperator is a simple operator that does nothing, useful for defining task dependencies.
- The >> operator defines a dependency, meaning end will run after start completes successfully.
Task Dependencies
You can define dependencies between tasks in several ways:
- Bitshift operators: task1 >> task2 (task2 runs after task1) or task1 << task2 (task1 runs after task2).
- set_upstream and set_downstream methods: task2.set_upstream(task1) or task1.set_downstream(task2), both equivalent to task1 >> task2.
- Chaining: task1 >> task2 >> task3 runs the tasks in sequence (all three styles are shown in the sketch below).
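To make the three styles concrete, here is a minimal sketch using EmptyOperator and the chain helper from airflow.models.baseoperator; the DAG id (dependency_styles) and task IDs (extract, transform, load) are hypothetical:

from __future__ import annotations

import pendulum

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dependency_styles",  # hypothetical DAG id for illustration
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    # The three declarations below are equivalent; pick one style per DAG.
    extract >> transform >> load
    # transform.set_upstream(extract); load.set_upstream(transform)
    # chain(extract, transform, load)

The chain helper is handy when the sequence of tasks is long or built dynamically in a loop.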
Example with multiple dependencies:
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="complex_dependencies",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    task_a = BashOperator(task_id="task_a", bash_command="echo 'A'")
    task_b = BashOperator(task_id="task_b", bash_command="echo 'B'")
    task_c = BashOperator(task_id="task_c", bash_command="echo 'C'")
    task_d = BashOperator(task_id="task_d", bash_command="echo 'D'")

    task_a >> [task_b, task_c] >> task_d
In this example, task_b and task_c run in parallel after task_a completes, and task_d runs only after both task_b and task_c have completed successfully, a fan-out followed by a fan-in.
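A related pattern: when every task in one group must precede every task in another group, writing the pairwise edges by hand gets verbose. Airflow provides a cross_downstream helper in airflow.models.baseoperator for this case; the sketch below uses hypothetical DAG and task IDs:

from __future__ import annotations

import pendulum

from airflow.models.baseoperator import cross_downstream
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="cross_downstream_example",  # hypothetical DAG id
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract_a = EmptyOperator(task_id="extract_a")
    extract_b = EmptyOperator(task_id="extract_b")
    load_x = EmptyOperator(task_id="load_x")
    load_y = EmptyOperator(task_id="load_y")

    # Equivalent to the four explicit edges:
    # extract_a >> load_x, extract_a >> load_y,
    # extract_b >> load_x, extract_b >> load_y
    cross_downstream([extract_a, extract_b], [load_x, load_y])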
Scheduling DAGs
Airflow offers flexible scheduling options:
- Preset intervals: @hourly, @daily, @weekly, @monthly, @yearly.
- Cron expressions: A standard way to define complex schedules (e.g., 0 0 * * * for midnight daily; see the sketch after this list).
- Manual runs: You can trigger DAGs manually from the Airflow UI.
- External triggers: DAGs can be triggered by external events or other DAGs.
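As a minimal sketch of a cron-based schedule, a cron expression is passed to schedule just like a preset; the DAG id (nightly_report) and the 02:30 time are illustrative:

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="nightly_report",  # hypothetical DAG id
    schedule="30 2 * * *",    # every day at 02:30 UTC
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    build_report = EmptyOperator(task_id="build_report")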
DAG Runs and Task Instances
When a DAG is scheduled to run, it creates a DAG Run. Each time a specific task within a DAG Run executes, it's called a Task Instance. The state of DAG Runs and Task Instances (e.g., running, success, failed, skipped) can be monitored in the Airflow UI.
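One way to observe this relationship is through Airflow's built-in Jinja templating: each Task Instance is rendered with the context of the DAG Run it belongs to, including run_id and the logical date ds. A minimal sketch (the DAG and task IDs are illustrative):

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="inspect_runs",  # hypothetical DAG id
    schedule="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    # run_id identifies the DAG Run; ds is the logical date of that run.
    # Each scheduled execution produces a new Task Instance with new values.
    show_context = BashOperator(
        task_id="show_context",
        bash_command="echo 'run_id={{ run_id }} logical_date={{ ds }}'",
    )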