Creating Custom Hooks
Hooks are interfaces to external systems. Airflow provides built-in hooks for many popular services. However, you might need to interact with a system for which no hook is readily available. This guide explains how to create your own custom hook.
What is a Hook?
A hook is a class that abstracts away the details of connecting to an external service. It typically contains methods for performing common operations on that service. For example, a database hook might have methods to execute SQL queries, and an HTTP hook might have methods to send GET or POST requests.
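Without the Airflow specifics, the pattern fits in a few lines of plain Python. This is only an illustration. ConnectionDetails, CONNECTIONS, and SketchHook are hypothetical names, and a dictionary stands in for Airflow's connection store. It shows the three parts a hook usually has: a lookup of connection details by ID, a lazily created client, and operation methods built on top of it.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ConnectionDetails:
    """Hypothetical stand-in for an Airflow Connection record."""
    host: str
    login: str
    password: str


# Hypothetical in-memory registry standing in for Airflow's connection store.
CONNECTIONS = {
    "my_service_default": ConnectionDetails("https://api.example.com", "user", "secret"),
}


class SketchHook:
    """Looks up connection details by ID and builds a client only once."""

    def __init__(self, conn_id: str = "my_service_default") -> None:
        self.conn_id = conn_id
        self._client: Optional[Dict[str, Any]] = None

    def get_conn(self) -> Dict[str, Any]:
        # Lazily create the "client" on first use, then reuse it.
        if self._client is None:
            details = CONNECTIONS[self.conn_id]
            self._client = {"base_url": details.host,
                            "auth": (details.login, details.password)}
        return self._client

    def endpoint_url(self, endpoint: str) -> str:
        # An operation method: callers never handle credentials directly.
        return f"{self.get_conn()['base_url']}/{endpoint}"


hook = SketchHook()
print(hook.endpoint_url("users"))  # https://api.example.com/users
```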
Why Create a Custom Hook?
- Integration with New Systems: Connect Airflow to services not covered by default hooks.
- Custom Logic: Encapsulate complex connection or interaction logic for a specific system.
- Reusability: Make your custom integrations easily shareable and reusable across different DAGs.
- Maintainability: Centralize external system interactions, simplifying updates and debugging.
Steps to Create a Custom Hook
1. Define Your Hook Class
Custom hooks should inherit from airflow.hooks.base.BaseHook. Typically you override __init__ to accept a connection ID and any hook-specific parameters, implement get_conn to return a client for the service, and add methods for the specific operations you need.
Here's a basic structure:
```python
from typing import Any, Optional

import requests
from airflow.hooks.base import BaseHook


class MyCustomHook(BaseHook):
    """
    A custom hook to interact with My Awesome Service.
    """

    conn_name_attr = 'my_custom_conn_id'  # Name of the attribute that holds the connection ID
    default_conn_name = 'my_custom_default'  # Connection ID used when none is given

    def __init__(self,
                 my_custom_conn_id: str = default_conn_name,
                 my_custom_param: str = 'default_value',
                 **kwargs) -> None:
        super().__init__(**kwargs)
        # Store the connection ID under the attribute named by conn_name_attr.
        self.my_custom_conn_id = my_custom_conn_id
        self.my_custom_param = my_custom_param
        self.connection: Optional[requests.Session] = None  # Cached session
        self.base_url: Optional[str] = None

    def get_conn(self) -> requests.Session:
        """
        Establishes a connection to the external service and reuses it on later calls.
        """
        if self.connection is None:
            # Retrieve connection details from Airflow connections
            conn = self.get_connection(self.my_custom_conn_id)
            self.base_url = (conn.host or '').rstrip('/')
            # Authenticate with the stored login/password. If your service expects
            # an API-key header (e.g. X-API-Key) instead, set it here rather than both.
            session = requests.Session()
            session.auth = (conn.login, conn.password)
            self.connection = session
            self.log.info("Connected to My Awesome Service using host: %s", conn.host)
        return self.connection

    def my_custom_api_call(self, endpoint: str, method: str = 'GET', **kwargs: Any) -> Any:
        """
        Performs a custom API call to the service and returns the decoded JSON body.
        """
        session = self.get_conn()
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        response = session.request(method, url, **kwargs)
        response.raise_for_status()  # Raise requests.HTTPError for 4xx/5xx responses
        return response.json()

    def close(self) -> None:
        """
        Closes the connection if it's open.
        """
        if self.connection is not None:
            self.log.info("Closing connection to My Awesome Service.")
            self.connection.close()  # Release the session's pooled connections
            self.connection = None


# Example of using the hook inside a PythonOperator task. In a real project this
# part lives in a DAG file and imports MyCustomHook from the plugins folder.
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def _call_my_service():
    hook = MyCustomHook(my_custom_conn_id='my_awesome_service_conn',
                        my_custom_param='some_other_value')
    try:
        data = hook.my_custom_api_call('users', params={'active': True})
        print(f"Received data: {data}")
    finally:
        hook.close()  # Ensure the connection is closed


with DAG(
    dag_id='custom_hook_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['example', 'custom_hook'],
) as dag:
    call_service_task = PythonOperator(
        task_id='call_my_service_task',
        python_callable=_call_my_service,
    )
```
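One detail in my_custom_api_call deserves care: how the connection's host is joined with the endpoint. A plain f-string can produce double slashes, and urllib.parse.urljoin silently drops the last path segment of a base URL that lacks a trailing slash. The helper below (build_url is a hypothetical name) normalizes the slashes instead. It is a sketch that assumes endpoints are relative paths, not full URLs.

```python
from urllib.parse import urljoin


def build_url(base_url: str, endpoint: str) -> str:
    """Join a base URL and a relative endpoint with exactly one slash."""
    return f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}"


# urljoin treats "v1" like a file name and replaces it when the base has no
# trailing slash, which is rarely what an API client wants.
print(urljoin("https://api.example.com/v1", "users"))      # https://api.example.com/users
print(build_url("https://api.example.com/v1", "users"))    # https://api.example.com/v1/users
print(build_url("https://api.example.com/v1/", "/users"))  # https://api.example.com/v1/users
```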
2. Register Your Hook
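Airflow does not need a separate registration step for hooks. A hook is an ordinary Python class, and Airflow adds its plugins folder (set by the plugins_folder option, $AIRFLOW_HOME/plugins by default) to sys.path, so any module you place there can be imported from your DAG files. A typical layout looks like this: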
```text
airflow_home/
├── dags/
│   └── my_dag.py
└── plugins/
    └── my_custom_plugin/
        ├── __init__.py
        └── hooks/
            ├── __init__.py
            └── my_custom_hook.py
```
Since Airflow 2.0, importing plugin hooks through airflow.hooks.<plugin_name> is no longer supported; DAGs import them as regular Python modules instead, so an AirflowPlugin class is not required just to use a hook. Create one only if you also ship components that still need it, such as macros, Flask blueprints, AppBuilder views, timetables, or listeners. For example, in plugins/my_custom_plugin/__init__.py:

```python
from airflow.plugins_manager import AirflowPlugin


class MyCustomPlugin(AirflowPlugin):
    # Declare only the components this plugin actually provides;
    # hooks are imported directly and need no entry here.
    name = "my_custom_plugin"
```
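If your plugins live somewhere other than the default location, point plugins_folder at them in airflow.cfg (or set the AIRFLOW__CORE__PLUGINS_FOLDER environment variable):

```ini
[core]
plugins_folder = /path/to/your/airflow_home/plugins
```

With the folder configured, a DAG file can import the hook directly: from my_custom_plugin.hooks.my_custom_hook import MyCustomHook. The hooks/ subfolder is only a naming convention; Airflow does not scan it for hooks.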
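The plain import works because of ordinary Python import mechanics: once a folder is on sys.path, packages inside it are importable. The stdlib-only sketch below imitates that step with a temporary directory. The package, module, and class names (demo_plugin, demo_hook, DemoHook) are hypothetical, and the code reproduces the Python side of the mechanism, not Airflow's plugin loader itself.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway "plugins" folder with the same layout as above.
plugins_dir = Path(tempfile.mkdtemp())
hooks_pkg = plugins_dir / "demo_plugin" / "hooks"
hooks_pkg.mkdir(parents=True)
(plugins_dir / "demo_plugin" / "__init__.py").write_text("")
(hooks_pkg / "__init__.py").write_text("")
(hooks_pkg / "demo_hook.py").write_text(
    "class DemoHook:\n"
    "    conn_name_attr = 'demo_conn_id'\n"
)

# Airflow does the equivalent of this for its plugins folder.
sys.path.insert(0, str(plugins_dir))
importlib.invalidate_caches()  # make the import system notice the new files

module = importlib.import_module("demo_plugin.hooks.demo_hook")
print(module.DemoHook.conn_name_attr)  # demo_conn_id
```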
3. Configure Connections
Create an Airflow Connection that your hook can reference, either in the UI (Admin -> Connections) or with the CLI (airflow connections add). Its fields should match what the hook reads:
- Conn Id: my_awesome_service_conn (or whatever you pass as my_custom_conn_id)
- Conn Type: any type that shows the fields you need, such as HTTP; the example hook does not check the type
- Host: the base URL of your service (e.g., https://api.example.com)
- Login: your API username
- Password: your API password or secret
- Extra: additional parameters as a JSON object, which the hook can read through conn.extra_dejson
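Note: Store secrets such as API keys in the Password field (or Extra) rather than Login. Airflow encrypts Password and Extra at rest when a Fernet key is configured and masks passwords in task logs. For stronger isolation, manage credentials through a secrets backend such as HashiCorp Vault or AWS Secrets Manager.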
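Besides the UI and CLI, Airflow can read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID, and since Airflow 2.3 the value may be a JSON document. The snippet below only builds that name and value with the standard library; connection_env_var is a hypothetical helper, the credentials are placeholders, and exporting the variable to the scheduler and workers is left to your deployment tooling.

```python
import json
from typing import Tuple


def connection_env_var(conn_id: str, conn_type: str, host: str,
                       login: str, password: str, extra: dict) -> Tuple[str, str]:
    """Return (name, value) for an AIRFLOW_CONN_* environment variable."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({
        "conn_type": conn_type,
        "host": host,
        "login": login,
        "password": password,
        "extra": extra,  # surfaces as conn.extra_dejson inside the hook
    })
    return name, value


name, value = connection_env_var(
    "my_awesome_service_conn", "http", "https://api.example.com",
    "api_user", "placeholder-secret", {"timeout": 30},
)
print(name)  # AIRFLOW_CONN_MY_AWESOME_SERVICE_CONN
```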
Key Considerations for Custom Hooks
- Error Handling: Implement robust error handling within your hook methods. Use try...except blocks to catch likely failures from the external service (timeouts, HTTP errors), log them, and re-raise so the task fails visibly instead of appearing to succeed.
- Connection Management: Open and close connections deliberately. get_conn should reuse an existing connection rather than open a new one on every call, and a close method should release any resources the hook holds.
- Idempotency: Design your hook methods to be idempotent where possible, especially if they change state, so that a retried task does not repeat side effects such as creating duplicate records.
- Logging: Use Airflow's logger (self.log) to report what the hook is doing; its messages appear in the task logs.
- Security: Be careful with credentials. Read them from Airflow Connections or a secrets backend instead of hardcoding them in the hook code.
- Dependencies: If your hook relies on external Python libraries (e.g., requests, boto3), install them wherever your DAGs are parsed and your tasks run, for example on the scheduler and the workers.
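For the error-handling and idempotency points, one common approach is to retry only transient failures, with exponential backoff, inside the hook method. The decorator below is a stdlib sketch; retry_transient, TransientError, and the delay values are hypothetical, and retrying in this way is only safe for idempotent operations. Airflow's task-level retries (the retries argument on operators) are an alternative when coarser recovery is enough.

```python
import functools
import logging
import time

log = logging.getLogger(__name__)


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, HTTP 503)."""


def retry_transient(attempts: int = 3, base_delay: float = 0.5):
    """Retry the wrapped call on TransientError, doubling the delay each time."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except TransientError as exc:
                    if attempt == attempts:
                        raise  # Out of attempts: let the task fail visibly
                    delay = base_delay * 2 ** (attempt - 1)
                    log.warning("Attempt %d failed (%s); retrying in %.2fs",
                                attempt, exc, delay)
                    time.sleep(delay)
        return wrapper
    return decorator


calls = {"n": 0}


@retry_transient(attempts=3, base_delay=0.01)
def flaky_fetch():
    # Fails twice, then succeeds, to exercise the retry path.
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("service unavailable")
    return {"status": "ok"}


print(flaky_fetch())  # {'status': 'ok'}
```

Non-transient exceptions are not caught, so programming errors and permanent failures (such as HTTP 401) surface immediately instead of being retried.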
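Tip: For HTTP-based services, consider inheriting from airflow.providers.http.hooks.http.HttpHook (from the apache-airflow-providers-http package). It already handles connection lookup, the base URL, and authentication, so your subclass only needs the service-specific methods.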
Warning: Ensure your custom hook is well-tested before deploying it to production. Incorrectly implemented hooks can lead to data loss or pipeline failures.