Pools
This document explains how to manage and use Pools in Apache Airflow.
What are Pools?
Pools are a feature in Apache Airflow that lets you limit the number of concurrent tasks running against a particular resource. This is useful for protecting external systems or databases that can only handle a limited number of concurrent connections or operations.
For example, if you have a database that can only handle 10 concurrent connections, you can create a pool with a size of 10 and assign all tasks that interact with that database to that pool. Airflow will then ensure that no more than 10 tasks assigned to that pool run simultaneously.
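The slot-limiting behaviour can be illustrated with a small, self-contained sketch. This models only the slot accounting, not Airflow's actual scheduler:

```python
# Illustrative model of pool slot accounting -- not Airflow internals.
class Pool:
    def __init__(self, slots):
        self.slots = slots   # total capacity
        self.used = 0        # slots currently occupied

    def try_acquire(self, n=1):
        """Start a task only if n slots are free, mirroring how the
        scheduler holds tasks back when the pool is full."""
        if self.used + n <= self.slots:
            self.used += n
            return True
        return False

    def release(self, n=1):
        self.used -= n


db_pool = Pool(slots=10)
started = sum(db_pool.try_acquire() for _ in range(15))
print(started)  # only 10 of the 15 tasks can start
```

Once a running task finishes and releases its slot, the next queued task can acquire it.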
Managing Pools
You can manage pools through the Airflow UI. Navigate to the Admin tab and then select Pools from the dropdown menu.
Creating a New Pool
To create a new pool:
- Click the + Pool button.
- Enter a Name for your pool. This name will be used to reference the pool in your DAGs.
- Enter the Slots, which is the maximum number of concurrent tasks allowed in this pool.
- Enter a Description to help you remember the purpose of the pool.
- Click Save.
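Pools can also be created from the command line with the airflow pools CLI. A sketch (the pool name and description here are placeholders):

```shell
# Create (or update) a pool named 'my_database_pool' with 10 slots
airflow pools set my_database_pool 10 "Caps concurrent database connections"

# Confirm it exists
airflow pools list
```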
Editing an Existing Pool
To edit an existing pool, click the Edit button next to the pool you wish to modify in the Pools list. You can then change the name, slots, or description. Remember to click Save after making your changes.
Deleting a Pool
To delete a pool, click the Delete button next to the pool. Be cautious, as deleting a pool cannot be undone and will affect any DAGs currently using it.
Using Pools in DAGs
You can assign a task to a pool by specifying the pool parameter when defining the task in your DAG. If you want to limit the concurrency of a specific task, you can set the pool_slots parameter to control how many slots that specific task consumes from the pool.
Example: Assigning a Task to a Pool
```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="pool_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "pool"],
) as dag:
    start = EmptyOperator(task_id="start")

    # Assign this task to the 'my_database_pool'.
    # If 'my_database_pool' has 5 slots, this task will use 1 slot by default.
    database_task = EmptyOperator(
        task_id="database_operation",
        pool="my_database_pool",
    )

    # This task will use 2 slots from the 'high_concurrency_pool'.
    high_concurrency_task = EmptyOperator(
        task_id="heavy_processing",
        pool="high_concurrency_pool",
        pool_slots=2,
    )

    end = EmptyOperator(task_id="end")

    start >> [database_task, high_concurrency_task] >> end
```
In this example:
- database_operation will be limited by the concurrency of the my_database_pool and, by default, will consume 1 slot.
- heavy_processing will be limited by the high_concurrency_pool and will consume 2 slots from that pool.
Important Considerations:
- If a task is assigned to a pool, it will not run unless there are enough available slots in that pool.
- The pool_slots parameter defaults to 1 if not specified.
- Tasks with pool_slots > 1 can be used to represent tasks that require more resources or have a larger impact on external systems.
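The slot arithmetic behind these considerations can be checked with a short sketch (again a plain-Python model, using a hypothetical pool of 5 slots):

```python
# Hypothetical pool with 5 total slots.
pool_capacity = 5
running = []  # pool_slots of each currently running task


def can_start(task_slots):
    # A task starts only if its pool_slots fit in the remaining capacity.
    return sum(running) + task_slots <= pool_capacity


for slots_needed in [2, 2, 2]:  # three tasks, each with pool_slots=2
    if can_start(slots_needed):
        running.append(slots_needed)

print(len(running))  # 2 -- the third task does not fit
print(sum(running))  # 4 slots in use; 1 slot free, too few for a 2-slot task
```

Note the last point: a single free slot goes unused here, because no waiting task fits in it.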
Pools and Task Groups
You can also apply a pool to every task in a TaskGroup by passing pool in the group's default_args. Tasks within the group then inherit that pool; if an individual task specifies its own pool, the task's setting takes precedence. A task's pool_slots value is deducted from whichever pool that task ends up using.
Troubleshooting
If your tasks are not running and are stuck in a queued state, check the following:
- Ensure the pool you've assigned the task to exists in the Airflow UI.
- Verify that the pool has sufficient available slots.
- Check if other tasks are consuming all available slots in the pool.
- Ensure the pool_slots value for your task does not exceed the pool's total slot count; a task that requests more slots than the pool has can never run.
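When debugging, you can also verify from the command line that the pool exists and check its configured slot count (sketch; the pool name is a placeholder):

```shell
# Show the named pool and its configured slots
airflow pools get my_database_pool
```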
You can monitor pool usage and task states in the Airflow UI's Admin -> Pools section.