Using the TaskFlow API

The TaskFlow API is a modern, Python-native way to author complex workflows in Apache Airflow. It allows you to define tasks as Python functions and lets Airflow handle the dependencies and data passing automatically.

What is the TaskFlow API?

Traditionally, Airflow DAGs were built by instantiating operators. The TaskFlow API streamlines this by enabling you to decorate Python functions, turning them into Airflow tasks. This makes your DAGs more readable, concise, and Pythonic.
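
For contrast, below is a minimal sketch of the traditional operator style (the DAG id and callable are illustrative). Passing data between tasks in this style requires explicit XCom pushes and pulls, which the TaskFlow API handles for you.

import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_data():
    return {"key": "value", "count": 42}


with DAG(
    dag_id="classic_style_example",
    schedule="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    # Each step is an explicitly instantiated operator.
    extract = PythonOperator(
        task_id="extract_data",
        python_callable=extract_data,
    )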

Key Concepts

- The @task decorator turns a plain Python function into an Airflow task.
- Automatic data passing: a task's return value is stored as an XCom and handed to downstream tasks that receive it as an argument.
- Inferred dependencies: calling one decorated function with the output of another establishes the dependency between them, with no explicit >> or set_downstream needed.

Getting Started with the @task Decorator

The @task decorator is the core of the TaskFlow API. It transforms a Python function into an Airflow task.


from __future__ import annotations

import pendulum

from airflow.decorators import dag, task

@dag(
    schedule="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["taskflow", "example"],
)
def taskflow_example_dag():
    @task
    def extract_data(**context):
        """
        Simulates extracting data from a source.
        """
        print("Extracting data...")
        data = {"key": "value", "count": 42}
        return data

    @task
    def transform_data(data: dict, **context) -> dict:
        """
        Simulates transforming the extracted data.
        """
        print(f"Transforming data: {data}")
        transformed = data.copy()
        transformed["processed"] = True
        return transformed

    @task
    def load_data(transformed_data: dict, **context):
        """
        Simulates loading the transformed data.
        """
        print(f"Loading data: {transformed_data}")

    extracted = extract_data()
    transformed = transform_data(extracted)
    load_data(transformed)

taskflow_example_dag()

Understanding Dependencies

In the example above:

- extract_data runs first and returns a dictionary, which Airflow stores as an XCom.
- transform_data receives that dictionary as its data argument, so Airflow infers that it must run after extract_data.
- load_data receives the output of transform_data, completing the extract, transform, load chain.

No explicit dependency operators (>>, <<) or set_upstream/set_downstream calls are required; the dependencies follow directly from how the task outputs are passed around.

Passing Arguments and Context

You can pass standard Python arguments to your decorated functions just as you would to any function. Airflow also makes the task context available at runtime: declare a catch-all **kwargs parameter (named **context in the example above) or individual context variables such as ds or ti in the function signature, and Airflow fills them in with information like the logical date and the task instance.
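
A minimal sketch of a task that reads context values (the task name is illustrative, and the task is assumed to be defined inside a @dag-decorated function like the one above):

from airflow.decorators import task


@task
def report_context(ds=None, ti=None, **context):
    # ds (the logical date as YYYY-MM-DD) and ti (the task instance) are
    # filled in by Airflow at runtime; remaining context keys land in **context.
    print(f"Logical date: {ds}")
    print(f"Task: {ti.task_id}")
    print(f"Run id: {context['run_id']}")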

Type Hinting for Clarity

Using Python's type hints makes it explicit what data each task expects and returns, which improves readability and helps catch mistakes early. Airflow also inspects the return-type annotation in one case: a dict-typed annotation causes the task's output to be treated as multiple outputs, with each key stored as its own XCom (see the sketch below).
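
A small sketch of that inference, assuming an Airflow version that derives multiple_outputs from a Dict-typed annotation (the task and key names are illustrative):

from typing import Dict

from airflow.decorators import task


@task
def get_coordinates() -> Dict[str, float]:
    # The Dict-typed return annotation makes Airflow treat this as multiple
    # outputs: "lat" and "lon" are stored as separate XCom entries.
    return {"lat": 52.52, "lon": 13.405}


@task
def print_latitude(lat: float):
    print(f"Latitude: {lat}")


# Inside a @dag-decorated function, the keys can then be consumed individually:
# coords = get_coordinates()
# print_latitude(coords["lat"])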

Note: The TaskFlow API is generally recommended for new DAGs due to its ease of use and readability. For existing DAGs, you can gradually migrate parts of your workflow to use the TaskFlow API.

Advanced Usage

@task.branch for Branching Logic

The @task.branch decorator lets you create tasks that decide which path of the DAG runs next. The decorated function should return the task_id of the task to execute (or a list of task_ids); directly downstream tasks that are not selected are skipped.
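
A minimal sketch of a branching task (the task ids and threshold are illustrative):

from airflow.decorators import task


@task.branch
def choose_branch(count: int) -> str:
    # Return the task_id of the branch that should run; the other
    # directly downstream task is skipped.
    if count > 10:
        return "process_large_batch"
    return "process_small_batch"


@task(task_id="process_large_batch")
def process_large_batch():
    print("Handling a large batch")


@task(task_id="process_small_batch")
def process_small_batch():
    print("Handling a small batch")


# Inside a @dag-decorated function:
# branch = choose_branch(count=42)
# branch >> [process_large_batch(), process_small_batch()]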

@task.external_python for Isolated Python Environments

Use @task.external_python when you need to run a task with a different, pre-installed Python interpreter, such as a dedicated virtualenv. The function executes in that separate environment, which isolates it from the main Airflow installation and avoids dependency conflicts. The decorator takes the path to the Python binary of the target environment.
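
A minimal sketch, assuming a virtualenv already exists at the interpreter path shown (the path and task name are illustrative):

from airflow.decorators import task


# The interpreter path is an assumption: point it at the Python binary of a
# virtualenv that already has the task's dependencies installed.
@task.external_python(python="/opt/venvs/analytics/bin/python")
def run_in_external_env():
    # This body executes under the external interpreter, so anything imported
    # here must be installed in that environment, not in Airflow's own.
    import sys

    print(f"Running under: {sys.executable}")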

@task.python (Explicit Form)

@task.python is the explicit spelling of the default @task decorator; the two are equivalent and can be used interchangeably.
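
A small sketch of the equivalence (task names are illustrative); either form also accepts operator-level arguments such as retries:

from airflow.decorators import task


@task
def hello_short():
    print("hello")


# The explicit form, with an operator-level argument passed to the decorator.
@task.python(retries=2)
def hello_explicit():
    print("hello")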

Benefits of the TaskFlow API

- Less boilerplate: no need to instantiate operators or wire up XCom pushes and pulls by hand.
- Automatic data passing: return values move between tasks via XCom without explicit code.
- Inferred dependencies: the task graph follows from ordinary function calls.
- More Pythonic, readable DAG files that are easier to test and review.

Tip: Experiment with the TaskFlow API on a small test DAG to get a feel for its capabilities. The automatic data passing and dependency inference are powerful features that can significantly speed up your development.

Conclusion

The TaskFlow API represents a significant evolution in Airflow DAG authoring. By embracing its Python-native approach, you can write more maintainable, readable, and efficient data pipelines.