HDFS Integration

Apache Airflow integrates with the Hadoop Distributed File System (HDFS) through the Apache HDFS provider, allowing your data pipelines to interact with HDFS for storage and processing. This document outlines how to configure and use HDFS connections within Airflow.

Prerequisites

Before you can use the HDFS integration, you need a running HDFS cluster whose NameNode is reachable from your Airflow workers, and the Apache HDFS provider package installed in your Airflow environment:

pip install apache-airflow-providers-apache-hdfs
# or, equivalently, via the Airflow extra
pip install 'apache-airflow[apache.hdfs]'
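
To confirm the provider is available in your environment, you can try importing the classes used in the examples below (a minimal check; the import paths assume a provider release that still ships the classic HDFS hook and sensor):


from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor

print("HDFS provider import OK:", HDFSHook.__module__, HdfsSensor.__module__)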
            

Configuring HDFS Connections

Airflow uses Connections to store credentials and endpoint information for external systems. To connect to HDFS, you need to configure an HDFS connection in the Airflow UI or via environment variables/configuration files.

Using the Airflow UI

  1. Navigate to Admin -> Connections in the Airflow UI.
  2. Click the + button to add a new connection.
  3. Set the Conn Id to a unique identifier (e.g., hdfs_default).
  4. Set the Conn Type to HDFS.
  5. Enter the Host of your HDFS NameNode (e.g., namenode.example.com).
  6. Enter the Port of your HDFS NameNode (default is 8020 or 9000 depending on your HDFS configuration).
  7. (Optional) If your cluster uses Kerberos, configure Airflow's Kerberos support globally (the [kerberos] section of airflow.cfg together with the airflow kerberos ticket renewer) rather than the connection's Login and Password fields; hook-specific settings such as the NameNode principal go in the connection's Extra field.
  8. (Optional) Specify the Schema if needed (e.g., hdfs or webhdfs).
  9. Click Save.
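
If you prefer to script this step, the same fields can be expressed with Airflow's Connection model. The snippet below is a minimal sketch (conn_id, host, port, and login are placeholders) that builds the connection object and prints the URI you could export as an environment variable, as described in the next section:


from airflow.models import Connection

# All values are illustrative; adjust them to your cluster.
hdfs_conn = Connection(
    conn_id='hdfs_default',
    conn_type='hdfs',
    host='namenode.example.com',
    port=8020,
    login='airflow',  # typically used as the effective user by the classic HDFSHook
)

# get_uri() serializes the connection in the format expected by AIRFLOW_CONN_* variables.
print(hdfs_conn.get_uri())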

Using Environment Variables

You can also set HDFS connection details using environment variables. The variable name is AIRFLOW_CONN_ followed by the connection ID in uppercase, and its value is a connection URI.

For example, to configure a connection with Conn Id set to hdfs_default:


export AIRFLOW_CONN_HDFS_DEFAULT='hdfs://namenode.example.com:8020/'
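
At runtime Airflow resolves this variable like any other connection. A quick way to confirm it is picked up is to run the following in a Python shell inside the Airflow environment (a minimal check using the connection ID from above):


from airflow.hooks.base import BaseHook

# Resolves the connection from environment variables, the metadata database, or secrets backends.
conn = BaseHook.get_connection('hdfs_default')
print(conn.conn_type, conn.host, conn.port)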
            

For Kerberos-secured clusters, Kerberos itself is configured globally (the [kerberos] section of airflow.cfg together with the airflow kerberos ticket renewer). The connection can still carry hook-specific settings as URI query parameters, which Airflow stores in the connection's Extra field; exactly which keys are honored depends on the hook. For example, to set the NameNode principal read by the classic HDFSHook:


export AIRFLOW_CONN_HDFS_DEFAULT='hdfs://airflow@namenode.example.com:8020/?hdfs_namenode_principal=hdfs/namenode.example.com@EXAMPLE.COM'
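
To see how Airflow interprets such a URI, you can parse it with the Connection model and inspect the result (a small sketch using the placeholder URI from above):


from airflow.models import Connection

uri = 'hdfs://airflow@namenode.example.com:8020/?hdfs_namenode_principal=hdfs/namenode.example.com@EXAMPLE.COM'
conn = Connection(conn_id='hdfs_default', uri=uri)

print(conn.login)         # 'airflow'
print(conn.extra_dejson)  # query parameters are stored as Extra key/value pairs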
            

Using HDFS Sensors and Hooks

The Apache HDFS provider ships sensors and a hook for interacting with HDFS:

HdfsSensor

This sensor waits for a file or directory to exist in HDFS.


from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor
from airflow.models import DAG
from datetime import datetime

with DAG(
    dag_id='hdfs_sensor_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    wait_for_file = HdfsSensor(
        task_id='wait_for_data_file',
        filepath='/user/airflow/data/input.csv',
        hdfs_conn_id='hdfs_default',
    )
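
Because HdfsSensor is a standard Airflow sensor, the usual BaseSensorOperator arguments also apply. The variant below is a sketch showing explicit poke_interval, timeout, and reschedule mode; all values are illustrative:


from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor
from airflow.models import DAG
from datetime import datetime

with DAG(
    dag_id='hdfs_sensor_tuned_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    wait_for_file = HdfsSensor(
        task_id='wait_for_data_file',
        filepath='/user/airflow/data/input.csv',
        hdfs_conn_id='hdfs_default',
        poke_interval=60,   # check every 60 seconds
        timeout=60 * 60,    # fail the task after one hour without the file
        mode='reschedule',  # free the worker slot between pokes
    )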
            

HDFSHook

The HDFSHook provides a programmatic interface to HDFS: its get_conn() method returns the underlying client (a snakebite client in the classic provider). You can use it inside a PythonOperator.


from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime

def list_hdfs_directory(**context):
    # get_conn() returns the underlying HDFS client; with the classic provider
    # this is a snakebite client whose ls() takes a list of paths and yields
    # one entry (a dict of file metadata) per file or directory.
    hdfs_hook = HDFSHook(hdfs_conn_id='hdfs_default')
    client = hdfs_hook.get_conn()
    for entry in client.ls(['/user/airflow/data/']):
        print(f"Found: {entry['path']}")

with DAG(
    dag_id='hdfs_hook_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    list_dir_task = PythonOperator(
        task_id='list_hdfs_directory',
        python_callable=list_hdfs_directory,
    )
            

HdfsRegexSensor

Waits for at least one file matching a regular expression to appear under a directory.

HdfsFolderSensor

Waits for a directory to exist and, by default, to contain at least one file.
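
A sketch of the regex variant is shown below. It assumes HdfsRegexSensor accepts a compiled pattern via the regex argument alongside the usual HdfsSensor arguments; the path and pattern are placeholders:


import re
from datetime import datetime

from airflow.models import DAG
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsRegexSensor

with DAG(
    dag_id='hdfs_regex_sensor_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    wait_for_any_part_file = HdfsRegexSensor(
        task_id='wait_for_any_part_file',
        filepath='/user/airflow/data/',
        regex=re.compile(r'part-.*\.csv'),
        hdfs_conn_id='hdfs_default',
    )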

Common HDFS Operations

Combining the sensors and hook above, you can perform common operations such as:

  - Waiting for a file, directory, or file pattern to appear before running downstream tasks.
  - Listing directory contents and inspecting file metadata from a PythonOperator.
  - Verifying connectivity to the cluster as part of monitoring or troubleshooting tasks.

Refer to the specific sensor or hook documentation for detailed examples and available parameters.

Note: Ensure your HDFS connection details are correct and that Airflow has the necessary network access to your NameNode and DataNodes.
Tip: For HDFS operations involving large data volumes, consider using operators that support HDFS streaming or distributed processing frameworks like Spark, which can be integrated with Airflow.

Troubleshooting

If you encounter issues, check the following:

  1. The hdfs_conn_id used by your tasks matches an existing connection (hdfs_default in the examples above).
  2. The NameNode host and port in the connection are correct and reachable from the machines running the scheduler and workers.
  3. For Kerberos-secured clusters, the Airflow Kerberos ticket renewer is running and the keytab and principal are valid.
  4. The Apache HDFS provider package (and its client dependencies) is installed in every environment that runs your DAGs.
  5. The task logs, where connection and authentication errors from the underlying HDFS client are reported.
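
As a quick end-to-end check outside of any DAG, you can instantiate the hook directly in a Python shell on an Airflow worker. This is a sketch that assumes the classic HDFSHook and the hdfs_default connection configured above:


from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook

client = HDFSHook(hdfs_conn_id='hdfs_default').get_conn()

# Listing the root directory exercises host resolution, the NameNode RPC port,
# and (if enabled) Kerberos authentication in a single call.
for entry in client.ls(['/']):
    print(entry['path'])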