Operators
Operators are the core building blocks of Airflow. Each operator represents a single unit of work in a DAG. Airflow provides a rich set of built-in operators, and you can also create your own custom operators.
What is an Operator?
An operator defines a single task. When a DAG runs, Airflow instantiates each operator as a Task, and Tasks are what actually get executed. An operator is a Python class that inherits from airflow.models.BaseOperator; it accepts configuration arguments and implements an execute method that does the actual work.
Key Concepts
- BaseOperator: The abstract base class for all operators.
- BaseHook: Operators often interact with external systems via Hooks.
- Parameters: Operators can be configured with various parameters (e.g., bash_command for BashOperator, sql for PostgresOperator).
- Templating: Many operator parameters support Jinja templating, allowing for dynamic execution based on DAG run context.
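To make the Hook concept concrete, here is a minimal sketch of a task that queries a database through a Hook inside a Python callable. It assumes the Postgres provider package is installed; the connection ID my_postgres and the table my_table are hypothetical.

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_row_count():
    # The hook resolves credentials from the Airflow Connection with this ID
    hook = PostgresHook(postgres_conn_id='my_postgres')  # hypothetical connection ID
    rows = hook.get_records('SELECT COUNT(*) FROM my_table')  # hypothetical table
    print(f"Row count: {rows[0][0]}")

count_task = PythonOperator(
    task_id='fetch_row_count',
    python_callable=fetch_row_count,
)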
Common Built-in Operators
Airflow includes a wide array of operators for various purposes. Here are a few examples:
BashOperator
Executes a bash command.
from airflow.operators.bash import BashOperator
task = BashOperator(
    task_id='run_hello_world',
    bash_command='echo "Hello, World!"',
)
PythonOperator
Calls an arbitrary Python function.
from airflow.operators.python import PythonOperator
def my_python_function():
    print("This is a Python function.")

task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_python_function,
)
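The callable can also receive arguments through op_args and op_kwargs; a minimal sketch (the task and function names are illustrative):

from airflow.operators.python import PythonOperator

def greet(name):
    print(f"Hello, {name}!")

greet_task = PythonOperator(
    task_id='greet_airflow',
    python_callable=greet,
    op_kwargs={'name': 'Airflow'},  # passed as greet(name='Airflow') at run time
)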
EmailOperator
Sends an email.
from airflow.operators.email import EmailOperator
task = EmailOperator(
    task_id='send_email_notification',
    to='recipient@example.com',
    subject='Airflow Notification',
    html_content='Task completed successfully!',
)
Database Operators
Airflow integrates with numerous databases. Examples include:
- PostgresOperator
- MySqlOperator
- SqliteOperator
- BigQueryOperator
- SnowflakeOperator
These operators typically require a configured Airflow Connection to the database.
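For instance, a PostgresOperator task might look like the sketch below, assuming the Postgres provider package is installed and a connection with the hypothetical ID my_postgres has been configured:

from airflow.providers.postgres.operators.postgres import PostgresOperator

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id='my_postgres',  # hypothetical connection ID configured in Airflow
    sql='CREATE TABLE IF NOT EXISTS my_table (id SERIAL PRIMARY KEY)',
)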
Creating Custom Operators
You can extend Airflow by creating your own operators to encapsulate specific logic or integrations with proprietary systems. To create a custom operator, you need to:
- Create a Python file in your Airflow plugins directory (e.g., plugins/my_operators.py).
- Define a class that inherits from airflow.models.BaseOperator.
- Implement the execute(self, context) method, which contains the core logic of your operator.
# plugins/my_operators.py
from airflow.models.baseoperator import BaseOperator

class MyCustomOperator(BaseOperator):
    """
    A simple custom operator example.
    """

    def __init__(self, my_param, **kwargs):
        super().__init__(**kwargs)
        self.my_param = my_param

    def execute(self, context):
        self.log.info(f"Executing MyCustomOperator with param: {self.my_param}")
        # Your custom logic here
        return "Operation completed"
# In your DAG file:
# from my_operators import MyCustomOperator
#
# custom_task = MyCustomOperator(
#     task_id='my_custom_task',
#     my_param='some_value',
# )
Operator Parameters and Templating
Many operator parameters can be dynamically rendered using Jinja templating. This lets you pass information such as the DAG run's logical date (formerly called the execution date) or custom variables into your operator commands.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='templated_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
) as dag:
    task = BashOperator(
        task_id='templated_command',
        bash_command='echo "Execution date: {{ ds }}"',  # {{ ds }} is the logical date in YYYY-MM-DD format
    )
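Templating also covers custom values supplied through the params argument; a minimal sketch, assuming it sits inside a DAG definition like the one above:

from airflow.operators.bash import BashOperator

task = BashOperator(
    task_id='templated_params',
    bash_command='echo "Hello, {{ params.name }}"',  # rendered from the params dict below
    params={'name': 'Airflow'},
)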