Creating Custom Hooks

Hooks are interfaces to external systems. Airflow provides built-in hooks for many popular services. However, you might need to interact with a system for which no hook is readily available. This guide explains how to create your own custom hook.

What is a Hook?

A hook is a class that abstracts away the details of connecting to an external service. It typically contains methods for performing common operations on that service. For example, a database hook might have methods to execute SQL queries, and an HTTP hook might have methods to send GET or POST requests.

Why Create a Custom Hook?

  • Integration with New Systems: Connect Airflow to services not covered by default hooks.
  • Custom Logic: Encapsulate complex connection or interaction logic for a specific system.
  • Reusability: Make your custom integrations easily shareable and reusable across different DAGs.
  • Maintainability: Centralize external system interactions, simplifying updates and debugging.

Steps to Create a Custom Hook

1. Define Your Hook Class

Custom hooks should inherit from airflow.hooks.base.BaseHook. You'll typically override __init__ to accept a connection ID and any custom parameters, implement get_conn to build the client or session, and add methods for the specific operations your service supports.

Here's a basic structure:


from typing import Any, Optional

import requests
from airflow.hooks.base import BaseHook


class MyCustomHook(BaseHook):
    """
    A custom hook to interact with My Awesome Service.
    """
    conn_name_attr = 'my_custom_conn_id'  # Name of the attribute that stores the connection ID
    default_conn_name = 'my_custom_default'  # Connection ID used when none is specified

    def __init__(self,
                 my_custom_param: str = 'default_value',
                 my_custom_conn_id: str = default_conn_name,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.my_custom_param = my_custom_param
        self.my_custom_conn_id = my_custom_conn_id
        self.connection: Optional[requests.Session] = None  # Cached session, created lazily
        self.base_url: Optional[str] = None  # Filled in by get_conn()

    def get_conn(self) -> requests.Session:
        """
        Returns a session for the external service, creating it on first use.
        """
        if self.connection is None:
            # Retrieve connection details from Airflow connections
            conn = self.get_connection(self.my_custom_conn_id)

            # Build the session. This example uses HTTP basic auth; adapt it
            # if your service expects an API key header or another scheme.
            session = requests.Session()
            session.auth = (conn.login, conn.password)
            self.base_url = (conn.host or '').rstrip('/')
            self.connection = session
            self.log.info("Created session for My Awesome Service using host: %s", conn.host)
        return self.connection

    def my_custom_api_call(self, endpoint: str, method: str = 'GET', **kwargs) -> Any:
        """
        Performs a custom API call to the service and returns the decoded JSON body.
        """
        session = self.get_conn()
        # Join base URL and endpoint without producing a double slash
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        response = session.request(method, url, **kwargs)
        response.raise_for_status()  # Raise an exception for 4xx/5xx status codes
        return response.json()

    def close(self) -> None:
        """
        Closes the session if it's open.
        """
        if self.connection is not None:
            self.log.info("Closing connection to My Awesome Service.")
            self.connection.close()  # Releases the session's pooled connections
            self.connection = None
# Example of using the hook in an operator
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def _call_my_service():
    hook = MyCustomHook(my_custom_param='some_other_value', my_custom_conn_id='my_awesome_service_conn')
    try:
        data = hook.my_custom_api_call('users', params={'active': True})
        print(f"Received data: {data}")
    finally:
        hook.close() # Ensure the connection is closed

with DAG(
    dag_id='custom_hook_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['example', 'custom_hook'],
) as dag:
    call_service_task = PythonOperator(
        task_id='call_my_service_task',
        python_callable=_call_my_service,
    )
                

2. Register Your Hook

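Airflow adds the directory named by the plugins_folder configuration option (by default, the plugins directory under your Airflow home) to the Python import path, so hook modules placed there can be imported from your DAGs. It also scans that directory for plugin classes.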

Create a directory structure like this:


airflow_home/
├── dags/
│   └── my_dag.py
└── plugins/
    └── my_custom_plugin/
        ├── __init__.py
        └── hooks/
            ├── __init__.py
            └── my_custom_hook.py
                

In plugins/my_custom_plugin/__init__.py, you can optionally define a plugin class that lists your hook:


from airflow.plugins_manager import AirflowPlugin
from my_custom_plugin.hooks.my_custom_hook import MyCustomHook

class MyCustomPlugin(AirflowPlugin):
    name = "my_custom_plugin"
    # Listing the hook here is optional in Airflow 2.0+: hooks are imported
    # as regular Python modules rather than through the plugin manager.
    hooks = [MyCustomHook]
    # Other AirflowPlugin attributes keep their defaults when omitted.
                

Ensure your airflow.cfg has:


[core]
plugins_folder = /path/to/your/airflow_home/plugins
                

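In Airflow 2.0 and later, the plugin class is optional for hooks. Once the hook module sits under the plugins folder and plugins_folder is configured correctly, DAG code can import it directly, for example: from my_custom_plugin.hooks.my_custom_hook import MyCustomHook.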
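To see why placing the module under the plugins folder is enough, the sketch below simulates the mechanism with the standard library only. It recreates the directory layout shown above in a temporary folder, puts that folder on sys.path (which is what Airflow does with plugins_folder), and imports the hook module. The class body is a placeholder written for this illustration; a real hook would inherit from BaseHook.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Placeholder module source; a real hook would inherit from BaseHook.
HOOK_SOURCE = '''
class MyCustomHook:
    """Stand-in for the hook defined in step 1."""
    default_conn_name = "my_custom_default"
'''


def build_plugins_folder(root: Path) -> Path:
    """Create plugins/my_custom_plugin/hooks/my_custom_hook.py under root."""
    hooks_dir = root / "plugins" / "my_custom_plugin" / "hooks"
    hooks_dir.mkdir(parents=True)
    (hooks_dir.parent / "__init__.py").write_text("")
    (hooks_dir / "__init__.py").write_text("")
    (hooks_dir / "my_custom_hook.py").write_text(HOOK_SOURCE)
    return root / "plugins"


plugins_folder = build_plugins_folder(Path(tempfile.mkdtemp()))

# Simulate Airflow putting the plugins folder on the import path.
sys.path.insert(0, str(plugins_folder))
importlib.invalidate_caches()

# The same dotted path a DAG file would use in its import statement.
module = importlib.import_module("my_custom_plugin.hooks.my_custom_hook")
hook_cls = module.MyCustomHook
print(hook_cls.__name__, hook_cls.default_conn_name)
```

The import path starts at the first directory inside the plugins folder, which is why the DAG imports my_custom_plugin.hooks.my_custom_hook rather than plugins.my_custom_plugin.hooks.my_custom_hook.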

3. Configure Connections

Create an Airflow Connection that your hook can reference, either in the UI (Admin -> Connections) or via the CLI (airflow connections add). The connection details should match what your hook expects.

  • Conn Id: my_awesome_service_conn (or whatever you set my_custom_conn_id to)
  • Conn Type: Custom (or HTTP if applicable and you adapt your hook)
  • Host: The base URL of your service (e.g., https://api.example.com)
  • Login: Your API username or token
  • Password: Your API password or secret
  • Extra: You can use the 'Extra' field for additional parameters, often as a JSON string.

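Note: Sensitive information like API keys should ideally be stored in the Password field, which Airflow encrypts in its metadata database when a Fernet key is configured, or managed through a secrets backend. The Login field is not encrypted, so avoid putting secrets there.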
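Besides the UI and CLI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID. The sketch below builds such a variable for the connection described above. It uses the JSON format, which assumes Airflow 2.3 or later. The credential values and the timeout entry in extra are placeholders, and connection_env_var_name is a hypothetical helper written for this example.

```python
import json
import os


def connection_env_var_name(conn_id: str) -> str:
    """Return the environment variable name Airflow checks for conn_id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Placeholder values; in practice the secret would come from your
# deployment tooling, not from source code.
connection_fields = {
    "conn_type": "http",
    "host": "https://api.example.com",
    "login": "api_user",
    "password": "api_secret",
    "extra": {"timeout": 30},
}

env_name = connection_env_var_name("my_awesome_service_conn")
os.environ[env_name] = json.dumps(connection_fields)

# Decode it again to check the value is valid JSON with the expected fields.
decoded = json.loads(os.environ[env_name])
print(env_name, decoded["host"])
```

Connections defined this way are resolved at runtime and do not appear in the Connections list in the UI.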

Key Considerations for Custom Hooks

  • Error Handling: Implement robust error handling within your hook methods. Use try...except blocks to catch potential issues with the external service and log them appropriately.
  • Connection Management: Ensure connections are properly opened and closed. The get_conn method should ideally be idempotent (calling it multiple times doesn't create new connections). Implement a close method to release resources.
  • Idempotency: Design your hook methods to be idempotent where possible, especially if they perform state-changing operations. This helps prevent unintended side effects if a task is retried.
  • Logging: Use Airflow's logger (self.log) to provide informative messages about the hook's operations.
  • Security: Be mindful of how you handle credentials. Avoid hardcoding sensitive information directly in the hook code.
  • Dependencies: If your hook relies on external Python libraries (e.g., requests, boto3), ensure these libraries are installed in your Airflow environment.
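The connection-management, error-handling, and logging points above can be sketched without Airflow or a real service. In this minimal, standard-library-only illustration, FakeClient and ServiceHookSketch are hypothetical stand-ins: a real hook would inherit from BaseHook, log through self.log, and wrap an actual client.

```python
import logging
from typing import Optional

log = logging.getLogger("service_hook_sketch")


class FakeClient:
    """Hypothetical stand-in for a real service client."""

    def __init__(self) -> None:
        self.closed = False

    def fetch(self, resource: str) -> dict:
        if resource == "missing":
            raise LookupError(f"no such resource: {resource}")
        return {"resource": resource}

    def close(self) -> None:
        self.closed = True


class ServiceHookSketch:
    def __init__(self) -> None:
        self._client: Optional[FakeClient] = None

    def get_conn(self) -> FakeClient:
        # Idempotent: the client is created once and reused afterwards.
        if self._client is None:
            self._client = FakeClient()
        return self._client

    def fetch(self, resource: str) -> dict:
        try:
            return self.get_conn().fetch(resource)
        except LookupError as err:
            # Log with context, then re-raise so the task fails visibly.
            log.error("Fetching %r failed: %s", resource, err)
            raise RuntimeError(f"could not fetch {resource!r}") from err

    def close(self) -> None:
        if self._client is not None:
            self._client.close()
            self._client = None

    # Context-manager support guarantees close() runs even on errors.
    def __enter__(self) -> "ServiceHookSketch":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


with ServiceHookSketch() as hook:
    first = hook.get_conn()
    second = hook.get_conn()  # Same object as first, no new connection
    result = hook.fetch("users")
```

Using the hook as a context manager is one design choice among several; an explicit try/finally around close(), as in the DAG example earlier, achieves the same cleanup.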

Tip: For HTTP-based services, consider inheriting from airflow.providers.http.hooks.http.HttpHook as it provides a good foundation for making HTTP requests.

Warning: Ensure your custom hook is well-tested before deploying it to production. Incorrectly implemented hooks can lead to data loss or pipeline failures.
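One way to test a hook without touching the real service is to replace its session with a mock. The sketch below uses only unittest.mock. MiniHook is a hypothetical, trimmed-down stand-in for the hook from step 1: it takes a base URL and a session directly instead of reading an Airflow connection, so treat it as an outline of the approach rather than a drop-in test.

```python
from unittest.mock import MagicMock


class MiniHook:
    """Hypothetical stand-in exposing the same call shape as the real hook."""

    def __init__(self, base_url: str, session) -> None:
        self.base_url = base_url.rstrip("/")
        self.session = session

    def my_custom_api_call(self, endpoint: str, method: str = "GET", **kwargs):
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response.json()


def test_api_call_builds_url_and_returns_json() -> None:
    # The mock session records calls and returns a canned JSON payload.
    session = MagicMock()
    session.request.return_value.json.return_value = {"users": []}

    hook = MiniHook("https://api.example.com/", session)
    data = hook.my_custom_api_call("/users", params={"active": True})

    assert data == {"users": []}
    # Exactly one request, with the slashes between host and endpoint merged.
    session.request.assert_called_once_with(
        "GET", "https://api.example.com/users", params={"active": True}
    )
    # The hook must check the HTTP status before decoding the body.
    session.request.return_value.raise_for_status.assert_called_once()


test_api_call_builds_url_and_returns_json()
```

With the real hook, the same idea would likely mean patching get_connection (for example with unittest.mock.patch.object) so that no Airflow metadata database is needed during the test.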