Operators

Operators are the core building blocks of Airflow. Each operator represents a single unit of work in a DAG. Airflow provides a rich set of built-in operators, and you can also create your own custom operators.

What is an Operator?

An operator defines a task. When a DAG runs, Airflow instantiates each operator as a task, and tasks are what actually get executed. Concretely, an operator is a Python class that inherits from airflow.models.BaseOperator: it accepts configuration arguments and implements an execute method that performs the actual work.
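To make the pattern concrete, here is a minimal, framework-free sketch of the idea (this is not Airflow's actual implementation; the class and method bodies are simplified for illustration):

```python
# Simplified illustration of the operator pattern. Airflow's real
# BaseOperator also handles scheduling, retries, logging, and more.
class BaseOperator:
    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        # Subclasses implement the actual unit of work here.
        raise NotImplementedError

class EchoOperator(BaseOperator):
    """A toy operator whose unit of work is producing a message."""
    def __init__(self, task_id, message):
        super().__init__(task_id)
        self.message = message

    def execute(self, context):
        return f"{self.task_id}: {self.message}"

# Instantiating the operator creates a "task"; running it calls execute().
task = EchoOperator(task_id='greet', message='Hello')
print(task.execute(context={}))  # greet: Hello
```

In real Airflow code you never call execute yourself; the scheduler and executor do that when the task instance runs.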


Common Built-in Operators

Airflow includes a wide array of operators for various purposes. Here are a few examples:

BashOperator

Executes a bash command.


from airflow.operators.bash import BashOperator

task = BashOperator(
    task_id='run_hello_world',
    bash_command='echo "Hello, World!"',
)

PythonOperator

Calls an arbitrary Python function.


from airflow.operators.python import PythonOperator

def my_python_function():
    print("This is a Python function.")

task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_python_function,
)

EmailOperator

Sends an email.


from airflow.operators.email import EmailOperator

task = EmailOperator(
    task_id='send_email_notification',
    to='recipient@example.com',
    subject='Airflow Notification',
    html_content='<p>Task completed successfully!</p>',
)

Database Operators

Airflow integrates with numerous databases through provider packages. Examples include the generic SQLExecuteQueryOperator from the common SQL provider, as well as database-specific operators such as PostgresOperator and MySqlOperator.

These operators typically require a configured Airflow Connection to the target database.
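As a sketch of how a SQL operator references a Connection (this assumes the common SQL provider is installed; the connection ID and query here are placeholders):

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

count_orders = SQLExecuteQueryOperator(
    task_id='count_orders',
    conn_id='my_postgres',  # references an Airflow Connection, not raw credentials
    sql='SELECT COUNT(*) FROM orders;',
)
```

The operator never embeds credentials; it looks up host, login, and password from the Connection named by conn_id at runtime.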

Creating Custom Operators

You can extend Airflow by creating your own operators to encapsulate specific logic or integrations with proprietary systems. To create a custom operator, you need to:

  1. Create a Python file in your Airflow plugins directory (e.g., plugins/my_operators.py).
  2. Define a class that inherits from airflow.models.BaseOperator.
  3. Implement the execute(self, context) method, which contains the core logic of your operator.

# plugins/my_operators.py
from airflow.models.baseoperator import BaseOperator

class MyCustomOperator(BaseOperator):
    """
    A simple custom operator example.
    """
    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        self.log.info("Executing MyCustomOperator with param: %s", self.my_param)
        # Your custom logic here
        return "Operation completed"

# In your DAG file:
# from my_operators import MyCustomOperator
#
# custom_task = MyCustomOperator(
#     task_id='my_custom_task',
#     my_param='some_value',
# )
Tip: Always use hooks for interacting with external systems within your operators to manage connections and configurations cleanly.
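For instance, a hypothetical operator might delegate all connection handling to PostgresHook (the operator name, table parameter, and connection ID below are illustrative; this assumes the Postgres provider is installed):

```python
# Illustrative sketch; class and parameter names are hypothetical.
from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

class RowCountOperator(BaseOperator):
    """Counts rows in a table, letting the hook manage the connection."""

    def __init__(self, table, conn_id='postgres_default', **kwargs):
        super().__init__(**kwargs)
        self.table = table
        self.conn_id = conn_id

    def execute(self, context):
        # The hook resolves credentials from the Airflow Connection store,
        # so the operator contains no host names or passwords.
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        return hook.get_first(f'SELECT COUNT(*) FROM {self.table}')[0]
```

Because the hook owns the connection lifecycle, the same operator works against any database the Connection points at, and credentials stay out of DAG code.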

Operator Parameters and Templating

Many operator parameters can be dynamically rendered using Jinja templating. This allows you to pass information such as the DAG run's logical date (formerly called the execution date) or custom variables into your operator commands.


from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='templated_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
) as dag:
    task = BashOperator(
        task_id='templated_command',
        # {{ ds }} renders as the logical date in YYYY-MM-DD format
        bash_command='echo "Execution date: {{ ds }}"',
    )
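Conceptually, the rendering step is plain Jinja substitution over the operator's templated fields. A simplified stand-alone sketch, using the jinja2 library directly outside of Airflow (jinja2 must be installed; the context dict here is a hand-built stand-in for the one Airflow supplies per run):

```python
from jinja2 import Template

# Airflow builds a template context for each run; 'ds' is the logical date.
context = {'ds': '2023-01-01'}
rendered = Template('echo "Execution date: {{ ds }}"').render(**context)
print(rendered)  # echo "Execution date: 2023-01-01"
```

Airflow performs this substitution just before task execution, which is why templated values reflect the specific DAG run rather than the time the DAG file was parsed.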

Further Reading