Azure Cosmos DB Python SDK Reference

This document provides a comprehensive reference for the Azure Cosmos DB Python SDK. It covers installation, usage, and detailed explanations of core functionalities to help you build scalable and performant applications on Azure Cosmos DB.

Installation

Install the Azure Cosmos DB Python SDK using pip:

pip install azure-cosmos

It's recommended to use a virtual environment for your project.

Getting Started

The Azure Cosmos DB Python SDK allows you to interact with your Cosmos DB account from your Python applications. You can perform operations such as creating databases, containers, and items, as well as executing complex queries.

Ensure you have an Azure Cosmos DB account provisioned and obtain your endpoint and primary key from the Azure portal.

Creating a Client

To interact with Cosmos DB, you need to create a client instance. This typically involves providing your account's endpoint and primary key.

from azure.cosmos import CosmosClient

# Replace with your actual endpoint and key
endpoint = "YOUR_COSMOS_DB_ENDPOINT"
key = "YOUR_COSMOS_DB_PRIMARY_KEY"

client = CosmosClient(endpoint, credential=key)
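Hard-coding the key in source makes it easy to leak. One option is to read both values from environment variables. The following is a minimal sketch; the variable names `COSMOS_ENDPOINT` and `COSMOS_KEY` and the helper `load_cosmos_settings` are assumptions chosen for this example, not names defined by the SDK.

```python
import os


def load_cosmos_settings(env=None):
    """Return (endpoint, key) read from environment-style variables.

    COSMOS_ENDPOINT and COSMOS_KEY are hypothetical names chosen for this sketch.
    """
    env = os.environ if env is None else env
    # Collect every missing or empty setting so the error names all of them at once.
    missing = [name for name in ("COSMOS_ENDPOINT", "COSMOS_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return env["COSMOS_ENDPOINT"], env["COSMOS_KEY"]
```

The returned pair can be passed to the constructor in the same way as the placeholders: `CosmosClient(endpoint, credential=key)`.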

Databases

You can manage databases within your Cosmos DB account:

Creating a Database

# Create the database if it does not exist yet, and get a client object for it
database = client.create_database_if_not_exists(id='MyDatabase')
print(f"Database created or already exists: {database.id}")

Reading a Database

database = client.get_database_client('MyDatabase')
properties = database.read()  # Fetches the database properties from the service
print(f"Reading database: {properties['id']}")

Listing Databases

for db in client.list_databases():
    print(f"Found database: {db['id']}")

Containers

Containers are the fundamental units of scalability and throughput in Cosmos DB. They store items.

Creating a Container

from azure.cosmos.partition_key import PartitionKey

# Use an existing database client
database = client.get_database_client('MyDatabase')

# Create a container with a partition key
container = database.create_container_if_not_exists(
    id='MyContainer',
    partition_key=PartitionKey(path='/partitionKey')
)
print(f"Container created or already exists: {container.id}")
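The partition key path names the item property whose value decides which logical partition an item belongs to. The sketch below shows how a path such as `/partitionKey` maps to a value inside an item. The helper `partition_key_value` is hypothetical and not part of the SDK, and raising on a missing property is a choice made for this example only.

```python
def partition_key_value(item, path):
    """Resolve a partition key path such as '/partitionKey' or '/address/city'
    against an item dictionary and return the value found there."""
    value = item
    for segment in path.strip("/").split("/"):
        if not isinstance(value, dict) or segment not in value:
            raise KeyError(f"Item has no value at partition key path {path!r}")
        value = value[segment]
    return value


item = {"id": "item1", "category": "gear", "partitionKey": "gear"}
print(partition_key_value(item, "/partitionKey"))
```

The value this helper returns is the one to pass as `partition_key` when reading or deleting that item.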

Reading a Container

container = database.get_container_client('MyContainer')
properties = container.read()  # Fetches the container properties from the service
print(f"Reading container: {properties['id']}")

Listing Containers

for cont in database.list_containers():
    print(f"Found container: {cont['id']}")

Items

Items are JSON documents stored within containers.

Creating an Item

item_body = {
    "id": "item1",
    "category": "gear",
    "name": "Advanced Backpack",
    "description": "The best backpack for your next adventure.",
    "price": 150,
    "tags": ["outdoors", "travel", "hiking"],
    "partitionKey": "gear"
}

created_item = container.create_item(body=item_body)
print(f"Created item: {created_item['id']}")

Reading an Item

read_item = container.read_item(item='item1', partition_key='gear')
print(f"Read item: {read_item['name']}")

Upserting an Item

updated_item_body = {
    "id": "item1",
    "category": "gear",
    "name": "Advanced Backpack (Updated)",
    "description": "The best backpack for your next adventure, now with more features.",
    "price": 160,
    "tags": ["outdoors", "travel", "hiking", "new"],
    "partitionKey": "gear"
}
upserted_item = container.upsert_item(body=updated_item_body)
print(f"Upserted item: {upserted_item['id']}")

Deleting an Item

container.delete_item(item='item1', partition_key='gear')
print("Item deleted successfully.")

Queries

Execute SQL queries against your data.

Simple Query

# SQL query over the container; `c` is the alias for each item
query = "SELECT * FROM c WHERE c.category = 'gear'"
items = list(container.query_items(
    query=query,
    enable_cross_partition_query=True  # Use if your query might span partitions
))

print("Query results:")
for item in items:
    print(f"- {item['name']} (${item['price']})")

Parameterized Query

parameterized_query = "SELECT * FROM c WHERE c.category = @category"
parameters = [{"name": "@category", "value": "gear"}]

items_filtered = list(container.query_items(
    query=parameterized_query,
    parameters=parameters,
    enable_cross_partition_query=True  # Or pass partition_key=... to target one partition
))

print("\nParameterized query results:")
for item in items_filtered:
    print(f"- {item['name']}")
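When the set of filters varies at runtime, the query text and the parameter list can be built together so that values never end up concatenated into the SQL string. The following is a minimal sketch for equality filters on simple top-level property names; `build_equality_query` is a hypothetical helper, not an SDK function.

```python
def build_equality_query(filters):
    """Build (query, parameters) for equality filters on top-level properties.

    `filters` maps property names to the values they must equal.
    """
    clauses, parameters = [], []
    for index, (prop, value) in enumerate(filters.items()):
        # Property names go into the query text, so accept plain identifiers only.
        if not prop.isidentifier():
            raise ValueError(f"Unsupported property name: {prop!r}")
        name = f"@p{index}"
        clauses.append(f"c.{prop} = {name}")
        parameters.append({"name": name, "value": value})
    query = "SELECT * FROM c"
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    return query, parameters


query, parameters = build_equality_query({"category": "gear", "price": 150})
print(query)
print(parameters)
```

The two return values match the `query` and `parameters` arguments of `container.query_items`.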

Stored Procedures

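Stored procedures run JavaScript logic on the server, scoped to a single logical partition. This avoids round trips between client and service, and all writes made by one execution either commit together or are rolled back together.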

Creating a Stored Procedure

sp_body = {
    "id": "incrementCounter",
    # Server-side JavaScript: read the document by id, increment its counter, write it back.
    "body": """
    function incrementCounter(docId) {
        var coll = getContext().getCollection();
        coll.readDocument(coll.getAltLink() + '/docs/' + docId, function (err, doc) {
            if (err) throw err;
            doc.counter = (doc.counter || 0) + 1;
            coll.replaceDocument(doc._self, doc, function (err) { if (err) throw err; });
        });
    }
    """
}
created_sp = container.scripts.create_stored_procedure(body=sp_body)
print(f"Stored procedure created: {created_sp['id']}")

Executing a Stored Procedure

result = container.scripts.execute_stored_procedure(
    sproc='incrementCounter',
    partition_key='some_partition_key_value',  # The logical partition the procedure runs in
    params=['counter1']  # Placeholder id of an existing document in that partition
)
print(f"Stored procedure executed. Result: {result}")

Triggers

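Triggers run before (pre-triggers) or after (post-triggers) an operation on an item. Cosmos DB does not fire them automatically: each request must name the triggers to run, for example with the pre_trigger_include option on create_item or upsert_item.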

Creating a Trigger

trigger_body = {
    "id": "validateItem",
    # A pre-trigger reads the incoming item from the request body
    "body": "function validateItem() { var item = getContext().getRequest().getBody(); if (!item.name) { throw new Error('Item must have a name.'); } }",
    "triggerType": "pre",
    "triggerOperation": "all"
}
created_trigger = container.scripts.create_trigger(body=trigger_body)
print(f"Trigger created: {created_trigger['id']}")

User-Defined Functions (UDFs)

UDFs extend the query capabilities of Cosmos DB by allowing custom functions within your SQL queries.

Creating a UDF

udf_body = {
    "id": "calculateTax",
    "body": "function calculateTax(price) { return price * 0.08; }"
}
created_udf = container.scripts.create_user_defined_function(body=udf_body)
print(f"UDF created: {created_udf['id']}")

Using a UDF in a Query

udf_query = "SELECT c.name, udf.calculateTax(c.price) as tax FROM c WHERE c.category = 'gear'"
items_with_tax = list(container.query_items(
    query=udf_query,
    enable_cross_partition_query=True
))
print("\nItems with calculated tax:")
for item in items_with_tax:
    print(f"- {item['name']}: Tax = ${item['tax']:.2f}")
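Because a UDF runs on the server, a client-side mirror of the same arithmetic can be handy for checking query results in unit tests. The sketch below reproduces the calculateTax logic in Python; the function name `calculate_tax` and the sample prices are chosen for this example only.

```python
def calculate_tax(price, rate=0.08):
    """Client-side mirror of the calculateTax UDF: price multiplied by the tax rate."""
    return price * rate


# Prices taken from the item examples earlier in this document.
for name, price in [("Advanced Backpack", 150), ("Advanced Backpack (Updated)", 160)]:
    print(f"- {name}: Tax = ${calculate_tax(price):.2f}")
```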

Transactions

Azure Cosmos DB supports ACID transactions for operations within a single logical partition. A stored procedure runs inside such a transaction: if the script throws an error, every write it made is rolled back.

Transactions cannot span logical partitions, so items that must change together need to share the same partition key value.

Change Feed

The change feed provides a persistent, append-only log of changes to items in a container. It's useful for background tasks, data synchronization, and event-driven architectures.

Reading from the Change Feed

# The Python SDK reads the change feed with a pull model: query_items_change_feed()
# returns an iterable of changed items. Distributed processing with a lease container
# is not built into this SDK and has to be implemented by the application.
changes = container.query_items_change_feed(is_start_from_beginning=True)
for change in changes:
    print(f"Change detected: {change.get('id')}")

Bulk Operations

The Python SDK can group multiple operations (create, upsert, read, replace, delete) into a single request through transactional batch. All operations in a batch must target the same partition key value, and they succeed or fail together. This API is available in recent 4.x releases of azure-cosmos, so check your installed version.

Executing a Transactional Batch

from azure.cosmos.exceptions import CosmosBatchOperationError

# Each operation is a tuple: (operation type, tuple of positional arguments)
operations = [
    ("upsert", ({"id": "bulk_item_1", "value": 10, "partitionKey": "bulk"},)),
    ("upsert", ({"id": "bulk_item_2", "value": 20, "partitionKey": "bulk"},)),
    ("delete", ("bulk_item_1",)),
]

try:
    results = container.execute_item_batch(batch_operations=operations, partition_key="bulk")
    for result in results:
        print(f"Operation succeeded: status {result.get('statusCode')}")
except CosmosBatchOperationError as e:
    # error_index is the position of the operation that failed; the whole batch is rolled back
    print(f"Operation {e.error_index} failed: {e}")

Error Handling

The SDK raises specific exceptions for Cosmos DB errors. It's crucial to handle these exceptions gracefully.

from azure.cosmos.exceptions import CosmosHttpResponseError

try:
    item = container.read_item(item='non_existent_item', partition_key='some_key')
except CosmosHttpResponseError as e:
    if e.status_code == 404:
        print("Item not found.")
    else:
        print(f"An error occurred: {e}")

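Common exceptions, all defined in azure.cosmos.exceptions, include CosmosHttpResponseError (the base class for service errors), CosmosResourceNotFoundError (status 404), CosmosResourceExistsError (status 409), and CosmosAccessConditionFailedError (status 412).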
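A frequent pattern is to treat "not found" as a normal outcome and let every other error propagate. The following is a minimal sketch of that pattern. So that it runs without the SDK installed, it catches `Exception` and inspects a `status_code` attribute; in application code, catching `CosmosHttpResponseError` as shown above is the better choice. The helper name `read_item_or_none` is hypothetical.

```python
def read_item_or_none(container, item_id, partition_key):
    """Return the item, or None when the service answers with status 404.

    `container` is any object exposing read_item(item=..., partition_key=...).
    """
    try:
        return container.read_item(item=item_id, partition_key=partition_key)
    except Exception as exc:  # Assumption: in real code, catch CosmosHttpResponseError
        if getattr(exc, "status_code", None) == 404:
            return None
        # Anything other than "not found" is unexpected here, so re-raise it.
        raise
```

Returning None keeps the calling code free of try/except blocks for the common "item may not exist yet" case, while throttling, conflicts, and authentication failures still surface as exceptions.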

Advanced Usage

The SDK offers advanced features like:

  • Request Options: Pass keyword arguments on individual calls to control settings such as session tokens, ETag preconditions, and the triggers to run.
  • Indexing Policies: Define how data is indexed for efficient querying.
  • Partitioning Strategies: Understand and implement effective partitioning for scalability.
  • Connection Mode: The Python SDK connects to the service through gateway mode over HTTPS; direct mode is not offered by this SDK.
  • SDK Configuration: Fine-tune client-level configurations for connection pooling, timeouts, etc.
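As an illustration of the indexing policy item in the list above, a policy is a JSON document that names the paths to index and the paths to skip. The sketch below builds one as a Python dictionary; excluding `/description/?` is an assumption made for this example, on the premise that the long free-text field is never used in query filters.

```python
import json

# Index every path except the free-text description property.
indexing_policy = {
    "indexingMode": "consistent",
    "automatic": True,
    "includedPaths": [{"path": "/*"}],
    "excludedPaths": [{"path": "/description/?"}],
}

print(json.dumps(indexing_policy, indent=2))
```

A dictionary like this can be passed as the `indexing_policy` argument when creating a container with `create_container_if_not_exists`.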

Refer to the official Azure Cosmos DB Python SDK documentation for more in-depth information on these topics.