SQL Operator

The SQLExecuteQueryOperator (from the apache-airflow-providers-common-sql package) lets you execute SQL statements against a database. This is useful for performing data transformations, loading data, or running ad-hoc queries as part of your data pipelines.

Prerequisites

To use the SQLExecuteQueryOperator, you need the appropriate Airflow provider package installed for your database. For example, for PostgreSQL you would install apache-airflow-providers-postgres, which pulls in apache-airflow-providers-common-sql as a dependency. You also need a configured Airflow connection for your database.

Basic Usage

The SQLExecuteQueryOperator takes several key arguments:

conn_id: the Airflow connection to run the statements against.
sql: a single SQL statement, or a list of statements, to execute (templated).
parameters: values to bind to placeholders in the SQL.
autocommit: whether each statement is committed as soon as it runs (defaults to False).

Example DAG


from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    dag_id="sql_operator_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "sql"],
) as dag:
    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        conn_id="my_database_conn",  # Ensure you have a connection named 'my_database_conn'
        sql="""
        CREATE TABLE IF NOT EXISTS my_table (
            id INT PRIMARY KEY,
            name VARCHAR(255)
        );
        """,
    )

    insert_data = SQLExecuteQueryOperator(
        task_id="insert_data",
        conn_id="my_database_conn",
        # named placeholders match the keys in the parameters dict
        sql="INSERT INTO my_table (id, name) VALUES (%(id)s, %(name)s);",
        parameters={"id": 1, "name": "Example Data"},
    )

    # Example with multiple statements
    cleanup_data = SQLExecuteQueryOperator(
        task_id="cleanup_data",
        conn_id="my_database_conn",
        sql=[
            "DELETE FROM my_table WHERE id = 1;",
            "DROP TABLE IF EXISTS my_table;",
        ],
        autocommit=True,  # often useful for DDL statements
    )

    create_table >> insert_data >> cleanup_data

Executing Multiple SQL Statements

You can provide a list of SQL statements to the sql parameter. Airflow will execute them sequentially.

If autocommit is False (the default), all statements will be executed within a single transaction. If any statement fails, the entire transaction will be rolled back.

If autocommit is True, each statement will be committed individually. This is often preferred for Data Definition Language (DDL) statements like CREATE TABLE or DROP TABLE.
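For example, the following sketch runs two statements in a single transaction; the task name, table, and values are illustrative, and it reuses the hypothetical my_database_conn connection from the example above:

    load_staging = SQLExecuteQueryOperator(
        task_id="load_staging",
        conn_id="my_database_conn",
        sql=[
            "DELETE FROM my_table WHERE name = 'Example Data';",
            "INSERT INTO my_table (id, name) VALUES (2, 'Replacement Data');",
        ],
        autocommit=False,  # default: both statements succeed or both are rolled back
    )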

Using Parameters

The SQLExecuteQueryOperator supports parameterized queries to prevent SQL injection and make your queries more dynamic.

The parameters argument can be either a sequence (for positional placeholders such as %s or ?) or a mapping (for named placeholders such as %(name)s or :name), where the values are the data to bind. Which placeholder style is accepted depends on the database driver behind your connection.


    update_record = SQLExecuteQueryOperator(
        task_id="update_record",
        conn_id="my_database_conn",
        sql="UPDATE my_table SET name = %(new_name)s WHERE id = %(record_id)s;",
        parameters={"new_name": "Updated Name", "record_id": 1},
    )

Tip

The exact syntax for placeholders (e.g., %s, ?, :name) depends on the underlying database driver used by your connection. Check your database's documentation or the Airflow provider documentation for specifics.
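For instance, with a driver that expects positional placeholders (such as psycopg2's %s style), you would pass a list or tuple instead of a dictionary. This is only a sketch; the task name and values are illustrative:

    delete_record = SQLExecuteQueryOperator(
        task_id="delete_record",
        conn_id="my_database_conn",
        sql="DELETE FROM my_table WHERE id = %s AND name = %s;",
        parameters=[1, "Example Data"],  # positional values, in placeholder order
    )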

Returning Query Results

The SQLExecuteQueryOperator is intended primarily for executing statements rather than for retrieving data; whether the results of a SELECT are available afterwards depends on its handler and do_xcom_push settings (by default, the rows fetched for the last statement are pushed to XCom). If you only need to check a condition, SqlSensor is usually a better fit; if you need to process result sets in Python, a custom task that executes the query and returns the values works well.

For advanced use cases or specific database interactions, consider using Airflow's Hook system directly within a PythonOperator.
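A minimal sketch of that pattern, assuming the connection points at PostgreSQL so that PostgresHook from apache-airflow-providers-postgres applies (the function name, task name, and query are illustrative):


from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


def fetch_rows():
    # Use the same connection as the SQL tasks, but fetch the rows in Python.
    hook = PostgresHook(postgres_conn_id="my_database_conn")
    return hook.get_records("SELECT id, name FROM my_table;")


fetch_results = PythonOperator(
    task_id="fetch_results",
    python_callable=fetch_rows,
)


Here get_records returns the fetched rows as a list of tuples, and the PythonOperator pushes that return value to XCom like any other task result.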

Common Database Providers

Airflow supports a wide range of databases through provider packages. Some of the most common include:

Database     Provider Package                      Example Connection URI Scheme
PostgreSQL   apache-airflow-providers-postgres     postgres://
MySQL        apache-airflow-providers-mysql        mysql://
SQLite       apache-airflow-providers-sqlite       sqlite://
Snowflake    apache-airflow-providers-snowflake    snowflake://
BigQuery     apache-airflow-providers-google       google_cloud_platform://
Redshift     apache-airflow-providers-amazon       aws://

Important

Always refer to the official Airflow provider documentation for the most up-to-date information on installation, configuration, and available parameters for specific database integrations.