Pools

This document explains how to manage and use Pools in Apache Airflow.

What are Pools?

Pools are an Apache Airflow feature for limiting the number of tasks that run concurrently against a particular resource. This is useful when an external system or database can only handle a limited number of concurrent connections or operations.

For example, if you have a database that can only handle 10 concurrent connections, you can create a pool with a size of 10 and assign all tasks that interact with that database to that pool. Airflow will then ensure that no more than 10 tasks assigned to that pool run simultaneously.
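Conceptually, a pool behaves like a shared semaphore: each running task occupies a slot and releases it when it finishes, and tasks that cannot get a slot wait. The sketch below is an analogy in plain Python, not Airflow code; the names are illustrative.

```python
import threading
import time

# Stand-in for a pool with 3 slots: at most 3 "tasks"
# hold a slot at any moment.
POOL_SLOTS = 3
pool = threading.BoundedSemaphore(POOL_SLOTS)

lock = threading.Lock()
running = 0
peak = 0

def task():
    global running, peak
    with pool:  # acquire one slot; blocks while all slots are taken
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.05)  # simulate work against the limited resource
        with lock:
            running -= 1

threads = [threading.Thread(target=task) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("peak concurrency:", peak)  # never exceeds POOL_SLOTS
```

Even with 10 tasks submitted at once, the peak number running simultaneously stays at or below the slot count, which is exactly the guarantee a pool gives you.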

Managing Pools

You can manage pools through the Airflow UI. Navigate to the Admin tab and then select Pools from the dropdown menu.

Creating a New Pool

To create a new pool:

  1. Click the + Pool button.
  2. Enter a Name for your pool. This name will be used to reference the pool in your DAGs.
  3. Enter the Slots, which is the maximum number of concurrent tasks allowed in this pool.
  4. Enter a Description to help you remember the purpose of the pool.
  5. Click Save.
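The same operations are available from the command line through the airflow pools subcommands (Airflow 2 CLI; the pool name, size, and description here are examples):

```shell
# Create (or update) a pool with 10 slots
airflow pools set my_database_pool 10 "Connections to the reporting DB"

# List all pools
airflow pools list

# Inspect a single pool
airflow pools get my_database_pool

# Delete a pool
airflow pools delete my_database_pool
```

This is handy for provisioning pools in deployment scripts instead of clicking through the UI.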

Editing an Existing Pool

To edit an existing pool, click the Edit button next to the pool you wish to modify in the Pools list. You can then change the name, slots, or description. Remember to click Save after making your changes.

Deleting a Pool

To delete a pool, click the Delete button next to the pool. Be cautious, as deleting a pool cannot be undone and will affect any DAGs currently using it.

Using Pools in DAGs

You can assign a task to a pool by setting the pool parameter when defining the task in your DAG. By default each task occupies one slot; for heavier tasks, set the pool_slots parameter to control how many slots the task occupies while it runs.

Example: Assigning a Task to a Pool


from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="pool_example_dag",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example", "pool"],
) as dag:
    start = EmptyOperator(task_id="start")

    # Assign this task to the 'my_database_pool'
    # If 'my_database_pool' has 5 slots, this task will use 1 slot by default.
    database_task = EmptyOperator(
        task_id="database_operation",
        pool="my_database_pool",
    )

    # This task will use 2 slots from the 'high_concurrency_pool'
    high_concurrency_task = EmptyOperator(
        task_id="heavy_processing",
        pool="high_concurrency_pool",
        pool_slots=2,
    )

    end = EmptyOperator(task_id="end")

    start >> [database_task, high_concurrency_task] >> end

In this example:

  - database_task runs in my_database_pool and consumes the default of 1 slot.
  - high_concurrency_task runs in high_concurrency_pool and consumes 2 slots, so a 4-slot pool could run at most two such tasks at once.

Important Considerations:

  - Tasks that do not specify a pool run in default_pool, which ships with 128 slots.
  - A pool must exist before the scheduler will run tasks assigned to it; tasks referencing a missing pool will not be scheduled.
  - If a task's pool_slots exceeds the pool's total slots, the task can never acquire enough slots to run.
  - When more tasks are eligible than there are free slots, priority_weight determines which tasks get slots first.

Pools and Task Groups

You can also apply a pool to every task in a TaskGroup by passing pool in the group's default_args; all tasks within the group then inherit it. If an individual task inside the group specifies its own pool, the task-level value takes precedence. A task's pool_slots setting is deducted from whichever pool that task ultimately uses.
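A minimal sketch of group-level pool inheritance via default_args (the DAG id, group id, and pool names are illustrative, and both pools must already exist):

```python
from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="pool_taskgroup_example",
    schedule=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    with TaskGroup(
        group_id="etl_group",
        default_args={"pool": "etl_pool"},  # inherited by tasks in the group
    ) as etl_group:
        extract = EmptyOperator(task_id="extract")  # runs in 'etl_pool'
        # A task-level pool overrides the group-level default
        load = EmptyOperator(task_id="load", pool="db_pool")
```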

Troubleshooting

If your tasks are not running and are stuck in a queued state, check the following:

  - The pool named in the task exists and is spelled exactly as in the DAG.
  - The pool has free slots; long-running tasks may be holding all of them.
  - The task's pool_slots does not exceed the pool's total slots.
  - Other concurrency limits, such as the DAG's max_active_tasks or the scheduler's parallelism setting, are not the actual bottleneck.

You can monitor pool usage (running, queued, and open slots) in the Airflow UI's Admin -> Pools section.