Mastering Data Operations with the Official SDKs
Azure Cosmos DB is a globally distributed, multi-model database service. The Azure Cosmos DB SDKs provide a programmatic interface for interacting with your Cosmos DB data efficiently and securely. This tutorial will guide you through the essential steps and best practices for using the SDKs in your applications.
We will cover common tasks such as connecting to your database, performing CRUD operations, querying data, and leveraging advanced features for robust application development.
The first step is to install the appropriate SDK for your development language. You can typically do this via your language's package manager.
dotnet add package Microsoft.Azure.Cosmos
npm install @azure/cosmos
pip install azure-cosmos
Refer to the official Azure Cosmos DB SDK documentation for a complete list of supported languages and specific installation instructions.
To connect, you'll need your Cosmos DB account's endpoint URI and a primary key. These can be found in the Azure portal under your Cosmos DB account's "Keys" section.
using System;
using Microsoft.Azure.Cosmos;
// The endpoint and key are read from environment variables; set both before running
string cosmosDbEndpoint = Environment.GetEnvironmentVariable("COSMOS_DB_ENDPOINT");
string cosmosDbKey = Environment.GetEnvironmentVariable("COSMOS_DB_KEY");
string databaseId = "MyDatabase";
string containerId = "MyContainer";
CosmosClient client = new CosmosClient(cosmosDbEndpoint, cosmosDbKey);
Database database = await client.CreateDatabaseIfNotExistsAsync(databaseId);
Container container = await database.CreateContainerIfNotExistsAsync(containerId, "/partitionKey");
const { CosmosClient } = require("@azure/cosmos");
// The endpoint and key are read from environment variables; set both before running
const endpoint = process.env.COSMOS_DB_ENDPOINT;
const key = process.env.COSMOS_DB_KEY;
const databaseId = "MyDatabase";
const containerId = "MyContainer";
const client = new CosmosClient({ endpoint, key });
async function setupDatabase() {
  const { database } = await client.databases.createIfNotExists({ id: databaseId });
  const { container } = await database.containers.createIfNotExists({
    id: containerId,
    partitionKey: {
      kind: "Hash",
      paths: ["/partitionKey"]
    }
  });
  console.log(`Database and container '${containerId}' ready.`);
  return { database, container };
}
// Report connection or permission errors instead of leaving the rejection unhandled
setupDatabase().catch((err) => console.error("Setup failed:", err.message));
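Because the endpoint and key come from environment variables, a missing value would otherwise only surface when the first request fails. The following is a minimal sketch of a guard that could run before the client is constructed. `readCosmosConfig` is a hypothetical helper, not part of the SDK, and it assumes the same variable names used above.

```javascript
// Hypothetical helper (not part of @azure/cosmos): fail fast when settings are missing.
function readCosmosConfig(env) {
  const endpoint = env.COSMOS_DB_ENDPOINT;
  const key = env.COSMOS_DB_KEY;
  const missing = [];
  if (!endpoint) missing.push("COSMOS_DB_ENDPOINT");
  if (!key) missing.push("COSMOS_DB_KEY");
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  // The returned shape matches the options object passed to CosmosClient above
  return { endpoint, key };
}

// Possible usage, assuming both variables are set:
// const client = new CosmosClient(readCosmosConfig(process.env));
```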
The most fundamental operations involve managing individual items within a container.
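using Newtonsoft.Json;
// Cosmos DB requires a lowercase "id", and the container is partitioned on "/partitionKey"
public class MyItem
{
    [JsonProperty("id")]
    public string Id { get; set; }
    [JsonProperty("name")]
    public string Name { get; set; }
    [JsonProperty("partitionKey")]
    public string PartitionKey { get; set; }
}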
// Assuming 'container' is your Cosmos DB Container object
MyItem newItem = new MyItem { Id = "item1", Name = "Example Item", PartitionKey = "pk1" };
ItemResponse<MyItem> createResponse = await container.CreateItemAsync(newItem, new PartitionKey(newItem.PartitionKey));
Console.WriteLine($"Created item with ID: {createResponse.Resource.Id}");
string itemId = "item1";
string partitionKeyValue = "pk1";
try
{
    ItemResponse<MyItem> readResponse = await container.ReadItemAsync<MyItem>(itemId, new PartitionKey(partitionKeyValue));
    Console.WriteLine($"Read item: {readResponse.Resource.Name}");
}
catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.NotFound)
{
    Console.WriteLine($"Item with ID {itemId} not found.");
}
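For comparison, the same create and read steps can be sketched in JavaScript. This assumes `container` is the object returned by `setupDatabase()` and that items carry lowercase `id` and `partitionKey` properties; `createAndRead` is an illustrative name. It also assumes a recent `@azure/cosmos` version, where reading a missing item is reported as a 404 status with an undefined resource rather than a thrown error.

```javascript
// Sketch: create an item, then read it back by id and partition key value.
async function createAndRead(container, item) {
  const { resource: created } = await container.items.create(item);

  const { resource: found, statusCode } = await container
    .item(created.id, created.partitionKey)
    .read();

  // Assumption: a missing item yields statusCode 404 and an undefined resource
  if (statusCode === 404 || found === undefined) {
    return null;
  }
  return found;
}
```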
You can replace an existing item in full, or partially update it with a list of patch operations (such as set, replace, or remove) that each target a single JSON path.
// Replace operation
MyItem updatedItem = new MyItem { Id = "item1", Name = "Updated Item Name", PartitionKey = "pk1" };
ItemResponse<MyItem> replaceResponse = await container.ReplaceItemAsync(updatedItem, updatedItem.Id, new PartitionKey(updatedItem.PartitionKey));
Console.WriteLine($"Replaced item with ID: {replaceResponse.Resource.Id}");
// Partial update (e.g., only changing the name)
// Paths refer to the stored JSON, so this assumes the Name property is serialized as "name"
var patchOperations = new[]
{
    PatchOperation.Replace("/name", "Partially Updated Item")
};
ItemResponse<MyItem> patchResponse = await container.PatchItemAsync<MyItem>(itemId, new PartitionKey(partitionKeyValue), patchOperations);
Console.WriteLine($"Patched item name to: {patchResponse.Resource.Name}");
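A partial update can be sketched in JavaScript as well. This assumes an `@azure/cosmos` version that exposes `patch` on an item reference, and that the stored document has a lowercase `name` property. `buildRenamePatch` and `renameItem` are illustrative names.

```javascript
// Each patch operation names an action, a JSON path, and (for "replace") a new value.
function buildRenamePatch(newName) {
  return [{ op: "replace", path: "/name", value: newName }];
}

// Sketch: apply the patch to one item, addressed by id and partition key value.
async function renameItem(container, id, partitionKeyValue, newName) {
  const { resource } = await container
    .item(id, partitionKeyValue)
    .patch(buildRenamePatch(newName));
  return resource;
}
```

Only the listed paths are sent, so other properties of the item are left untouched.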
string itemIdToDelete = "item1";
string partitionKeyValueToDelete = "pk1";
await container.DeleteItemAsync<MyItem>(itemIdToDelete, new PartitionKey(partitionKeyValueToDelete));
Console.WriteLine($"Deleted item with ID: {itemIdToDelete}");
Cosmos DB uses SQL-like syntax for querying. The SDKs provide methods to execute these queries.
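// Property names in queries are case-sensitive and must match the stored JSON ("name")
var queryDefinition = new QueryDefinition("SELECT * FROM c WHERE c.name = @name")
    .WithParameter("@name", "Example Item");
// Results arrive in pages; keep reading until the iterator is exhausted
using FeedIterator<MyItem> queryIterator = container.GetItemQueryIterator<MyItem>(queryDefinition);
while (queryIterator.HasMoreResults)
{
    FeedResponse<MyItem> page = await queryIterator.ReadNextAsync();
    foreach (MyItem item in page)
    {
        Console.WriteLine($"Found item: {item.Id} - {item.Name}");
    }
}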
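The equivalent parameterized query in JavaScript might look like the sketch below. It assumes `container` comes from `setupDatabase()` and that items store a lowercase `name` property; `findByName` is an illustrative name. The loop drains every page instead of stopping at the first one.

```javascript
// Sketch: run a parameterized query and collect all pages of results.
async function findByName(container, name) {
  const querySpec = {
    query: "SELECT * FROM c WHERE c.name = @name",
    parameters: [{ name: "@name", value: name }],
  };
  const iterator = container.items.query(querySpec);
  const results = [];
  while (iterator.hasMoreResults()) {
    const { resources } = await iterator.fetchNext();
    // A page can be empty, so guard against a missing array
    results.push(...(resources || []));
  }
  return results;
}
```

Passing the value as a parameter, rather than concatenating it into the query text, keeps user input from altering the query.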
You can also use LINQ for querying in .NET, which provides a more strongly-typed experience.
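// LINQ example (requires: using System.Linq; using Microsoft.Azure.Cosmos.Linq;)
IQueryable<MyItem> linqQuery = container.GetItemLinqQueryable<MyItem>()
    .Where(item => item.Name == "Example Item");
// ToFeedIterator() executes the query asynchronously, one page at a time
using FeedIterator<MyItem> linqIterator = linqQuery.ToFeedIterator();
while (linqIterator.HasMoreResults)
{
    foreach (MyItem item in await linqIterator.ReadNextAsync())
    {
        Console.WriteLine($"Found item via LINQ: {item.Id}");
    }
}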
For scenarios where you need to perform multiple operations atomically within a single transaction, you can use transactional batch.
// Every operation in the batch must target the same partition key value
TransactionalBatch batch = container.CreateTransactionalBatch(new PartitionKey("pk1"));
MyItem item1 = new MyItem { Id = "batchItem1", Name = "Batch Item 1", PartitionKey = "pk1" };
MyItem item2 = new MyItem { Id = "batchItem2", Name = "Batch Item 2", PartitionKey = "pk1" };
batch.CreateItem(item1);
// Replace and delete target items that must already exist; if any operation fails, none are applied
batch.ReplaceItem(item2.Id, item2);
batch.DeleteItem("existingItemId");
TransactionalBatchResponse batchResponse = await batch.ExecuteAsync();
if (batchResponse.IsSuccessStatusCode)
{
    Console.WriteLine("Transactional batch executed successfully.");
}
else
{
    Console.WriteLine($"Transactional batch failed: {batchResponse.StatusCode}");
}
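A JavaScript version of the same batch could be sketched as follows. It assumes an `@azure/cosmos` version that provides `container.items.batch`, and that the response exposes one entry per operation with a `statusCode`. `runBatch` is an illustrative name, and checking every per-operation status is a conservative choice made here rather than something the SDK requires.

```javascript
// Sketch: send create, replace, and delete as one all-or-nothing batch.
async function runBatch(container, partitionKeyValue) {
  const operations = [
    {
      operationType: "Create",
      resourceBody: { id: "batchItem1", name: "Batch Item 1", partitionKey: partitionKeyValue },
    },
    {
      operationType: "Replace",
      id: "batchItem2",
      resourceBody: { id: "batchItem2", name: "Batch Item 2", partitionKey: partitionKeyValue },
    },
    { operationType: "Delete", id: "existingItemId" },
  ];

  const response = await container.items.batch(operations, partitionKeyValue);
  const results = response.result || [];
  // Treat the batch as successful only if every operation reports a 2xx status
  const succeeded =
    results.length === operations.length &&
    results.every((r) => r.statusCode >= 200 && r.statusCode < 300);
  return { succeeded, results };
}
```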
Transactional batches (covered above) are the primary mechanism for ACID transactions across multiple items within the same logical partition.
The Change Feed provides a persistent, append-only log of changes to your Cosmos DB data. SDKs make it easy to consume this feed for real-time processing or synchronization.
// Example of creating a change feed processor
// Leases record each processor instance's progress; keep them in a separate container
Container leaseContainer = await database.CreateContainerIfNotExistsAsync("leases", "/id");
ChangeFeedProcessor changeFeedProcessor = container.GetChangeFeedProcessorBuilder<MyItem>(
    processorName: "MyChangeFeedProcessor",
    onChangesDelegate: async (IReadOnlyCollection<MyItem> changes, CancellationToken cancellationToken) =>
    {
        foreach (var item in changes)
        {
            Console.WriteLine($"Change detected for item: {item.Id}, Name: {item.Name}");
            // Process the change here (e.g., send to another service, update analytics)
        }
    })
    .WithInstanceName("Instance1")
    .WithLeaseContainer(leaseContainer)
    .WithMaxItems(100)
    .WithPollInterval(TimeSpan.FromSeconds(5))
    // Read the feed from the beginning; omit this to start from the current time
    .WithStartTime(DateTime.MinValue.ToUniversalTime())
    .Build();
await changeFeedProcessor.StartAsync();
// Keep the application running to process changes...
// await Task.Delay(Timeout.Infinite);
// await changeFeedProcessor.StopAsync();
You can deploy and execute stored procedures and triggers written in JavaScript directly on Cosmos DB. The SDKs allow you to invoke them.
// Assuming you have a stored procedure named "myStoredProcedure"
var sprocResponse = await container.Scripts.ExecuteStoredProcedureAsync<string>(
"myStoredProcedure",
new PartitionKey("pk1"),
new dynamic[] { "parameter1", "parameter2" });
Console.WriteLine($"Stored procedure result: {sprocResponse.Resource}");
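In JavaScript, the same call might be sketched as below. It assumes the stored procedure `myStoredProcedure` already exists in the container; `callStoredProcedure` is an illustrative name. A stored procedure runs within a single logical partition, so the partition key value is passed along with the parameters.

```javascript
// Sketch: execute an existing stored procedure against one logical partition.
async function callStoredProcedure(container, partitionKeyValue, params) {
  const { resource } = await container.scripts
    .storedProcedure("myStoredProcedure")
    .execute(partitionKeyValue, params);
  return resource;
}

// Possible usage:
// const result = await callStoredProcedure(container, "pk1", ["parameter1", "parameter2"]);
```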
Effective partitioning is crucial for performance and scalability. Your choice of partition key impacts how your data is distributed. When performing operations, you must specify the partition key value for the item(s) you are interacting with.
Common Partitioning Strategies:
The SDKs handle routing requests to the correct partition automatically once you provide the partition key.
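One rough way to judge how a candidate partition key would distribute data is to count sample items per key value. The sketch below is only an illustration: `countByPartitionKey` and `largestShare` are hypothetical helpers, not SDK functions, and they run over an in-memory sample rather than the database.

```javascript
// Hypothetical helper: count sample items per partition key value.
function countByPartitionKey(items, keyName) {
  const counts = new Map();
  for (const item of items) {
    const value = item[keyName];
    counts.set(value, (counts.get(value) || 0) + 1);
  }
  return counts;
}

// Fraction of all items held by the most populated key value.
// A value near 1 means almost everything would land in one logical partition.
function largestShare(counts) {
  let total = 0;
  let max = 0;
  for (const n of counts.values()) {
    total += n;
    if (n > max) max = n;
  }
  return total === 0 ? 0 : max / total;
}
```

A key whose largest share stays small across a representative sample is less likely to concentrate storage and throughput on one partition.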
Create a single instance of CosmosClient and reuse it throughout your application's lifecycle. This leverages connection pooling and reduces overhead.
Avoid creating and disposing CosmosClient instances frequently.
Use SELECT * sparingly and retrieve only the fields you need. Leverage indexing.
Always implement comprehensive error handling, especially for transient errors such as throttling (HTTP status code 429) and network issues.
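The .NET SDK reports failed requests as CosmosException, whose StatusCode property identifies the cause. Retry only status codes that indicate a transient condition, such as 429, and treat the rest as errors to handle or rethrow.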
// Example Retry Logic (simplified)
int maxRetries = 5;
for (int i = 0; i < maxRetries; i++)
{
    try
    {
        // Your Cosmos DB operation here
        var response = await container.ReadItemAsync<MyItem>("someId", new PartitionKey("pk"));
        // Success, break loop
        break;
    }
    catch (CosmosException ex) when (ex.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        // Throttled, wait and retry
        TimeSpan delay = ex.RetryAfter ?? TimeSpan.FromSeconds(2);
        await Task.Delay(delay);
    }
    catch (CosmosException ex)
    {
        // Handle other specific Cosmos exceptions
        Console.WriteLine($"Error: {ex.Message}");
        break; // Or rethrow
    }
    catch (Exception ex)
    {
        // Handle general exceptions
        Console.WriteLine($"Unexpected error: {ex.Message}");
        break; // Or rethrow
    }
}
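A comparable retry wrapper in JavaScript is sketched below. It assumes the SDK's error object carries the HTTP status in `code` and, for throttling, a server hint in `retryAfterInMs`. `withThrottleRetry` is an illustrative name. Unlike the fixed two-second fallback above, this sketch falls back to exponential backoff with an assumed 500 ms base when no hint is present, and it rethrows once the retries are exhausted.

```javascript
// Default wait; injectable so callers (or tests) can substitute their own.
function defaultSleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Sketch: retry an async operation only when it fails with HTTP 429.
async function withThrottleRetry(operation, maxRetries = 5, sleep = defaultSleep) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (err) {
      const throttled = Boolean(err) && err.code === 429;
      // Non-transient errors, or too many attempts: surface the error to the caller
      if (!throttled || attempt >= maxRetries) throw err;
      // Prefer the server's hint; otherwise back off exponentially (assumed base: 500 ms)
      const delayMs =
        typeof err.retryAfterInMs === "number" ? err.retryAfterInMs : 500 * 2 ** attempt;
      await sleep(delayMs);
    }
  }
}

// Possible usage:
// const { resource } = await withThrottleRetry(() => container.item("someId", "pk").read());
```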
Keep your SDKs updated to benefit from the latest features, performance improvements, and security patches. Regularly check for new versions and test them in your development environment before deploying to production.