The PythonOperator

The PythonOperator is a fundamental operator in Apache Airflow that allows you to execute arbitrary Python code as a task within your DAGs. It's incredibly versatile and commonly used for custom logic, data manipulation, and integrating with various Python libraries.

Basic Usage

To use the PythonOperator, you need to provide a Python function that will be executed when the task runs. This function can accept arguments passed through the operator.

Example DAG


from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def greet_world():
    print("Hello, World from Airflow!")

def greet_person(name: str):
    print(f"Hello, {name} from Airflow!")

with DAG(
    dag_id="python_operator_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "python"],
) as dag:
    task_greet_world = PythonOperator(
        task_id="greet_world",
        python_callable=greet_world,
    )

    task_greet_person = PythonOperator(
        task_id="greet_person",
        python_callable=greet_person,
        op_kwargs={"name": "Airflow User"},
    )

    task_greet_world >> task_greet_person

Operator Parameters

The PythonOperator accepts several key parameters:

task_id (required): A unique identifier for the task within the DAG.
python_callable (required): The Python function to be executed. This can be a function defined locally or imported from another module.
op_args (default: None): A list of positional arguments to pass to the python_callable.
op_kwargs (default: None): A dictionary of keyword arguments to pass to the python_callable.
templates_dict (default: None): A dictionary whose values are rendered as Jinja templates before execution and made available to the callable through the task context under the "templates_dict" key (see the sketch after this list).
provide_context (deprecated): In Airflow 2 the task context is passed to the callable automatically based on its signature, so this flag is no longer needed; in Airflow 1.10 it defaulted to False.
execution_timeout (default: None): Maximum time allowed for the task to complete.
trigger_rule (default: 'all_success'): The condition under which the task will run.
retries (default: 0): Number of retries for the task.
retry_delay (default: timedelta(minutes=5)): Delay between retries.
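
For example, templates_dict lets you pass Jinja expressions that Airflow renders at run time and exposes through the task context rather than as direct keyword arguments. A minimal sketch, assuming a callable that accepts the context via **kwargs (the file path is purely illustrative):

from airflow.operators.python import PythonOperator

def report_path(**context):
    # templates_dict is rendered before execution and exposed through the
    # task context, not as direct keyword arguments to the callable.
    rendered = context["templates_dict"]
    print(f"Reading input from {rendered['input_path']}")

task_report = PythonOperator(
    task_id="report_path",
    python_callable=report_path,
    templates_dict={"input_path": "/data/{{ ds }}/input.csv"},  # illustrative path
)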

Passing Arguments

You can pass arguments to your Python function using op_args for positional arguments and op_kwargs for keyword arguments.

Example with Arguments


# Define these tasks inside a with DAG(...) block, as in the example above.
def multiply(x, y):
    return x * y

task_multiply = PythonOperator(
    task_id="multiply_numbers",
    python_callable=multiply,
    op_args=[5, 7],  # Corresponds to x and y
)

def add_with_names(a, b, message="Result:"):
    print(f"{message} {a + b}")

task_add = PythonOperator(
    task_id="add_numbers_with_message",
    python_callable=add_with_names,
    op_kwargs={"a": 10, "b": 20, "message": "The sum is:"},
)

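One behavior worth noting: by default the return value of the python_callable is pushed to XCom, so downstream tasks can consume it. The snippet below is a minimal sketch of pulling the result of the multiply task through a templated op_kwargs value; it reuses task_multiply from the example above.

def print_product(product):
    print(f"The product is {product}")

task_print = PythonOperator(
    task_id="print_product",
    python_callable=print_product,
    op_kwargs={
        # Pulls the value returned by multiply_numbers from XCom. It is
        # rendered as a string by default; set render_template_as_native_obj=True
        # on the DAG to keep native Python types.
        "product": "{{ ti.xcom_pull(task_ids='multiply_numbers') }}",
    },
)

task_multiply >> task_print
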
Providing Context

With Airflow 2 you no longer need provide_context=True: the task context is passed to your Python callable automatically, based on the callable's signature. If the callable accepts **kwargs (as in the example below), it receives the full context, which includes variables such as:

- ds and ds_nodash: the run's logical date as a string (YYYY-MM-DD and YYYYMMDD).
- logical_date (called execution_date in older releases): the logical date as a datetime object.
- dag, task, and ti: the DAG, task, and task instance objects.
- run_id and params: the identifier of the current DAG run and any user-supplied parameters.

Example with Context


from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
import pendulum

def log_execution_date(**context):
    execution_date = context["ds"]
    print(f"This task is running for the execution date: {execution_date}")

with DAG(
    dag_id="context_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
) as dag:
    task_log_date = PythonOperator(
        task_id="log_date",
        python_callable=log_execution_date,
        # No provide_context needed in Airflow 2: the context is injected
        # automatically because the callable accepts **context.
    )

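You also do not have to accept the entire context with **kwargs: the operator inspects the callable's signature and passes only the context variables it names. A minimal sketch of that style (the task and function names are just illustrative):

def log_run_details(ds, run_id):
    # Only the context keys named in the signature are injected;
    # no provide_context flag and no **kwargs are required.
    print(f"Logical date: {ds}, run id: {run_id}")

task_log_details = PythonOperator(
    task_id="log_run_details",
    python_callable=log_run_details,
)
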
Templating

The PythonOperator supports Jinja templating for arguments passed via op_args, op_kwargs, and templates_dict. This allows you to set parameters dynamically based on the DAG run's context.

Example with Templating


from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
import pendulum

def process_data(file_prefix: str, execution_date_str: str):
    print(f"Processing files starting with '{file_prefix}' for date {execution_date_str}")

with DAG(
    dag_id="templated_python_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
) as dag:
    task_process = PythonOperator(
        task_id="process_files",
        python_callable=process_data,
        op_kwargs={
            "file_prefix": "data_",
            "execution_date_str": "{{ ds }}"  # Jinja template for execution date
        },
    )

Customizing the Python Environment

When the PythonOperator runs, it executes within the Python environment of the Airflow worker, so your callable can only use packages installed there. For tasks that need their own dependencies or stronger isolation, consider:

- PythonVirtualenvOperator, which runs the callable in a temporary virtual environment with its own requirements (a minimal sketch follows this list).
- ExternalPythonOperator (Airflow 2.4+), which runs the callable with a pre-existing, separately managed Python interpreter.
- Container-based operators such as DockerOperator or KubernetesPodOperator, which isolate the task in its own image.
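
A minimal sketch of the virtualenv approach, assuming the task only needs pandas (the pinned version is an illustrative placeholder, not a recommendation):

from airflow.operators.python import PythonVirtualenvOperator

def summarize_values():
    # Imports live inside the callable: it is serialized and executed in a
    # freshly created virtualenv, not in the worker's own interpreter.
    import pandas as pd

    df = pd.DataFrame({"value": [1, 2, 3]})
    print(df["value"].sum())

task_summarize = PythonVirtualenvOperator(
    task_id="summarize_values",
    python_callable=summarize_values,
    requirements=["pandas==2.1.4"],  # illustrative pin
    system_site_packages=False,      # start from a clean environment
)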

Important: Keep your Python callables lean and focused. For complex data processing, consider offloading the heavy lifting to external services or specialized operators.

Best Practices

A few guidelines help keep PythonOperator tasks maintainable:

- Keep callables idempotent, so that a retried or re-run task produces the same result.
- Return only small, serializable values; whatever the callable returns is stored in XCom, which is not designed for large payloads.
- Keep module-level code in your DAG files cheap; heavy imports and computation belong inside the callable, because DAG files are parsed frequently by the scheduler.
- On Airflow 2, consider the TaskFlow API (the @task decorator) for Python-heavy pipelines; a minimal sketch appears at the end of this section.

The PythonOperator is a powerful tool for extending Airflow's capabilities. By understanding its parameters and these practices, you can effectively integrate custom Python logic into your data pipelines.
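
As a closing illustration, here is a sketch of the first greeting example rewritten in TaskFlow style; the DAG id and function names are placeholders.

from __future__ import annotations

import pendulum

from airflow.decorators import dag, task

@dag(
    dag_id="taskflow_greeting_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
)
def greeting_pipeline():
    @task
    def greet(name: str) -> str:
        # Each @task-decorated function runs as a Python task; return values
        # are passed between tasks through XCom automatically.
        message = f"Hello, {name} from Airflow!"
        print(message)
        return message

    greet("Airflow User")

greeting_pipeline()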