Apache Airflow Documentation

Stable Release - How-To Guides

Using the PythonOperator

The PythonOperator is one of the most fundamental and flexible operators in Apache Airflow. It allows you to execute arbitrary Python code as a task within your DAG.

Basic Usage

To use the PythonOperator, you need to provide a Python function that will be executed when the task runs. This function can accept arguments, and these arguments can be passed from the DAG definition.


from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def my_python_function(ti=None, **context):
    print("Hello from my Python function!")
    print(f"Logical date: {context['ds']}")
    # You can push information to XComs
    ti.xcom_push(key='return_value', value="This is a result")

with DAG(
    dag_id="python_operator_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python"],
) as dag:
    run_this = PythonOperator(
        task_id="run_my_python_function",
        python_callable=my_python_function,
    )

Passing Arguments to the Python Function

You can pass arguments to your Python callable using the op_args and op_kwargs parameters of the PythonOperator.

  • op_args: A list of positional arguments to pass to the callable.
  • op_kwargs: A dictionary of keyword arguments to pass to the callable.


def greet(name, greeting="Hello"):
    print(f"{greeting}, {name}!")

with DAG(
    dag_id="python_operator_args_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python", "args"],
) as dag:
    greet_task = PythonOperator(
        task_id="greet_person",
        python_callable=greet,
        op_args=["Alice"],
        op_kwargs={"greeting": "Hi"},
    )

Accessing Airflow Context

Airflow makes a context dictionary available to your Python callable; it contains useful information about the task instance, the DAG run, the logical date, and more. You can receive the full context by adding a **context (or **kwargs) parameter to your function signature, or request individual values by declaring parameters whose names match context keys, such as ti (the task instance). Both styles are shown in the example after the list below.

Common keys in the context dictionary:

  • dag: The DAG object.
  • dag_run: The DAG Run object.
  • ts: The logical date/time of the DAG run as an ISO 8601 formatted string.
  • ds: The logical date of the DAG run (YYYY-MM-DD string).
  • ti: The Task Instance object.
  • logical_date: The logical date/time of the DAG run as a datetime object (execution_date is a deprecated alias).
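
The sketch below shows both styles side by side; the dag_id, tags, and function name are illustrative, and the imports from the first example are assumed.

def print_run_details(ds=None, ti=None, **context):
    # Parameters whose names match context keys (here ds and ti) are injected directly
    print(f"Logical date: {ds}")
    print(f"Task id: {ti.task_id}")
    # Everything else remains available through the **context dictionary
    print(f"DAG run id: {context['dag_run'].run_id}")

with DAG(
    dag_id="python_operator_context_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python", "context"],
) as dag:
    show_context = PythonOperator(
        task_id="print_run_details",
        python_callable=print_run_details,
    )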

Using XComs

The PythonOperator can push data to XComs (cross-communication) using the ti.xcom_push() method. In addition, the return value of your callable is pushed to XCom automatically under the key return_value (unless you set do_xcom_push=False on the operator). This allows downstream tasks to retrieve the results of your Python function.

To pull data from XComs in another task, use the ti.xcom_pull() method inside your callable, or the Jinja templating system inside templated operator fields; a sketch of the templating approach follows the example below.


def process_data(ti=None):
    # Retrieve data pushed from a previous task
    data = ti.xcom_pull(task_ids="previous_task")
    processed_result = data.upper()
    return processed_result

with DAG(
    dag_id="python_operator_xcom_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python", "xcom"],
) as dag:
    # Assume 'previous_task' is a PythonOperator that pushes data
    # previous_task = PythonOperator(
    #     task_id="previous_task",
    #     python_callable=lambda: "sample data",  # the return value is pushed to XCom
    # )

    process_task = PythonOperator(
        task_id="process_my_data",
        python_callable=process_data,
    )

    # previous_task >> process_task
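
As a sketch of the Jinja templating approach mentioned above, the standalone example below pulls an upstream task's return value inside a templated field; the dag_id, task ids, and bash command are illustrative, and the imports from the earlier examples plus the BashOperator import are assumed.

from airflow.operators.bash import BashOperator

with DAG(
    dag_id="python_operator_template_pull_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python", "xcom"],
) as dag:
    push_task = PythonOperator(
        task_id="push_value",
        # The return value is pushed to XCom under the key 'return_value'
        python_callable=lambda: "sample data",
    )

    echo_task = BashOperator(
        task_id="echo_pulled_value",
        # ti.xcom_pull() is available inside Jinja templates via the 'ti' object
        bash_command="echo '{{ ti.xcom_pull(task_ids='push_value') }}'",
    )

    push_task >> echo_task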

Common Pitfalls and Best Practices

  • Idempotency: Ensure your Python functions are idempotent, meaning they can be run multiple times without changing the result beyond the initial application. This is crucial for Airflow's retry mechanisms.
  • Serialization: If pushing complex objects to XComs, consider how they will be serialized and deserialized. JSON-serializable objects are generally easier to handle.
  • Modularity: Keep your Python functions concise and focused on a single purpose. For more complex logic, consider breaking it down into multiple smaller functions or tasks.
  • Error Handling: Implement robust error handling within your Python functions to gracefully manage failures.
  • Logging: Use Python's standard logging module within your functions for better traceability. Airflow captures stdout and stderr, but explicit logging is more structured; see the sketch after this list.

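For the Logging point above, here is a minimal sketch; the logger setup mirrors standard Python practice, and the function body is a placeholder.

import logging

# Module-level logger; records emitted here are captured in the Airflow task log
logger = logging.getLogger(__name__)

def transform_records(**context):
    logger.info("Starting transformation for logical date %s", context["ds"])
    records = ["a", "b", "c"]  # placeholder for real input
    logger.info("Processed %d records", len(records))
    return len(records)
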
Note

The PythonOperator executes your code within the Airflow worker environment. Ensure all necessary libraries are installed in that environment.

Tip

If your callable needs Python packages that are not installed (or are pinned at different versions) in the worker environment, consider the PythonVirtualenvOperator, which runs your function in an isolated virtual environment created for the task.
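
A minimal sketch of that approach follows; the task_id, requirements list, and callable are illustrative, and the task would be defined inside a with DAG block as in the earlier examples.

from airflow.operators.python import PythonVirtualenvOperator

def check_example_status():
    # Imports for venv-only packages must live inside the callable,
    # because the function runs in a freshly created virtual environment
    import requests

    return requests.get("https://example.com", timeout=10).status_code

isolated_task = PythonVirtualenvOperator(
    task_id="run_in_virtualenv",
    python_callable=check_example_status,
    requirements=["requests"],
    system_site_packages=False,
)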