Azure Cosmos DB Python SDK Reference
This document provides a comprehensive reference for the Azure Cosmos DB Python SDK. It covers installation, usage, and detailed explanations of core functionalities to help you build scalable and performant applications on Azure Cosmos DB.
Installation
Install the Azure Cosmos DB Python SDK using pip:
pip install azure-cosmos
It's recommended to use a virtual environment for your project.
Getting Started
The Azure Cosmos DB Python SDK allows you to interact with your Cosmos DB account from your Python applications. You can perform operations such as creating databases, containers, and items, as well as executing complex queries.
Creating a Client
To interact with Cosmos DB, you need to create a client instance. This typically involves providing your account's endpoint and primary key.
from azure.cosmos import CosmosClient
# Replace with your actual endpoint and key
endpoint = "YOUR_COSMOS_DB_ENDPOINT"
key = "YOUR_COSMOS_DB_PRIMARY_KEY"
client = CosmosClient(endpoint, credential=key)
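Hard-coding the key in source is easy to leak. As a minimal sketch, the settings can be read from the environment instead; the variable names COSMOS_ENDPOINT and COSMOS_KEY and the helper load_cosmos_settings are assumptions made for this example, not names defined by the SDK.

```python
import os
from typing import Mapping, Tuple


def load_cosmos_settings(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (endpoint, key) from the given mapping, or raise if either is missing.

    COSMOS_ENDPOINT and COSMOS_KEY are hypothetical names chosen for this sketch.
    """
    missing = [name for name in ("COSMOS_ENDPOINT", "COSMOS_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return env["COSMOS_ENDPOINT"], env["COSMOS_KEY"]


# Usage with the SDK (requires azure-cosmos and real credentials):
# from azure.cosmos import CosmosClient
# endpoint, key = load_cosmos_settings()
# client = CosmosClient(endpoint, credential=key)
```

Failing early with the names of the missing settings tends to be easier to diagnose than an authentication error raised later by the first request.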
Databases
You can manage databases within your Cosmos DB account:
Creating a Database
# Create the database if it does not already exist
database = client.create_database_if_not_exists(id='MyDatabase')
print(f"Database created or already exists: {database.id}")
Reading a Database
database = client.get_database_client('MyDatabase')
print(f"Reading database: {database.id}")
Listing Databases
for db in client.list_databases():
    print(f"Found database: {db['id']}")
Containers
Containers are the fundamental units of scalability and throughput in Cosmos DB. They store items.
Creating a Container
from azure.cosmos.partition_key import PartitionKey
# Use an existing database client
database = client.get_database_client('MyDatabase')
# Create a container with a partition key
container = database.create_container_if_not_exists(
    id='MyContainer',
    partition_key=PartitionKey(path='/partitionKey')
)
print(f"Container created or already exists: {container.id}")
Reading a Container
container = database.get_container_client('MyContainer')
print(f"Reading container: {container.id}")
Listing Containers
for cont in database.list_containers():
    print(f"Found container: {cont['id']}")
Items
Items are JSON documents stored within containers.
Creating an Item
item_body = {
    "id": "item1",
    "category": "gear",
    "name": "Advanced Backpack",
    "description": "The best backpack for your next adventure.",
    "price": 150,
    "tags": ["outdoors", "travel", "hiking"],
    "partitionKey": "gear"
}
created_item = container.create_item(body=item_body)
print(f"Created item: {created_item['id']}")
Reading an Item
read_item = container.read_item(item='item1', partition_key='gear')
print(f"Read item: {read_item['name']}")
Upserting an Item
updated_item_body = {
    "id": "item1",
    "category": "gear",
    "name": "Advanced Backpack (Updated)",
    "description": "The best backpack for your next adventure, now with more features.",
    "price": 160,
    "tags": ["outdoors", "travel", "hiking", "new"],
    "partitionKey": "gear"
}
upserted_item = container.upsert_item(body=updated_item_body)
print(f"Upserted item: {upserted_item['id']}")
Deleting an Item
container.delete_item(item='item1', partition_key='gear')
print("Item deleted successfully.")
Queries
Execute SQL queries against your data.
Simple Query
query = "SELECT * FROM c WHERE c.category = 'gear'"
items = list(container.query_items(
    query=query,
    enable_cross_partition_query=True  # Needed because the filter is not on the partition key
))
print("Query results:")
for item in items:
    print(f"- {item['name']} (${item['price']})")
Parameterized Query
parameterized_query = "SELECT * FROM c WHERE c.category = @category"
parameters = [{"name": "@category", "value": "gear"}]
items_filtered = list(container.query_items(
    query=parameterized_query,
    parameters=parameters,
    enable_cross_partition_query=True  # c.category is not the partition key path
))
print("\nParameterized query results:")
for item in items_filtered:
    print(f"- {item['name']}")
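When filters are optional, the query text and the parameter list have to be built together so that user input never ends up inside the SQL string. The following is a sketch of one way to do that; build_item_query and the @maxPrice parameter are hypothetical names introduced for this example.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_item_query(
    category: str, max_price: Optional[float] = None
) -> Tuple[str, List[Dict[str, Any]]]:
    """Return (query_text, parameters) for a category filter with an optional price cap."""
    clauses = ["c.category = @category"]
    parameters: List[Dict[str, Any]] = [{"name": "@category", "value": category}]
    if max_price is not None:
        clauses.append("c.price <= @maxPrice")
        parameters.append({"name": "@maxPrice", "value": max_price})
    # Only fixed clause text is concatenated; values travel in the parameter list.
    query = "SELECT * FROM c WHERE " + " AND ".join(clauses)
    return query, parameters


query, parameters = build_item_query("gear", max_price=200)
print(query)
print(parameters)
```

The returned pair can be passed to container.query_items(query=query, parameters=parameters, enable_cross_partition_query=True).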
Stored Procedures
Stored procedures allow you to execute logic directly on the server side, offering better performance and atomicity.
Creating a Stored Procedure
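# Server-side JavaScript. Assumes a document with id 'counter' exists in the target partition.
sp_source = """function incrementCounter() {
    var coll = getContext().getCollection();
    coll.queryDocuments(coll.getSelfLink(), "SELECT * FROM c WHERE c.id = 'counter'", function (err, docs) {
        if (err) throw err;
        if (!docs.length) throw new Error('Counter document not found.');
        docs[0].counter = (docs[0].counter || 0) + 1;
        coll.replaceDocument(docs[0]._self, docs[0], function (e) { if (e) throw e; });
    });
}"""
created_sp = container.scripts.create_stored_procedure(body={"id": "incrementCounter", "body": sp_source})
print(f"Stored procedure created: {created_sp['id']}")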
Executing a Stored Procedure
result = container.scripts.execute_stored_procedure(
    sproc='incrementCounter',
    partition_key='some_partition_key_value'  # The procedure runs against this logical partition
)
print(f"Stored procedure executed. Result: {result}")
Triggers
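Triggers run before (pre-trigger) or after (post-trigger) an operation on an item. They are not invoked automatically: each request must name the triggers it wants to run.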
Creating a Trigger
trigger_body = {
    "id": "validateItem",
    # A pre-trigger reads the incoming item from the request body
    "body": "function validateItem() { var item = getContext().getRequest().getBody(); if (!item.name) { throw new Error('Item must have a name.'); } }",
    "triggerType": "pre",
    "triggerOperation": "all"
}
created_trigger = container.scripts.create_trigger(body=trigger_body)
print(f"Trigger created: {created_trigger['id']}")
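A registered trigger only runs when a request asks for it. The sketch below shows how a create call might name the pre-trigger through the pre_trigger_include keyword of create_item; the wrapper create_item_with_pre_trigger is a hypothetical helper introduced for this example.

```python
from typing import Any, Dict


def create_item_with_pre_trigger(
    container: Any, body: Dict[str, Any], trigger_id: str = "validateItem"
) -> Dict[str, Any]:
    """Create an item and ask the service to run the named pre-trigger first.

    `container` is expected to behave like the SDK's container client.
    """
    return container.create_item(body=body, pre_trigger_include=trigger_id)


# Usage with a real container client:
# create_item_with_pre_trigger(container, {"id": "item2", "name": "Tent", "partitionKey": "gear"})
```

If the trigger throws, for example because the item has no name, the create request fails and the SDK raises an HTTP response error.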
User-Defined Functions (UDFs)
UDFs extend the query capabilities of Cosmos DB by allowing custom functions within your SQL queries.
Creating a UDF
udf_body = {
"id": "calculateTax",
"body": "function calculateTax(price) { return price * 0.08; }"
}
created_udf = container.scripts.create_user_defined_function(body=udf_body)
print(f"UDF created: {created_udf['id']}")
Using a UDF in a Query
udf_query = "SELECT c.name, udf.calculateTax(c.price) AS tax FROM c WHERE c.category = 'gear'"
items_with_tax = list(container.query_items(
    query=udf_query,
    enable_cross_partition_query=True  # c.category is not the partition key path
))
print("\nItems with calculated tax:")
for item in items_with_tax:
    print(f"- {item['name']}: Tax = ${item['tax']:.2f}")
Transactions
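Azure Cosmos DB supports ACID transactions for operations within a single logical partition. A stored procedure runs as one transaction: if it throws, all of its writes are rolled back. Recent SDK releases also expose transactional batches through container.execute_item_batch, with the same single-partition scope.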
Change Feed
The change feed provides a persistent, append-only log of changes to items in a container. It's useful for background tasks, data synchronization, and event-driven architectures.
Reading from the Change Feed
# The Python SDK reads the change feed with a pull model: query_items_change_feed
# returns an iterable of changed items. It does not ship a ChangeFeedProcessor or
# lease-container management, so distributed processing must be coordinated by the caller.
# Reading from the beginning yields the current version of each item that still exists.
changes = container.query_items_change_feed(is_start_from_beginning=True)
for change in changes:
    print(f"Change detected: {change.get('id')}")
Bulk Operations
The SDK can group multiple operations (create, upsert, replace, delete) into a single request by using a transactional batch, provided they all target the same logical partition. The batch succeeds or fails as a whole. This requires a recent SDK release that provides container.execute_item_batch.
Performing Bulk Operations
from azure.cosmos.exceptions import CosmosBatchOperationError
# Each operation is a tuple of (operation name, positional arguments).
# Nothing is sent until execute_item_batch is called.
operations = [
    ("upsert", ({"id": "bulk_item_1", "value": 10, "partitionKey": "bulk"},)),
    ("upsert", ({"id": "bulk_item_2", "value": 20, "partitionKey": "bulk"},)),
    ("delete", ("bulk_item_1",))
]
try:
    results = container.execute_item_batch(batch_operations=operations, partition_key="bulk")
    for result in results:
        print(f"Operation succeeded: status {result.get('statusCode')}")
except CosmosBatchOperationError as e:
    print(f"Operation {e.error_index} failed: Status {e.status_code}, Message: {e.message}")
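Writing many items usually means splitting them by partition key first, since a transactional batch is limited to one logical partition and, per the service limits, 100 operations. The sketch below plans such batches without sending anything. It assumes an SDK release that provides container.execute_item_batch; plan_upsert_batches and the sample items are hypothetical and exist only for this example.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

MAX_BATCH_OPERATIONS = 100  # service limit on operations in one transactional batch

BatchOperation = Tuple[str, Tuple[Any, ...]]


def plan_upsert_batches(
    items: Iterable[Dict[str, Any]],
    partition_key_field: str = "partitionKey",
    max_operations: int = MAX_BATCH_OPERATIONS,
) -> List[Tuple[Any, List[BatchOperation]]]:
    """Group items by partition key value and split each group into batch-sized chunks.

    Returns a list of (partition_key_value, operations) pairs, where each operation
    is an ("upsert", (item,)) tuple.
    """
    grouped: Dict[Any, List[BatchOperation]] = defaultdict(list)
    for item in items:
        grouped[item[partition_key_field]].append(("upsert", (item,)))

    batches: List[Tuple[Any, List[BatchOperation]]] = []
    for pk_value, ops in grouped.items():
        for start in range(0, len(ops), max_operations):
            batches.append((pk_value, ops[start:start + max_operations]))
    return batches


sample_items = [
    {"id": f"bulk_item_{i}", "value": i, "partitionKey": "bulk" if i % 2 else "other"}
    for i in range(5)
]
for pk_value, ops in plan_upsert_batches(sample_items, max_operations=2):
    print(pk_value, [op[1][0]["id"] for op in ops])
    # With a real container client:
    # container.execute_item_batch(batch_operations=ops, partition_key=pk_value)
```

Each planned batch is atomic on its own; the set of batches as a whole is not, so a failure partway through leaves the earlier batches committed.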
Error Handling
The SDK raises specific exceptions for Cosmos DB errors. It's crucial to handle these exceptions gracefully.
from azure.cosmos.exceptions import CosmosHttpResponseError
try:
    item = container.read_item(item='non_existent_item', partition_key='some_key')
except CosmosHttpResponseError as e:
    if e.status_code == 404:
        print("Item not found.")
    else:
        print(f"An error occurred: {e}")
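Common exceptions in azure.cosmos.exceptions include CosmosHttpResponseError (the base class for service errors), CosmosResourceNotFoundError (404), CosmosResourceExistsError (409), and CosmosAccessConditionFailedError (412).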
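A missing item is often an expected outcome, not a failure. As a sketch, a small wrapper can turn a 404 into None and let every other error propagate; read_item_or_none is a hypothetical helper, and it inspects the status_code attribute so that it does not need the SDK imported to load.

```python
from typing import Any, Dict, Optional


def read_item_or_none(
    container: Any, item_id: str, partition_key: Any
) -> Optional[Dict[str, Any]]:
    """Return the item, or None when the service answers 404 Not Found."""
    try:
        return container.read_item(item=item_id, partition_key=partition_key)
    except Exception as exc:
        # SDK HTTP errors carry a status_code attribute. With azure-cosmos installed,
        # catching CosmosHttpResponseError instead of Exception is the narrower choice.
        if getattr(exc, "status_code", None) == 404:
            return None
        raise


# Usage with a real container client:
# item = read_item_or_none(container, "item1", "gear")
# if item is None:
#     print("Item not found.")
```

Errors other than 404, such as throttling or authorization failures, are re-raised unchanged so they are not silently mistaken for a missing item.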
Advanced Usage
The SDK offers advanced features like:
- Request Options: Adjust the behavior of individual requests, such as the maximum page size for a query. Throughput and indexing policies are set on the database or container, not per request.
- Indexing Policies: Define how data is indexed for efficient querying.
- Partitioning Strategies: Understand and implement effective partitioning for scalability.
- Connection Modes: The Python SDK connects through gateway mode; direct mode is offered by other Cosmos DB SDKs, not by this one.
- SDK Configuration: Fine-tune client-level configurations for connection pooling, timeouts, etc.
Refer to the official Azure Cosmos DB Python SDK documentation for more in-depth information on these topics.
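To make the indexing policy item above more concrete, here is a sketch of a policy that indexes everything except a few paths that are never queried. The dictionary keys follow the Cosmos DB indexing policy JSON format; build_indexing_policy and the excluded path are hypothetical choices for this example.

```python
from typing import Any, Dict, Iterable


def build_indexing_policy(excluded_paths: Iterable[str]) -> Dict[str, Any]:
    """Return an indexing policy that indexes all paths except the given ones.

    Paths use the Cosmos DB path syntax, e.g. "/description/?" for a scalar
    property or "/tags/*" for a whole subtree.
    """
    return {
        "indexingMode": "consistent",
        "automatic": True,
        "includedPaths": [{"path": "/*"}],
        "excludedPaths": [{"path": path} for path in excluded_paths],
    }


policy = build_indexing_policy(["/description/?"])
print(policy)

# Usage with a real database client:
# database.create_container_if_not_exists(
#     id="MyContainer",
#     partition_key=PartitionKey(path="/partitionKey"),
#     indexing_policy=policy,
# )
```

Excluding large free-text properties that never appear in a filter or ORDER BY clause generally lowers the request charge of writes, at the cost of slower queries if such a filter is added later.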