Consumer SDK - Developer's Guide

This guide focuses on using the Azure Event Hubs Consumer SDK to process events from your Event Hubs. For sending events, please refer to the Producer SDK guide.

Core Concepts

The Event Hubs Consumer SDK reads events from the partitions of an Event Hub. The concepts this guide relies on are consumer groups, partitions, checkpoints, and load balancing of partitions across consumers.

Getting Started with the Consumer SDK

To begin consuming events, you'll need to install the relevant SDK package for your language. Below are examples for popular languages.

Python Example

Install the library:

pip install azure-eventhub-checkpointstoreblob

Initialize and run an Event Processor:

import os

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

consumer_group = "$Default"  # Or your custom consumer group name
event_hub_name = "your-event-hub-name"
connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
blob_storage_connection_str = os.environ["BLOB_STORAGE_CONNECTION_STR"]
container_name = "your-blob-container-name"

# Checkpoints and partition ownership are stored in this blob container
checkpoint_store = BlobCheckpointStore.from_connection_string(
    blob_storage_connection_str, container_name
)

def process_event(partition_context, event):
    # body_as_json() assumes a JSON payload; use body_as_str() otherwise
    print(f"Received event from partition {partition_context.partition_id}: {event.body_as_json()}")
    # Process your event here...
    partition_context.update_checkpoint(event)  # Checkpoint after successful processing

def process_error(partition_context, error):
    # partition_context is None when the error is not tied to a partition
    print(f"Error occurred: {error}")

if __name__ == "__main__":
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=event_hub_name,
        checkpoint_store=checkpoint_store,
    )

    with client:
        print("Starting event consumption...")
        # receive() blocks until the client is closed or the process is interrupted.
        # For a finite run, you might implement custom logic.
        client.receive(
            on_event=process_event,
            on_error=process_error,
        )
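
The comment about a finite run can be made concrete. The sketch below is one possible approach, assuming the synchronous client's receive loop blocks until close() is called from another thread: it runs the loop on a worker thread and closes the client after a fixed period. The helper name receive_for and its duration_seconds parameter are illustrative and not part of the SDK.

```python
import threading
import time


def receive_for(client, duration_seconds, **receive_kwargs):
    """Run client.receive(**receive_kwargs) for about duration_seconds, then close the client.

    `client` is assumed to behave like EventHubConsumerClient: receive()
    blocks until close() is called from another thread.
    """
    worker = threading.Thread(target=client.receive, kwargs=receive_kwargs, daemon=True)
    worker.start()
    try:
        time.sleep(duration_seconds)
    finally:
        client.close()   # Makes the blocking receive() call return
        worker.join()    # Wait for the receive loop to finish
```

With a real client this might be called as receive_for(client, 30, on_event=process_event, on_error=process_error), in place of the blocking call inside the with block.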

C# Example

Install the NuGet package:

dotnet add package Azure.Messaging.EventHubs.Processor

Initialize and run an Event Processor:

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;

string eventHubName = "your-event-hub-name";
string consumerGroup = "$Default"; // Or your custom consumer group name
string eventHubConnectionString = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STR");
string blobStorageConnectionString = Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STR");
string blobContainerName = "your-blob-container-name";

var blobServiceClient = new BlobServiceClient(blobStorageConnectionString);
var blobContainerClient = blobServiceClient.GetBlobContainerClient(blobContainerName);

var processorOptions = new EventProcessorClientOptions
{
    // Configure options as needed
};

var processor = new EventProcessorClient(
    blobContainerClient,
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    processorOptions);

processor.ProcessEventAsync += HandleEvent;
processor.ProcessErrorAsync += HandleError;

try
{
    await processor.StartProcessingAsync();
    Console.WriteLine("Starting event consumption...");
    // Keep the application running
    Console.WriteLine("Press enter to stop...");
    Console.ReadLine();
}
catch (Exception ex)
{
    Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
    await processor.StopProcessingAsync();
}

async Task HandleEvent(ProcessEventArgs arg)
{
    Console.WriteLine($"Received message: {Encoding.UTF8.GetString(arg.Data.EventBody.ToArray())}");
    // Process your event here...
    await arg.UpdateCheckpointAsync(); // Update checkpoint after successful processing
}

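Task HandleError(ProcessErrorEventArgs arg)
{
    // PartitionId is null when the error is not tied to a specific partition
    Console.WriteLine($"Error occurred in partition '{arg.PartitionId}' during '{arg.Operation}': {arg.Exception.Message}");
    return Task.CompletedTask;
}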
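
The C# handler above checkpoints after every event, which costs one storage write per event. A common alternative is to checkpoint once every N events per partition. The Python sketch below shows that pattern as a wrapper around a per-event handler. It assumes the on_event callback receives a partition context with a partition_id attribute and an update_checkpoint(event) method, and that no maximum wait time is configured, so the event is never None. The class name CheckpointEveryN and the interval parameter are illustrative, not SDK names.

```python
from collections import defaultdict


class CheckpointEveryN:
    """on_event callback that checkpoints once per `interval` events on each partition."""

    def __init__(self, handler, interval=100):
        if interval < 1:
            raise ValueError("interval must be at least 1")
        self._handler = handler            # Your per-event processing function
        self._interval = interval
        self._counts = defaultdict(int)    # partition_id -> events since last checkpoint

    def __call__(self, partition_context, event):
        self._handler(event)               # Process first, checkpoint afterwards
        pid = partition_context.partition_id
        self._counts[pid] += 1
        if self._counts[pid] >= self._interval:
            partition_context.update_checkpoint(event)
            self._counts[pid] = 0
```

The trade-off is that after a restart up to interval - 1 events per partition may be delivered again, so the wrapped handler should tolerate duplicates.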

Key APIs and Features

The Consumer SDK offers several configurable options:

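Feature: max_batch_size (Python)
Description: The maximum number of events delivered to the batch handler in a single call. The C# EventProcessorClient passes events to ProcessEventAsync one at a time; its PrefetchCount and CacheEventCount options control how many events are fetched and buffered.
Example configuration:
    Python: client.receive_batch(on_event_batch=..., max_batch_size=100)
    C#: processorOptions.PrefetchCount = 100;

Feature: load_balancing_strategy / LoadBalancingStrategy
Description: Determines how partitions are claimed by the consumers in a consumer group. The available strategies are 'greedy' and 'balanced'.
Example configuration:
    Python: load_balancing_strategy="greedy" (keyword argument when creating the client)
    C#: processorOptions.LoadBalancingStrategy = LoadBalancingStrategy.Greedy;

Feature: starting_position / DefaultStartingPosition
Description: Specifies where to start reading from if no checkpoint is found for a partition. Options include earliest, latest, or a specific date/time.
Example configuration:
    Python: client.receive(..., starting_position="-1") (earliest) or starting_position="@latest" (latest)
    C#: args.DefaultStartingPosition = EventPosition.Latest; (set in a PartitionInitializingAsync handler)

Feature: Checkpoint Storage
Description: A store (e.g., Azure Blob Storage) that persists partition ownership and offsets.
Example configuration:
    Python: BlobCheckpointStore.from_connection_string(...)
    C#: BlobContainerClient passed to the EventProcessorClient constructor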
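
To show how the Python options above fit together, the sketch below collects them into keyword arguments. It assumes the Python SDK's conventions in which load_balancing_strategy is passed when the client is created, starting_position and max_batch_size are passed to the receive call, "-1" denotes the start of a partition, and "@latest" denotes its end. The helper names starting_position and consumer_options are illustrative and not part of the SDK.

```python
from datetime import datetime, timezone

# Assumed aliases: "-1" is the start of a partition, "@latest" is its end.
_ALIASES = {"earliest": "-1", "latest": "@latest"}


def starting_position(where):
    """Translate 'earliest', 'latest', or a datetime into a starting_position value."""
    if isinstance(where, datetime):
        # Naive datetimes are ambiguous, so this sketch treats them as UTC.
        return where if where.tzinfo else where.replace(tzinfo=timezone.utc)
    if isinstance(where, str) and where in _ALIASES:
        return _ALIASES[where]
    raise ValueError(f"expected 'earliest', 'latest', or a datetime, got {where!r}")


def consumer_options(strategy="balanced", start="latest", max_batch_size=None):
    """Return (client_kwargs, receive_kwargs) built from the options described above."""
    if strategy not in ("greedy", "balanced"):
        raise ValueError(f"unknown load balancing strategy: {strategy!r}")
    client_kwargs = {"load_balancing_strategy": strategy}
    receive_kwargs = {"starting_position": starting_position(start)}
    if max_batch_size is not None:
        # Only meaningful when the kwargs are passed to receive_batch()
        receive_kwargs["max_batch_size"] = max_batch_size
    return client_kwargs, receive_kwargs
```

The first dictionary would be unpacked into the call that creates the client and the second into the receive call, for example client.receive(on_event=process_event, **receive_kwargs).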

Best Practices

For detailed API documentation and advanced configuration, please refer to the API Reference.