SQL Operator
The SQLExecuteQueryOperator (from the apache-airflow-providers-common-sql package) lets you execute SQL statements against a database. It is useful for performing data transformations, loading data, or running ad-hoc queries as part of your data pipelines.
Prerequisites
To use the SQLExecuteQueryOperator, you need the apache-airflow-providers-common-sql package plus the provider package for your specific database. For example, for PostgreSQL you would install apache-airflow-providers-postgres (which pulls in the common SQL provider as a dependency). You also need a configured Airflow connection for your database.
Basic Usage
The SQLExecuteQueryOperator takes several key arguments:

- task_id: A unique identifier for the task.
- conn_id: The Airflow connection ID for your database.
- sql: The SQL statement, or a list of SQL statements, to execute.
- database (optional): The specific database to connect to, if not defined in the connection.
- hook_params (optional): Extra keyword arguments (such as a schema) passed to the underlying database hook.
- autocommit (optional, default: False): Whether to commit each statement automatically.
- parameters (optional): The values to bind into the SQL statement: a dictionary for named placeholders, or a list/tuple for positional ones.
Example DAG
```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="sql_operator_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "sql"],
) as dag:
    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        conn_id="my_database_conn",  # Ensure you have a connection named 'my_database_conn'
        sql="""
            CREATE TABLE IF NOT EXISTS my_table (
                id INT PRIMARY KEY,
                name VARCHAR(255)
            );
        """,
    )

    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        conn_id="my_database_conn",
        sql="INSERT INTO my_table (id, name) VALUES (%(id)s, %(name)s);",
        parameters={"id": 1, "name": "Example Data"},
    )

    # Example with multiple statements
    cleanup_data = SQLExecuteQueryOperator(
        task_id="cleanup_data",
        conn_id="my_database_conn",
        sql=[
            "DELETE FROM my_table WHERE id = 1;",
            "DROP TABLE IF EXISTS my_table;",
        ],
        autocommit=True,  # Often useful for DDL statements
    )

    create_table >> insert_data >> cleanup_data
```
Executing Multiple SQL Statements
You can provide a list of SQL statements to the sql parameter. Airflow will execute them sequentially.
If autocommit is False (the default), all statements will be executed within a single transaction. If any statement fails, the entire transaction will be rolled back.
If autocommit is True, each statement will be committed individually. This is often preferred for Data Definition Language (DDL) statements like CREATE TABLE or DROP TABLE.
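For example, the following sketch (using the hypothetical my_database_conn connection and my_table table from the example above, inside a DAG such as the one shown earlier) runs two statements as a single atomic unit:

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# A minimal sketch: both INSERTs share one transaction because autocommit is
# left at its default of False. If the second statement fails, the first is
# rolled back as well. 'my_database_conn' and 'my_table' are the example
# names used throughout this page.
load_atomically = SQLExecuteQueryOperator(
    task_id="load_atomically",
    conn_id="my_database_conn",
    sql=[
        "INSERT INTO my_table (id, name) VALUES (2, 'first row');",
        "INSERT INTO my_table (id, name) VALUES (3, 'second row');",
    ],
    # autocommit defaults to False, so both statements commit together or not at all.
)
```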
Using Parameters
The SQLExecuteQueryOperator supports parameterized queries, which help prevent SQL injection and make your queries more dynamic.
The parameters argument is passed through to the database driver. For named placeholders (such as %(name)s or :name), supply a dictionary mapping placeholder names to values; for positional placeholders (such as %s or ?), supply a list or tuple of values in order.
```python
update_record = SQLExecuteQueryOperator(
    task_id="update_record",
    conn_id="my_database_conn",
    sql="UPDATE my_table SET name = %(new_name)s WHERE id = %(record_id)s;",
    parameters={"new_name": "Updated Name", "record_id": 1},
)
```
Tip
The exact syntax for placeholders (e.g., %s, ?, :name) depends on the underlying database driver used by your connection. Check your database's documentation or the Airflow provider documentation for specifics.
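For instance, the sqlite3 driver expects ? placeholders with positional parameters, so the same update against a SQLite connection might look like the following sketch (my_sqlite_conn is a hypothetical connection ID):

```python
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# A sketch of the same update using the qmark ('?') placeholder style that the
# sqlite3 driver expects. 'my_sqlite_conn' is a hypothetical connection ID.
update_record_sqlite = SQLExecuteQueryOperator(
    task_id="update_record_sqlite",
    conn_id="my_sqlite_conn",
    sql="UPDATE my_table SET name = ? WHERE id = ?;",
    parameters=["Updated Name", 1],  # positional values matching the '?' order
)
```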
Returning Query Results
By default, the SQLExecuteQueryOperator pushes the rows returned by a query to XCom via its result handler, which is convenient for small result sets. To check a condition against the database, use the SqlSensor; for larger results or custom processing, run the query yourself in a Python task.
For advanced use cases or specific database interactions, consider using Airflow's Hook system directly within a PythonOperator.
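A minimal sketch of that pattern, using the TaskFlow @task decorator (equivalent to a PythonOperator) and assuming a PostgreSQL connection named my_database_conn and the example my_table from above; define it inside a DAG to use it:

```python
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task
def fetch_names():
    # A sketch using PostgresHook directly; swap in the hook for your own
    # database provider. 'my_database_conn' and 'my_table' are the example
    # names used throughout this page.
    hook = PostgresHook(postgres_conn_id="my_database_conn")
    rows = hook.get_records("SELECT id, name FROM my_table WHERE id = %s;", parameters=(1,))
    for row_id, name in rows:
        print(f"{row_id}: {name}")
    return rows
```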
Common Database Providers
Airflow supports a wide range of databases through provider packages. Some of the most common include:
| Database | Provider Package | Connection URI Scheme |
|---|---|---|
| PostgreSQL | apache-airflow-providers-postgres | postgres:// |
| MySQL | apache-airflow-providers-mysql | mysql:// |
| SQLite | apache-airflow-providers-sqlite | sqlite:// |
| Snowflake | apache-airflow-providers-snowflake | snowflake:// |
| BigQuery | apache-airflow-providers-google | google_cloud_platform:// |
| Redshift | apache-airflow-providers-amazon | aws:// |
Important
Always refer to the official Airflow provider documentation for the most up-to-date information on installation, configuration, and available parameters for specific database integrations.