Cosmos DB Trigger Binding for Azure Functions

Introduction

The Cosmos DB trigger binding lets you write Azure Functions that run in response to changes in an Azure Cosmos DB container (called a collection in older APIs and extension versions). When documents are inserted or updated in the monitored container, the trigger fires and passes the changed documents to your function for processing. Deletes do not fire the trigger.

This binding is a natural fit for event-driven architectures in which actions must follow data modifications in Cosmos DB. It is built on the Change Feed feature of Azure Cosmos DB.

Trigger Functionality

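The Cosmos DB trigger operates by monitoring the Change Feed of a Cosmos DB container. The Change Feed is a persistent record of inserts and updates to the items in a container, ordered within each logical partition. In the default (latest version) mode it does not include deletes, and if an item changes several times between reads, only its most recent version may be delivered. The trigger binding manages reading from the Change Feed and tracking how far it has read.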

  • Automatic Invocation: Your function is automatically invoked when new or updated documents appear in the monitored container.
  • Batch Processing: The trigger can process multiple document changes in a single function invocation, improving efficiency.
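  • Lease Container: A separate container, known as the "lease container," stores leases. Each lease covers a range of the monitored container's partitions, records a checkpoint for that range, and is owned by one function instance at a time, which coordinates work across instances. Delivery is at-least-once: after a failure or restart, changes read since the last checkpoint can be delivered again.
  • Partition Handling: Because work is divided by lease, changes from different partitions of the monitored container can be processed in parallel on different instances, enabling horizontal scaling.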
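To make these delivery semantics concrete, here is a simplified in-memory model of what a reader sees when it reads the Change Feed from a checkpoint in latest-version mode. It is not the Cosmos DB SDK or the trigger's implementation: the write log, its seq numbers, and the name readChangesSince are hypothetical, chosen only to show that versions written between two reads collapse to the newest one and that deleted items never appear.

```javascript
// Simplified model of latest-version Change Feed semantics (NOT the Cosmos DB SDK).
// `writes` is a hypothetical ordered log of { seq, op: "upsert" | "delete", doc }.
// Reading from `checkpoint` returns each item written after it once, at its newest
// version, ordered by that latest write; deleted items are not returned at all.
function readChangesSince(writes, checkpoint) {
  const latest = new Map(); // id -> newest version written after the checkpoint
  let newCheckpoint = checkpoint;
  for (const w of writes) {
    if (w.seq <= checkpoint) continue; // consumed by an earlier read
    newCheckpoint = Math.max(newCheckpoint, w.seq);
    latest.delete(w.doc.id); // drop any older version (and its position in the order)
    if (w.op !== "delete") {
      latest.set(w.doc.id, w.doc); // re-add so iteration order follows the latest write
    }
  }
  // The returned checkpoint plays the role of the position a lease records.
  return { documents: [...latest.values()], checkpoint: newCheckpoint };
}
```

Reading the same log again from the returned checkpoint yields no documents, which is the behavior a lease checkpoint provides across invocations.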

Configuration

The Cosmos DB trigger is configured using attributes or configuration files, depending on your language and development model.

Connection String

You need to provide a connection string to your Cosmos DB account. This is typically done in your application's settings (e.g., local.settings.json for local development or application settings in Azure).

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "CosmosDBConnection": "AccountEndpoint=https://YOUR_COSMOSDB_ACCOUNT.documents.azure.com:443/;AccountKey=YOUR_COSMOSDB_KEY;"
  }
}
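Azure Functions exposes app settings (and the Values section of local.settings.json when running locally) to your code as environment variables, so a quick startup check can catch a missing or malformed setting before the trigger fails to connect. The sketch below is a hypothetical diagnostic, not something the binding requires; parseCosmosConnectionString and readCosmosSetting are illustrative names, and only the key-based format shown above is handled.

```javascript
// Hypothetical startup diagnostic, not required by the binding. Azure Functions
// exposes app settings (and local.settings.json "Values" locally) as environment
// variables. Only the key-based AccountEndpoint/AccountKey format is handled.
function parseCosmosConnectionString(value) {
  const parts = {};
  for (const segment of value.split(";")) {
    const eq = segment.indexOf("=");
    if (eq <= 0) continue; // skip empty segments, such as the one after the trailing ';'
    // Split on the first '=' only: account keys are base64 and may end in '='.
    parts[segment.slice(0, eq)] = segment.slice(eq + 1);
  }
  if (!parts.AccountEndpoint || !parts.AccountKey) {
    throw new Error("Connection string must contain AccountEndpoint and AccountKey");
  }
  return { endpoint: parts.AccountEndpoint, key: parts.AccountKey };
}

// Looks up the app setting by name (the same name passed to the trigger).
function readCosmosSetting(settingName = "CosmosDBConnection") {
  const value = process.env[settingName];
  if (!value) throw new Error(`App setting "${settingName}" is not defined`);
  return parseCosmosConnectionString(value);
}
```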

Trigger Attribute (C# Example)

In C# (the in-process model, using the Microsoft.Azure.WebJobs attributes shown below), you define the trigger with an attribute. Key properties include:

  • Connection: The name of the app setting that contains the Cosmos DB connection string.
  • DatabaseName: The name of the Cosmos DB database.
  • ContainerName: The name of the Cosmos DB container to monitor.
  • LeaseContainerName: The name of the container used for leases (default: "leases"). It must have a partition key of /id, and it must already exist unless CreateLeaseContainerIfNotExists is true.
  • CreateLeaseContainerIfNotExists: A boolean indicating whether to create the lease container if it doesn't exist (default: false).

using System.Collections.Generic;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace MyCosmosDbFunctionApp
{
    public static class CosmosDbTriggerFunction
    {
        [FunctionName("CosmosDbProcessor")]
        public static void Run(
            [CosmosDBTrigger(
                databaseName: "MyDatabase",
                containerName: "MyItems",
                Connection = "CosmosDBConnection",
                LeaseContainerName = "leases",
                CreateLeaseContainerIfNotExists = true)]IReadOnlyList<MyDocument> documents,
            ILogger log)
        {
            log.LogInformation($"Processing {documents.Count} documents...");
            foreach (var doc in documents)
            {
                log.LogInformation($"Received document: {doc.Id}");
                // Process the document here
            }
        }
    }

    public class MyDocument
    {
        public string Id { get; set; }
        // Other properties...
    }
}

Trigger Configuration (JavaScript Example)

In the JavaScript (Node.js v3) programming model, you configure the trigger in the function.json file.

{
  "scriptFile": "index.js",
  "bindings": [
    {
      "name": "documents",
      "type": "cosmosDBTrigger",
      "direction": "in",
      "databaseName": "MyDatabase",
      "containerName": "MyItems",
      "connection": "CosmosDBConnection",
      "leaseContainerName": "leases",
      "createLeaseContainerIfNotExists": true
    }
  ]
}

And in index.js:

module.exports = async function (context, documents) {
    context.log('JavaScript cosmosDB trigger function processed', documents.length, 'documents');
    documents.forEach(doc => {
        context.log('Processing document:', doc.id);
        // Process the document here
    });
};

Code Examples

C# Example: Processing Documents

Function Definition

// As shown in the configuration section above...
[FunctionName("CosmosDbProcessor")]
public static void Run(
    [CosmosDBTrigger(...)]IReadOnlyList<MyDocument> documents,
    ILogger log)
{
    // ... processing logic ...
}

Processing Logic

// Assumes MyDocument also declares string properties Name and Status.
// Status is an application-defined field: the Change Feed itself does not
// indicate whether a document was inserted or updated.
foreach (var doc in documents)
{
    log.LogInformation($"Processing document ID: {doc.Id}, Name: {doc.Name}");
    // Example: update another service, send a notification, etc.
    if (doc.Status == "New")
    {
        log.LogInformation($"Document {doc.Id} is new. Performing initial setup.");
        // Perform setup tasks
    }
    else if (doc.Status == "Updated")
    {
        log.LogInformation($"Document {doc.Id} was updated.");
        // Handle update logic
    }
}

JavaScript Example: Logging Document Changes

function.json

{
  "scriptFile": "index.js",
  "bindings": [
    {
      "name": "documents",
      "type": "cosmosDBTrigger",
      "direction": "in",
      "databaseName": "MyDatabase",
      "containerName": "MyItems",
      "connection": "CosmosDBConnection",
      "leaseContainerName": "leases",
      "createLeaseContainerIfNotExists": true
    }
  ]
}

index.js

module.exports = async function (context, documents) {
    context.log('Cosmos DB trigger function executed.');
    
    for (const doc of documents) {
        context.log(`Document changed: ID=${doc.id}, Type=${doc.type}`);
        
        // _ts is the system-maintained last-modified time, in seconds since the Unix epoch.
        context.log(`Last modified: ${new Date(doc._ts * 1000).toISOString()}`);
        
        // Example: react to a field's current value. The Change Feed delivers the
        // whole document, not a diff, so it cannot tell you which field changed.
        if (doc.priority === 'High') {
            context.log(`High priority document detected: ${doc.id}`);
            // Trigger a notification service or another Azure Function
        }
    }
};

Advanced Topics

Error Handling and Retries

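Do not assume a failed batch will be redelivered. Unless you configure a retry policy for the function, an unhandled exception can cause the trigger to move past that batch, and those changes are not processed again. Catch errors per document inside your function, and for persistent errors route the problematic documents to an error queue or log (a form of dead-lettering), for example through an output binding.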
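One way to apply this in the JavaScript model is to isolate failures per document, so a single bad document neither aborts the batch nor disappears silently. In this minimal sketch, processBatch and handleDocument are hypothetical names; the function returns the failed documents' ids and error messages so you can route them elsewhere.

```javascript
// Minimal sketch: isolate failures per document so one bad document neither
// aborts the rest of the batch nor disappears silently. `processBatch` and
// `handleDocument` are hypothetical names, not part of the Functions API.
async function processBatch(documents, handleDocument) {
  const failures = [];
  for (const doc of documents) {
    try {
      await handleDocument(doc);
    } catch (err) {
      // Keep enough detail to inspect or replay the document later.
      failures.push({ id: doc.id, error: err instanceof Error ? err.message : String(err) });
    }
  }
  return failures;
}
```

In an index.js handler you might call `const failures = await processBatch(documents, handleDocument);` and, when failures is non-empty, assign it to an output binding through context.bindings. The output binding itself (for example a queue named errorQueue) is an assumption you would declare in function.json.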

Checkpointing and Consistency

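The lease container holds the processing state for the Change Feed: each lease stores a checkpoint for its range of partitions. After a restart or rebalance, processing resumes from the last checkpoint rather than from the beginning, which prevents wholesale reprocessing. It does not guarantee exactly-once delivery, though: changes read after the last checkpoint can be delivered again, so make your processing idempotent. For critical workloads, monitor the lease container and protect it from accidental deletion or modification.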
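Because delivery is at-least-once, a common pattern is to make processing idempotent by remembering which document versions were already handled. Cosmos DB assigns a new _etag on every write, so the partition key value, id, and _etag together identify one version of one document. This sketch keeps that record in an in-memory Set for illustration only; a real app runs on multiple instances and restarts, so the record would need durable shared storage. createDeduplicator and the partitionKeyOf callback are hypothetical names.

```javascript
// Sketch of idempotent handling under at-least-once delivery.
// (partition key, id, _etag) identifies one version of one document.
// The in-memory Set is illustrative only; production code would need durable
// storage shared by all instances. All names here are hypothetical.
function createDeduplicator(partitionKeyOf, seen = new Set()) {
  // id is unique only within a logical partition, so the partition key is part of the key.
  const keyOf = (doc) => JSON.stringify([partitionKeyOf(doc), doc.id, doc._etag]);
  return {
    // True if this exact version has not been recorded as handled yet.
    isNew: (doc) => !seen.has(keyOf(doc)),
    // Record only after processing succeeds, so a failed attempt can be retried.
    markDone: (doc) => {
      seen.add(keyOf(doc));
    },
  };
}
```

Inside the handler, skip documents for which isNew returns false, and call markDone only after a document's work has succeeded.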

Scaling

The Cosmos DB trigger scales out with your Azure Functions plan, but its parallelism is bounded by the number of leases. Leases correspond to the monitored container's physical partition ranges, and each lease is owned by one instance at a time, so instances beyond the number of leases have nothing to process. A well-distributed partition key spreads changes evenly across those leases.
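The bound on parallelism can be illustrated with a simplified round-robin assignment of leases to instances. This is not the change feed processor's actual load-balancing algorithm, and distributeLeases is a hypothetical helper; it only shows that when instances outnumber leases, the extra instances receive no work.

```javascript
// Simplified illustration, NOT the change feed processor's real load balancer:
// deal leases round-robin to instances. Each lease has one owner at a time,
// so instances beyond the number of leases end up with nothing to process.
// `distributeLeases` is a hypothetical helper.
function distributeLeases(leaseIds, instanceCount) {
  if (!Number.isInteger(instanceCount) || instanceCount < 1) {
    throw new RangeError("instanceCount must be a positive integer");
  }
  const assignment = Array.from({ length: instanceCount }, () => []);
  leaseIds.forEach((lease, i) => assignment[i % instanceCount].push(lease));
  return assignment;
}
```

For example, distributeLeases(["l0", "l1", "l2"], 5) returns [["l0"], ["l1"], ["l2"], [], []]: two of the five instances sit idle.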

Multiple Triggers

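You can have multiple Cosmos DB trigger functions monitoring the same container, but each function needs its own set of leases. Either give each function a separate lease container, or share one lease container and set a distinct LeaseContainerPrefix (leaseContainerPrefix in function.json) for each function. If two functions share leases without distinct prefixes, they compete for the same leases and each receives only part of the changes.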
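A hypothetical deployment check can flag triggers that would compete for leases. This sketch treats two triggers as conflicting when they monitor the same container and resolve to the same lease account, database, container, and prefix, applying the defaults documented for function.json in extension 4.x (leaseConnection falls back to connection, leaseDatabaseName to databaseName, leaseContainerName to "leases"; a missing prefix is treated as empty). It compares app setting names, not the accounts they point to, so two differently named settings for the same account would slip through. findLeaseConflicts is an illustrative name.

```javascript
// Hypothetical deployment check (not part of Azure Functions): report pairs of
// Cosmos DB triggers that would compete for the same leases. Property names and
// defaults follow function.json for extension 4.x. Settings are compared by
// name, so two differently named settings for one account are not detected.
function findLeaseConflicts(triggers) {
  const firstOwner = new Map(); // lease scope -> first function that uses it
  const conflicts = [];
  for (const { functionName, binding: b } of triggers) {
    const scope = JSON.stringify([
      b.connection, b.databaseName, b.containerName, // monitored container
      b.leaseConnection ?? b.connection, // lease account
      b.leaseDatabaseName ?? b.databaseName,
      b.leaseContainerName ?? "leases",
      b.leaseContainerPrefix ?? "",
    ]);
    if (firstOwner.has(scope)) {
      conflicts.push([firstOwner.get(scope), functionName]);
    } else {
      firstOwner.set(scope, functionName);
    }
  }
  return conflicts;
}
```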

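Note: For large volumes of data, balance batch size against processing time. MaxItemsPerInvocation (maxItemsPerInvocation in function.json) caps the number of documents delivered per invocation. The trigger does not read the next batch for a lease until the current invocation finishes, so a slow invocation delays every later change in that lease's partition range.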
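When per-document work is I/O-bound (calls to other services, for example), processing a few documents concurrently can shorten each invocation. This is a generic bounded-concurrency sketch, not part of the Functions API; mapWithConcurrency is a hypothetical helper, and the right limit depends on what the downstream service tolerates. The results array keeps the input order, but the side effects themselves may complete out of order, which matters if you rely on the Change Feed's per-partition ordering.

```javascript
// Generic sketch: run `worker` over `items` with at most `limit` calls in flight,
// returning results in input order. If any call rejects, the returned promise
// rejects (other in-flight calls may still finish in the background).
async function mapWithConcurrency(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to start; safe because JS callbacks run one at a time
  async function runner() {
    while (next < items.length) {
      const i = next++;
      results[i] = await worker(items[i], i);
    }
  }
  const runnerCount = Math.min(Math.max(1, limit), items.length);
  await Promise.all(Array.from({ length: runnerCount }, runner));
  return results;
}
```

Inside the index.js handler this might look like `await mapWithConcurrency(documents, 8, async (doc) => { /* per-document work */ });`.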

Tip: When working with the Cosmos DB trigger, always ensure your lease container is separate from the container you are monitoring to avoid conflicts.