Python Change Feed Samples

This section provides sample code demonstrating how to use the Change Feed functionality in Azure Cosmos DB with the Python SDK.

What is the Change Feed?

The Change Feed is a persistent, ordered record of changes to the documents in an Azure Cosmos DB container. In its default (latest-version) mode it surfaces the most recent version of each changed document, sorted by modification time, which lets you build reactive applications and services that respond to changes in your data. Common use cases include:

  • Data synchronization across different services.
  • Real-time analytics and logging.
  • Data warehousing and ETL processes.
  • Triggering workflows based on data modifications.
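The feed delivers the latest version of each changed document in modification order. The following self-contained sketch simulates that behavior; the `latest_versions` helper and the sample values are purely illustrative (not SDK code), though `_ts` mirrors Cosmos DB's last-modified system property:

```python
def latest_versions(changes):
    """Simulate change feed semantics: only the latest version of each
    document id appears, ordered by modification time (_ts)."""
    latest = {}
    for doc in changes:
        latest[doc["id"]] = doc  # later writes overwrite earlier ones
    return sorted(latest.values(), key=lambda d: d["_ts"])

changes = [
    {"id": "a", "_ts": 1, "value": 1},
    {"id": "b", "_ts": 2, "value": 2},
    {"id": "a", "_ts": 3, "value": 99},  # "a" updated again later
]
for doc in latest_versions(changes):
    print(doc["id"], doc["value"])  # prints "b 2" then "a 99"
```

Note that the intermediate version of document "a" is never seen — a reader of the feed observes only the final state of each document at read time.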

Prerequisites

  • An Azure subscription.
  • An Azure Cosmos DB account.
  • A container in your Azure Cosmos DB account.
  • Python 3.7+ installed.
  • The Azure Cosmos DB Python SDK installed: pip install azure-cosmos

Sample 1: Basic Change Feed Listener

This sample demonstrates how to create a simple listener that processes changes from the Change Feed. It connects to your Cosmos DB account and container, reads incoming changes, and prints them to the console.

```python
import time

from azure.cosmos import CosmosClient

# Configuration
ENDPOINT = "<YOUR_COSMOS_DB_ENDPOINT>"
KEY = "<YOUR_COSMOS_DB_KEY>"
DATABASE_NAME = "mydatabase"
CONTAINER_NAME = "mycontainer"


def handle_changes(items):
    for item in items:
        print(f"Change detected for document: {item['id']}")
        print(f"Document: {item}")
        print("-" * 20)


def listen_to_change_feed():
    client = CosmosClient(ENDPOINT, credential=KEY)
    database = client.get_database_client(DATABASE_NAME)
    container = database.get_container_client(CONTAINER_NAME)

    print(f"Listening to Change Feed for container: {CONTAINER_NAME}...")

    # The Python SDK uses a pull model: read the available changes, keep
    # the continuation token, then poll again for anything new.
    continuation = None
    while True:
        if continuation is None:
            feed = container.query_items_change_feed(is_start_from_beginning=True)
        else:
            feed = container.query_items_change_feed(continuation=continuation)
        handle_changes(feed)
        # After the feed is drained, the continuation token for the next
        # iteration is exposed in the response headers.
        continuation = container.client_connection.last_response_headers.get("etag")
        time.sleep(1)


if __name__ == "__main__":
    try:
        listen_to_change_feed()
    except KeyboardInterrupt:
        print("Change feed listener stopped.")
```

Note: Replace <YOUR_COSMOS_DB_ENDPOINT> and <YOUR_COSMOS_DB_KEY> with your actual Azure Cosmos DB credentials, and make sure the mydatabase and mycontainer names match your setup.

Sample 2: Processing with Lease Container

For production scenarios, it's crucial to persist your change feed progress in a separate lease container. This ensures that if your application restarts, it can resume processing from where it left off instead of re-reading the entire feed. This sample illustrates setting up a lease container and using it to track progress.

Important: Progress is stored in a dedicated container in your Cosmos DB database. This sample calls it leases and creates it if it doesn't already exist.

```python
import time

from azure.cosmos import CosmosClient, PartitionKey, exceptions

# Configuration
ENDPOINT = "<YOUR_COSMOS_DB_ENDPOINT>"
KEY = "<YOUR_COSMOS_DB_KEY>"
DATABASE_NAME = "mydatabase"
SOURCE_CONTAINER_NAME = "mydata"
LEASE_CONTAINER_NAME = "leases"
LEASE_ID = "my_lease_processor"  # one lease document per named processor


def initialize_lease_container(database):
    try:
        lease_container = database.create_container(
            id=LEASE_CONTAINER_NAME,
            partition_key=PartitionKey(path="/id"),
        )
        print(f"Lease container '{LEASE_CONTAINER_NAME}' created.")
        return lease_container
    except exceptions.CosmosResourceExistsError:
        print(f"Lease container '{LEASE_CONTAINER_NAME}' already exists.")
        return database.get_container_client(LEASE_CONTAINER_NAME)


def read_lease(lease_container):
    """Return the saved continuation token, or None on a fresh start."""
    try:
        lease = lease_container.read_item(item=LEASE_ID, partition_key=LEASE_ID)
        return lease.get("continuation")
    except exceptions.CosmosResourceNotFoundError:
        return None


def save_lease(lease_container, continuation):
    lease_container.upsert_item({"id": LEASE_ID, "continuation": continuation})


def handle_changes(items):
    for item in items:
        print(f"Change detected for document ID: {item['id']}")
        print("-" * 20)


def listen_to_change_feed_with_leases():
    client = CosmosClient(ENDPOINT, credential=KEY)
    database = client.get_database_client(DATABASE_NAME)
    source_container = database.get_container_client(SOURCE_CONTAINER_NAME)
    lease_container = initialize_lease_container(database)

    print(f"Listening to Change Feed for container: {SOURCE_CONTAINER_NAME} "
          f"using lease container: {LEASE_CONTAINER_NAME}...")

    # Resume from the checkpoint if one exists; otherwise read from the start.
    continuation = read_lease(lease_container)
    while True:
        if continuation is None:
            feed = source_container.query_items_change_feed(is_start_from_beginning=True)
        else:
            feed = source_container.query_items_change_feed(continuation=continuation)
        handle_changes(feed)
        # Checkpoint the continuation token so a restart resumes from here.
        continuation = source_container.client_connection.last_response_headers.get("etag")
        save_lease(lease_container, continuation)
        time.sleep(1)


if __name__ == "__main__":
    try:
        listen_to_change_feed_with_leases()
    except KeyboardInterrupt:
        print("Change feed listener stopped.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
```

Note: Ensure the leases container has a partition key defined, typically /id, as shown in the initialize_lease_container function.
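The resume-after-restart behavior the lease container provides can be sketched independently of Cosmos DB. The class and helper names below are hypothetical, purely illustrative stand-ins for the lease container and checkpoint logic:

```python
class InMemoryLeaseStore:
    """Illustrative stand-in for the 'leases' container."""

    def __init__(self):
        self.items = {}

    def upsert_item(self, item):
        self.items[item["id"]] = item

    def read_item(self, item_id):
        return self.items.get(item_id)


def save_checkpoint(store, lease_id, continuation):
    # One lease document per processor name, keyed by its id.
    store.upsert_item({"id": lease_id, "continuation": continuation})


def load_checkpoint(store, lease_id):
    lease = store.read_item(lease_id)
    return lease["continuation"] if lease else None


store = InMemoryLeaseStore()
assert load_checkpoint(store, "proc1") is None   # fresh start: read from beginning
save_checkpoint(store, "proc1", '"0x5F"')        # checkpoint after a batch
print(load_checkpoint(store, "proc1"))           # a restart resumes from this token
```

The same pattern applies with a real lease container: a missing lease document means "start from the beginning", and every processed batch overwrites the lease with the newest continuation token.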

Key Concepts

  • Processor Name: A unique identifier for a specific change feed consumer. Each named consumer tracks its own progress independently, so multiple consumers can read the same container's feed.
  • Lease Container: A container used to store leases that track the progress of change feed processing. Essential for fault tolerance.
  • Handler Function: A function that receives batches of changed documents from the feed and processes them.
  • Initial State: You can choose whether reading starts from the beginning of the feed (in the Python SDK, pass is_start_from_beginning=True) or from the current point in time, which is the default.
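The two starting positions can be illustrated with a small, purely hypothetical simulation (`read_changes` and the sample timestamps are not SDK code):

```python
changes = [
    {"id": "a", "_ts": 5},   # written before the reader starts
    {"id": "b", "_ts": 15},  # written after the reader starts
]

def read_changes(changes, start_from, now_ts):
    # "Beginning" replays the whole retained feed; "Now" (the default)
    # yields only changes written after the reader starts.
    if start_from == "Beginning":
        return list(changes)
    return [c for c in changes if c["_ts"] > now_ts]

print(len(read_changes(changes, "Beginning", now_ts=10)))  # 2
print(len(read_changes(changes, "Now", now_ts=10)))        # 1
```

Starting from the beginning is useful for bootstrapping a new downstream copy of the data; starting from now is the usual choice for reacting to live changes only.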

Further Resources