The PythonOperator is one of the most fundamental and flexible operators in Apache Airflow. It allows you to execute arbitrary Python code as a task within your DAG.
To use the PythonOperator, you need to provide a Python function that will be executed when the task runs. This function can accept arguments, and these arguments can be passed from the DAG definition.
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def my_python_function(ti=None, **context):
    print("Hello from my Python function!")
    print(f"Logical date (ds): {context['ds']}")
    # You can push information to XComs
    ti.xcom_push(key="return_value", value="This is a result")


with DAG(
    dag_id="python_operator_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python"],
) as dag:
    run_this = PythonOperator(
        task_id="run_my_python_function",
        python_callable=my_python_function,
    )
You can pass arguments to your Python callable using the op_args and op_kwargs parameters of the PythonOperator.
op_args: A list of positional arguments to pass to the callable.
op_kwargs: A dictionary of keyword arguments to pass to the callable.
def greet(name, greeting="Hello"):
    print(f"{greeting}, {name}!")


with DAG(
    dag_id="python_operator_args_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python", "args"],
) as dag:
    greet_task = PythonOperator(
        task_id="greet_person",
        python_callable=greet,
        op_args=["Alice"],
        op_kwargs={"greeting": "Hi"},
    )
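Under the hood, the operator calls your function roughly as python_callable(*op_args, **op_kwargs) (context values whose names match declared parameters are merged in as well, which this sketch omits). You can therefore preview what a task will do by calling the function the same way yourself:

```python
def greet(name, greeting="Hello"):
    # Same callable as above, but returning the message so it is easy to inspect.
    return f"{greeting}, {name}!"

# Roughly what the PythonOperator does with op_args and op_kwargs:
message = greet(*["Alice"], **{"greeting": "Hi"})
print(message)  # Hi, Alice!
```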
Airflow injects a context dictionary into your Python callable, which contains useful information about the task instance, DAG run, logical date, and more. You can access this context by accepting a catch-all keyword parameter (conventionally **context) in your function signature, or by declaring individual parameters whose names match context keys, such as ti (the task instance).
Common keys in the context dictionary:
dag: The DAG object.
dag_run: The DAG Run object.
ti: The Task Instance object.
ds: The logical date of the DAG run (YYYY-MM-DD string).
ts: The logical date/time of the DAG run (ISO 8601 timestamp string).
logical_date: The logical date/time as a datetime object (execution_date is a deprecated alias for it).
The PythonOperator can push data to XComs (short for "cross-communications") using the ti.xcom_push() method. This allows downstream tasks to retrieve the results of your Python function.
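Because context values are passed as keyword arguments, a callable that declares matching parameter names receives them directly, and you can unit-test it by calling it with stand-in values. The date strings below are illustrative placeholders, not real scheduler output:

```python
def report_run(ds=None, ts=None, **context):
    # Airflow injects ds and ts when it runs the task; here we pass stand-ins.
    return f"DAG run for {ds} (timestamp {ts})"

print(report_run(ds="2023-01-01", ts="2023-01-01T00:00:00+00:00"))
```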
To pull data from XComs in another task, use the ti.xcom_pull() method inside the downstream callable, or reference the value in a templated field via Jinja (for example, {{ ti.xcom_pull(task_ids='previous_task') }}).
def process_data(ti=None):
    # Retrieve data pushed by a previous task
    data = ti.xcom_pull(task_ids="previous_task")
    processed_result = data.upper()
    return processed_result


with DAG(
    dag_id="python_operator_xcom_example",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    schedule=None,
    tags=["example", "python", "xcom"],
) as dag:
    # Assume 'previous_task' is a PythonOperator that pushes data:
    # previous_task = PythonOperator(
    #     task_id="previous_task",
    #     python_callable=lambda ti: ti.xcom_push(key="return_value", value="sample data"),
    # )
    process_task = PythonOperator(
        task_id="process_my_data",
        python_callable=process_data,
    )
    # previous_task >> process_task
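The push/pull round trip above can be exercised without a running scheduler by handing the callable a minimal stand-in for the task instance. FakeTI below is our own test helper, not an Airflow class, and it simplifies the real API (it ignores task_ids and uses a single shared store):

```python
class FakeTI:
    """Minimal stand-in for the task instance's XCom API (hypothetical helper)."""

    def __init__(self):
        self._xcoms = {}

    def xcom_push(self, key, value):
        self._xcoms[key] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Simplified: ignores task_ids and reads from one shared store.
        return self._xcoms.get(key)


def process_data(ti=None):
    data = ti.xcom_pull(task_ids="previous_task")
    return data.upper()


ti = FakeTI()
ti.xcom_push(key="return_value", value="sample data")
print(process_data(ti=ti))  # SAMPLE DATA
```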
The PythonOperator executes your code within the Airflow worker environment. Ensure all necessary libraries are installed in that environment.
If your callable needs dependencies that are missing from, or conflict with, the worker environment, consider the PythonVirtualenvOperator, which runs the callable in a separate, isolated Python environment built for the task.
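A practical consequence of that isolation: the function handed to PythonVirtualenvOperator runs in a freshly created interpreter rather than the worker's, so it must be self-contained, with its imports inside the function body and no references to module-level state. A sketch of such a callable (the operator wiring itself is omitted; dependency pinning would go in the operator's requirements parameter):

```python
def summarize():
    # Imports live inside the function: with PythonVirtualenvOperator the
    # function body runs in a separate virtualenv, not the worker's interpreter.
    import json

    return json.dumps({"status": "ok"})


print(summarize())  # {"status": "ok"}
```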