Azure Cosmos DB Change Feed

What is Change Feed?

The Azure Cosmos DB change feed is a persistent record of changes that have occurred to your data within a container. It includes a chronological ordered list of documents that have been inserted or updated in the container. Every Azure Cosmos DB container has its own change feed.

The change feed provides a mechanism to:

How it Works

When a document in a container is created, updated, or deleted, a new entry is added to the change feed. The change feed is a log of these modifications. You can read from the change feed to process these changes.

The change feed is processed using lease containers. A lease container is a regular Azure Cosmos DB container that is used by change feed processors to coordinate the distributed processing of the change feed. Each partition of the container's change feed is processed by a single processor instance at any given time.

Change Feed Tokens

When you read from the change feed, you often use continuation tokens. These tokens represent a specific point in the change feed. You can save these tokens and resume reading from where you left off, ensuring that no changes are missed.

Use Cases

The change feed is a powerful feature with numerous applications:

Azure Cosmos DB Change Feed Diagram

Conceptual diagram of Azure Cosmos DB Change Feed

Reading the Change Feed

There are two primary ways to read the change feed:

1. Change Feed Processor Library

The recommended approach for most scenarios is to use the Azure Cosmos DB Change Feed Processor library. This library simplifies the process of reading and processing the change feed by handling:

You define a delegate that will be called for each batch of changes. The library takes care of the rest.

2. Direct API Reads

For simpler scenarios or custom processing logic, you can directly query the change feed using the Azure Cosmos DB REST API or SDKs. This method gives you more control but requires manual management of continuation tokens and distributed processing logic.

SDK Examples

C# Example (Change Feed Processor)

This example demonstrates how to set up a basic change feed processor to log changes to the console.


using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.ChangeFeedProcessor;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class Document
{
    public string Id { get; set; }
    // ... other properties
}

public class Program
{
    private static CosmosClient cosmosClient;
    private static Container sourceContainer;
    private static Container leaseContainer;
    private static ChangeFeedProcessor processor;

    public static async Task Main(string[] args)
    {
        string endpoint = "YOUR_COSMOS_DB_ENDPOINT";
        string key = "YOUR_COSMOS_DB_KEY";
        string databaseId = "YOUR_DATABASE_ID";
        string sourceContainerId = "YOUR_SOURCE_CONTAINER_ID";
        string leaseContainerId = "YOUR_LEASE_CONTAINER_ID";

        cosmosClient = new CosmosClient(endpoint, key);
        Database database = await cosmosClient.CreateDatabaseIfNotExistsAsync(databaseId);

        sourceContainer = await database.CreateContainerIfNotExistsAsync(
            id: sourceContainerId,
            partitionKeyPath: "/id" // Adjust partition key path as needed
        );

        leaseContainer = await database.CreateContainerIfNotExistsAsync(
            id: leaseContainerId,
            partitionKeyPath: "/id" // Lease container usually has /id as partition key
        );

        processor = await ChangeFeedProcessor.CreateAsync(
            sourceContainer: sourceContainer,
            leaseContainer: leaseContainer,
            onChangesDelegate: HandleChangesAsync,
            new ChangeFeedProcessorOptions {
                FeedPollDelay = TimeSpan.FromSeconds(5) // Poll every 5 seconds
            }
        );

        Console.WriteLine("Starting Change Feed Processor...");
        await processor.StartAsync();

        Console.WriteLine("Processor started. Press Enter to stop.");
        Console.ReadLine();

        Console.WriteLine("Stopping Change Feed Processor...");
        await processor.StopAsync();
        Console.WriteLine("Processor stopped.");
    }

    private static Task HandleChangesAsync(IReadOnlyCollection<ChangeFeedObserverModel> documents, CancellationToken cancellationToken)
    {
        foreach (var document in documents)
        {
            Console.WriteLine($"  Detected operation: {document.Type}, Id: {document.Id}");
            // Process the document changes here
        }
        return Task.CompletedTask;
    }
}
        

Important Considerations