HDFS Integration
Apache Airflow integrates with the Hadoop Distributed File System (HDFS), allowing your data pipelines to read from, write to, and monitor data in HDFS. This document outlines how to configure and use HDFS connections within Airflow.
Prerequisites
Before you can use HDFS integration, ensure you have the following:
- A running HDFS cluster.
- The necessary HDFS client libraries installed on your Airflow worker nodes (or the Airflow environment).
- Python libraries for HDFS interaction, such as hdfs or pyarrow. You can install them using pip:
pip install apache-airflow[hdfs]
# or install the provider package directly
pip install apache-airflow-providers-apache-hdfs
# pyarrow-based access additionally requires pyarrow
pip install pyarrow
Configuring HDFS Connections
Airflow uses Connections to store credentials and endpoint information for external systems. To connect to HDFS, you need to configure an HDFS connection in the Airflow UI or via environment variables/configuration files.
Using the Airflow UI
- Navigate to Admin -> Connections in the Airflow UI.
- Click the + button to add a new connection.
- Set the Conn Id to a unique identifier (e.g., hdfs_default).
- Set the Conn Type to HDFS.
- Enter the Host of your HDFS NameNode (e.g., namenode.example.com).
- Enter the Port of your HDFS NameNode (default is 8020 or 9000, depending on your HDFS configuration).
- (Optional) If your HDFS uses Kerberos authentication, provide the Login (principal) and Password (keytab path or password).
- (Optional) Specify the Schema if needed (e.g., hdfs, webhdfs, or s3a for S3-backed HDFS).
- Click Save.
Using Environment Variables
You can also set HDFS connection details using environment variables. The variable name is AIRFLOW_CONN_<CONN_ID>, with the connection ID in uppercase.
For example, to configure a connection with Conn Id set to hdfs_default:
export AIRFLOW_CONN_HDFS_DEFAULT='hdfs://namenode.example.com:8020/'
For more complex configurations (e.g., Kerberos):
export AIRFLOW_CONN_HDFS_DEFAULT='hdfs://namenode.example.com:8020/?user=airflow&kerberos_principal=airflow@EXAMPLE.COM&kerberos_keytab=/path/to/airflow.keytab'
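If hand-writing these URIs becomes error-prone, you can generate them with Airflow's Connection model. The following is a minimal sketch; the host, port, and login values are illustrative placeholders:
# Hedged sketch: build the value for AIRFLOW_CONN_HDFS_DEFAULT programmatically.
# Substitute your own NameNode host, port, and user.
from airflow.models import Connection

conn = Connection(
    conn_id="hdfs_default",
    conn_type="hdfs",
    host="namenode.example.com",
    port=8020,
    login="airflow",
)
print(conn.get_uri())  # paste the printed URI into AIRFLOW_CONN_HDFS_DEFAULT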
Using HDFS Operators and Hooks
Airflow provides several operators and hooks to interact with HDFS:
HdfsSensor
This sensor waits for a file or directory to exist in HDFS.
from airflow.providers.apache.hdfs.sensors.hdfs import HdfsSensor
from airflow.models import DAG
from datetime import datetime

with DAG(
    dag_id='hdfs_sensor_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Poke HDFS until /user/airflow/data/input.csv exists.
    wait_for_file = HdfsSensor(
        task_id='wait_for_data_file',
        filepath='/user/airflow/data/input.csv',
        hdfs_conn_id='hdfs_default',
    )
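Like any Airflow sensor, HdfsSensor also accepts the standard poke_interval, timeout, and mode arguments inherited from BaseSensorOperator, which control how often and for how long the path is checked.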
HDFSHook
The HDFSHook provides a programmatic interface to interact with HDFS. You can use it inside a PythonOperator callable.
from airflow.providers.apache.hdfs.hooks.hdfs import HDFSHook
from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime


def list_hdfs_directory(**context):
    # The hook returns a snakebite client; ls() takes a list of paths.
    hdfs_hook = HDFSHook(hdfs_conn_id='hdfs_default')
    client = hdfs_hook.get_conn()
    for entry in client.ls(['/user/airflow/data/']):
        print(f"Found entry: {entry}")


with DAG(
    dag_id='hdfs_hook_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    list_dir_task = PythonOperator(
        task_id='list_hdfs_directory',
        python_callable=list_hdfs_directory,
    )
HdfsSensor.FileExistsSensor
A more specific sensor for checking file existence.
HdfsSensor.PathDoesNotExistSensor
Waits for a specified path to be deleted.
Common HDFS Operations
You can perform various file operations (a sketch of several of these follows the list below), such as:
- Creating directories
- Uploading and downloading files
- Reading file contents
- Deleting files and directories
- Checking file/directory existence and status
Refer to the specific operator or hook documentation for detailed examples and available parameters.
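As a rough illustration, the sketch below uses the WebHDFSHook from the apache.hdfs provider to perform a few of these operations. It assumes a webhdfs_default connection pointing at your WebHDFS endpoint; all paths and file names are illustrative.
# Minimal sketch of common HDFS operations via WebHDFSHook.
# Assumes a 'webhdfs_default' connection; all paths below are illustrative.
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

hook = WebHDFSHook(webhdfs_conn_id='webhdfs_default')
client = hook.get_conn()  # an hdfs (hdfscli) client

client.makedirs('/user/airflow/data/staging')                             # create a directory
hook.load_file('/tmp/input.csv', '/user/airflow/data/staging/input.csv')  # upload a local file
print(hook.check_for_path('/user/airflow/data/staging/input.csv'))        # existence check
print(client.status('/user/airflow/data/staging/input.csv'))              # file status
client.delete('/user/airflow/data/staging/input.csv')                     # delete the file
Note that WebHDFS connections target the NameNode's HTTP port (typically 9870 on Hadoop 3 or 50070 on Hadoop 2) rather than the RPC port used in the earlier examples.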
Troubleshooting
If you encounter issues, check the following:
- Connection Configuration: Verify the HDFS host, port, and any authentication details in your Airflow connection.
- Network Connectivity: Ensure Airflow workers can reach the HDFS NameNode and DataNodes (a quick reachability check is sketched after this list).
- Permissions: Check if the user specified in the Airflow connection has the necessary permissions in HDFS.
- Logs: Examine the Airflow task logs for detailed error messages.
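For the network connectivity check, a minimal probe such as the following can be run from a worker; the host and port are the example values used earlier in this document:
# Quick reachability probe for the NameNode RPC port; substitute your own host and port.
import socket

socket.create_connection(("namenode.example.com", 8020), timeout=5).close()
print("NameNode port is reachable")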