Pools
Pools are a mechanism to limit the parallelism of tasks that share a particular resource. For example, you might want to ensure that no more than 5 tasks run against a shared database at the same time, regardless of which DAG they belong to. Pools can be configured in the Airflow UI under the Admin -> Pools section.
What are Pools?
In Airflow, a Pool is a named set of slots representing a limited resource. Each pool has a specified slot count, which determines how many slots running tasks can occupy simultaneously within that pool.
Use Cases for Pools
- Resource Limiting: Prevent overwhelming external services (e.g., APIs, databases, third-party systems) by restricting the number of concurrent requests.
- Fairness: Ensure that different DAGs or tasks share a limited resource without one dominating all available slots.
- Preventing Deadlocks: In scenarios where tasks might acquire resources in different orders, pools can help avoid deadlocks.
Creating and Managing Pools
Pools are managed through the Airflow UI.
- Navigate to the Admin menu.
- Click on Pools.
- Click the + button to create a new pool.
- Enter a Pool Name (e.g., `my_api_pool`). This name will be used in your DAG code.
- Set the Slots to the maximum number of concurrent tasks allowed.
- Optionally, add a Description for clarity.
- Click Save.
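Besides the UI, pools can also be managed from the Airflow CLI. A sketch of the relevant commands (Airflow 2.x syntax; verify against your installed version):

```shell
# Create (or update) a pool named my_api_pool with 5 slots and a description
airflow pools set my_api_pool 5 "Limits concurrent calls to the external API"

# List all configured pools
airflow pools list

# Delete a pool when it is no longer needed
airflow pools delete my_api_pool
```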
Assigning Tasks to Pools
To assign a task to a specific pool, you use the pool parameter when defining the task in your DAG. If the specified pool does not exist, the scheduler will not schedule the task until the pool is created; tasks that do not specify a pool at all run in the default_pool.
```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="pool_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "pool"],
) as dag:
    start = EmptyOperator(task_id="start")

    # This task runs in 'my_api_pool' and will only start
    # when a slot is available in that pool.
    task_in_pool = EmptyOperator(
        task_id="task_in_my_api_pool",
        pool="my_api_pool",  # Assigning the task to the pool
        pool_slots=1,  # Number of slots this task occupies (default is 1)
    )

    end = EmptyOperator(task_id="end")

    start >> task_in_pool >> end
```
`pool_slots` Parameter
The pool_slots parameter specifies how many slots a task will occupy within its assigned pool. By default, a task occupies 1 slot. You can increase this value if a single task requires multiple resource slots from the pool.
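As a back-of-the-envelope illustration (plain Python, not Airflow code; `remaining_slots` is a hypothetical helper): in a 5-slot pool, a task that sets `pool_slots=3` leaves room for only two more single-slot tasks.

```python
# Hypothetical helper illustrating pool_slots arithmetic; not part of the Airflow API.
def remaining_slots(total_slots: int, occupied: list[int]) -> int:
    """Slots still free after each running task takes its pool_slots."""
    return total_slots - sum(occupied)

# A 5-slot pool with one heavy task occupying pool_slots=3:
free = remaining_slots(5, [3])
print(free)  # 2 -> only two more single-slot tasks can start
```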
How Pools Work
When a task is scheduled to run and is assigned to a pool:
- Airflow checks how many slots are currently occupied in that pool.
- If the occupied slots plus the `pool_slots` required by the new task do not exceed the pool's total Slots, the task is allowed to run.
- If the pool is "full" (i.e., all slots are occupied), the task is queued and will only run when enough slots become available.
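The check above can be sketched as a small simulation. The `Pool` class here is a hypothetical toy model, not Airflow internals:

```python
from collections import deque


class Pool:
    """Toy model of a pool: admits a task only if enough slots are free."""

    def __init__(self, slots: int):
        self.slots = slots  # total slots in the pool
        self.occupied = 0   # slots taken by running tasks

    def try_acquire(self, pool_slots: int = 1) -> bool:
        """Admit a task only if occupied + pool_slots <= total slots."""
        if self.occupied + pool_slots <= self.slots:
            self.occupied += pool_slots
            return True
        return False  # pool is full; the task stays queued

    def release(self, pool_slots: int = 1) -> None:
        """Free slots when a task finishes."""
        self.occupied -= pool_slots


pool = Pool(slots=2)
queued = deque(["t1", "t2", "t3"])  # each task needs 1 slot
running = []

while queued and pool.try_acquire():
    running.append(queued.popleft())  # t1 and t2 start; t3 must wait

pool.release()  # t1 finishes, freeing a slot
if queued and pool.try_acquire():
    running.append(queued.popleft())  # now t3 can start
```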
Pools and Executors
Pools are an Airflow-level concept and work independently of the executor you are using (e.g., LocalExecutor, CeleryExecutor, KubernetesExecutor). The executor is responsible for actually running the tasks, while pools enforce concurrency limits at the scheduling level.
Default Pool
If a task does not specify a pool, it is implicitly assigned to the default_pool, which has 128 slots by default. You can adjust the slot count of the default_pool in the UI as well.