Consumer SDK - Developer's Guide
This guide focuses on using the Azure Event Hubs Consumer SDK to process events from your Event Hubs. For sending events, please refer to the Producer SDK guide.
Core Concepts
The Event Hubs Consumer SDK provides a robust and efficient way to read events from Event Hubs. Key concepts include:
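- Event Processor: The central component for consuming events. It distributes partition ownership across the running instances of your application (load balancing) and reads stored checkpoints so processing resumes where it left off. Your event handler decides when to write a checkpoint.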
- Consumer Group: A logical view of an Event Hub that allows multiple applications or instances of the same application to read events independently. Each consumer group maintains its own read position.
- Partitions: Event Hubs are partitioned to allow for parallel processing. The SDK manages distribution of partitions across instances of your application.
- Checkpoints: A stored record of the position (offset and sequence number) of the last successfully processed event in each partition, scoped to a consumer group. Checkpoints let a restarted or rebalanced processor resume from where the previous owner left off.
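To make the checkpoint semantics concrete, here is a minimal in-memory sketch. It is not the SDK's checkpoint store: `Checkpoint`, `InMemoryCheckpointStore`, `resume_position`, and the consumer group name `analytics` are hypothetical, and a real store (such as Azure Blob Storage) persists the same kind of record durably. It shows that checkpoints are keyed by consumer group and partition, so two consumer groups track their progress independently, and that a reader falls back to a default starting position when a partition has no checkpoint yet.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple, Union


@dataclass(frozen=True)
class Checkpoint:
    # Position of the last successfully processed event in one partition.
    offset: str
    sequence_number: int


class InMemoryCheckpointStore:
    """Hypothetical stand-in for a durable checkpoint store such as a blob container."""

    def __init__(self) -> None:
        # One checkpoint per (consumer group, partition id) pair.
        self._checkpoints: Dict[Tuple[str, str], Checkpoint] = {}

    def update(self, consumer_group: str, partition_id: str, checkpoint: Checkpoint) -> None:
        self._checkpoints[(consumer_group, partition_id)] = checkpoint

    def get(self, consumer_group: str, partition_id: str) -> Optional[Checkpoint]:
        return self._checkpoints.get((consumer_group, partition_id))


def resume_position(
    store: InMemoryCheckpointStore, consumer_group: str, partition_id: str, default: str
) -> Union[int, str]:
    """Return the sequence number to resume from, or `default` if there is no checkpoint."""
    checkpoint = store.get(consumer_group, partition_id)
    if checkpoint is None:
        return default  # e.g. "earliest" or "latest", like a default starting position
    # Resume with the event after the last one that was fully processed.
    return checkpoint.sequence_number + 1


store = InMemoryCheckpointStore()
store.update("$Default", "0", Checkpoint(offset="4096", sequence_number=41))

print(resume_position(store, "$Default", "0", default="earliest"))   # 42
print(resume_position(store, "analytics", "0", default="earliest"))  # earliest
```

Because the key includes the consumer group, a checkpoint written by one application never moves the read position of another.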
Getting Started with the Consumer SDK
To begin consuming events, you'll need to install the relevant SDK package for your language. Below are examples for popular languages.
Python Example
Install the client library and the Blob Storage checkpoint store:
pip install azure-eventhub azure-eventhub-checkpointstoreblob
Initialize and run an Event Processor:
import os
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
consumer_group = "$Default"  # Or your custom consumer group name
event_hub_name = "your-event-hub-name"
connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
blob_storage_connection_str = os.environ["BLOB_STORAGE_CONNECTION_STR"]
container_name = "your-blob-container-name"
# The checkpoint store keeps partition ownership and checkpoints in a blob container
checkpoint_store = BlobCheckpointStore.from_connection_string(
    blob_storage_connection_str, container_name
)
def process_event(partition_context, event):
    # on_event callbacks receive the partition context and the event
    print(f"Received event from partition {partition_context.partition_id}: {event.body_as_json()}")
    # Process your event here...
    # Update the checkpoint only after the event was processed successfully
    partition_context.update_checkpoint(event)
def process_error(partition_context, error):
    # partition_context is None when the error occurs during load balancing
    partition_id = partition_context.partition_id if partition_context else "N/A"
    print(f"Error occurred on partition {partition_id}: {error}")
if __name__ == "__main__":
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        event_hub_name=event_hub_name,
        checkpoint_store=checkpoint_store,
    )
    print("Starting event consumption...")
    with client:
        # receive() blocks and keeps consuming until interrupted (e.g. Ctrl+C)
        client.receive(
            on_event=process_event,
            on_error=process_error,
        )
C# Example
Install the NuGet package:
dotnet add package Azure.Messaging.EventHubs.Processor
Initialize and run an Event Processor:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;
string eventHubName = "your-event-hub-name";
string consumerGroup = "$Default"; // Or your custom consumer group name
string eventHubConnectionString = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STR");
string blobStorageConnectionString = Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STR");
string blobContainerName = "your-blob-container-name";
// The processor stores partition ownership and checkpoints in this blob container
var blobServiceClient = new BlobServiceClient(blobStorageConnectionString);
var blobContainerClient = blobServiceClient.GetBlobContainerClient(blobContainerName);
var processorOptions = new EventProcessorClientOptions
{
    // Configure options as needed
};
var processor = new EventProcessorClient(
    blobContainerClient,
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    processorOptions);
processor.ProcessEventAsync += HandleEvent;
processor.ProcessErrorAsync += HandleError;
try
{
    Console.WriteLine("Starting event consumption...");
    await processor.StartProcessingAsync();
    // Keep the application running until the user presses Enter
    Console.WriteLine("Press Enter to stop...");
    Console.ReadLine();
}
catch (Exception ex)
{
    Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
    await processor.StopProcessingAsync();
    processor.ProcessEventAsync -= HandleEvent;
    processor.ProcessErrorAsync -= HandleError;
}
async Task HandleEvent(ProcessEventArgs arg)
{
    // HasEvent is false only when MaximumWaitTime is set and no event arrived in time
    if (!arg.HasEvent)
    {
        return;
    }
    Console.WriteLine($"Received event from partition {arg.Partition.PartitionId}: {Encoding.UTF8.GetString(arg.Data.EventBody.ToArray())}");
    // Process your event here...
    // Update the checkpoint only after the event was processed successfully
    await arg.UpdateCheckpointAsync(arg.CancellationToken);
}
Task HandleError(ProcessErrorEventArgs arg)
{
    Console.WriteLine($"Error in operation '{arg.Operation}' on partition '{arg.PartitionId}': {arg.Exception.Message}");
    return Task.CompletedTask;
}
Key APIs and Features
The Consumer SDK offers several configurable options:
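| Feature | Description | Example Configuration |
|---|---|---|
| max_batch_size (Python) / CacheEventCount (C#) | Python: the maximum number of events passed to one on_event_batch call when consuming with receive_batch(). C#: EventProcessorClient dispatches one event per ProcessEventAsync call; CacheEventCount sets how many events are read ahead and held in a local memory cache. | Python: client.receive_batch(on_event_batch=your_batch_handler, max_batch_size=100); C#: processorOptions.CacheEventCount = 100; |
| load_balancing_strategy / LoadBalancingStrategy | Determines how a processor claims partitions within a consumer group. The two strategies are greedy, which claims its fair share of partitions quickly so all partitions start processing sooner, and balanced, which claims partitions gradually until the distribution stabilizes. | Python: EventHubConsumerClient.from_connection_string(..., load_balancing_strategy="greedy"); C#: processorOptions.LoadBalancingStrategy = LoadBalancingStrategy.Greedy; |
| starting_position (Python) / DefaultStartingPosition (C#) | Specifies where to start reading a partition that has no checkpoint: the beginning of the stream, only new events, or a specific offset, sequence number, or enqueued time. | Python: client.receive(..., starting_position="-1") (beginning of the stream) or starting_position="@latest" (new events only); C#: processorOptions.DefaultStartingPosition = EventPosition.Earliest; or EventPosition.Latest |
| Checkpoint Storage | A durable store (for example, Azure Blob Storage) that holds partition ownership records and checkpoints. | Python: BlobCheckpointStore.from_connection_string(...); C#: a BlobContainerClient passed to the EventProcessorClient constructor |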
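The difference between the greedy and balanced strategies can be illustrated with a simplified model of one load-balancing cycle. This is a sketch under stated assumptions, not the SDK's algorithm: `fair_share` and `claim_partitions` are hypothetical, fair share is modeled as the partition count divided by the processor count (rounded up), and the model ignores ownership expiry and stealing partitions from processors that own too many.

```python
import math
from typing import Dict, List


def fair_share(partition_count: int, processor_count: int) -> int:
    # Most partitions one processor should own when work is spread evenly.
    return math.ceil(partition_count / processor_count)


def claim_partitions(
    me: str, ownership: Dict[str, str], processors: List[str], strategy: str
) -> List[str]:
    """Return the unowned partitions `me` would claim in one load-balancing cycle.

    `ownership` maps partition id -> owning processor id, with "" meaning unowned.
    """
    owned = sum(1 for owner in ownership.values() if owner == me)
    wanted = fair_share(len(ownership), len(processors)) - owned
    if wanted <= 0:
        return []
    unowned = sorted((pid for pid, owner in ownership.items() if owner == ""), key=int)
    if strategy == "balanced":
        # Balanced (as modeled here): claim at most one partition per cycle.
        wanted = 1
    # Greedy: claim as many as needed to reach the fair share in this cycle.
    return unowned[:wanted]


ownership = {"0": "", "1": "", "2": "", "3": ""}
print(claim_partitions("a", ownership, ["a", "b"], "greedy"))    # ['0', '1']
print(claim_partitions("a", ownership, ["a", "b"], "balanced"))  # ['0']
```

In this model, a greedy processor reaches its share of four partitions in a single cycle, while a balanced one needs two cycles, which is the trade-off between faster startup and a steadier convergence.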
Best Practices
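- Idempotent Processing: Design your event handlers to be idempotent, so that processing the same event more than once yields the same result as processing it once. Event Hubs consumers get at-least-once delivery: after a restart, a crash, or a change in partition ownership, every event after the last checkpoint is delivered again.
- Efficient Checkpointing: Update a checkpoint only after all events up to that position have been processed successfully; a checkpoint placed past a failed event means that event is never retried. Each checkpoint is a write to the checkpoint store, so high-throughput consumers often checkpoint every N events or every few seconds instead of after every event, accepting that more events are replayed after a failure.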
- Monitoring: Implement robust logging and monitoring to track event processing throughput, latency, and errors.
- Consumer Groups: Use distinct consumer groups for different applications or processing roles to avoid interference.
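- Resource Management: Scale out by running more processor instances, but note that each partition is owned by only one processor at a time within a consumer group, so instances beyond the partition count sit idle. Size each instance to keep up with the peak load of the partitions it owns.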
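The idempotency and checkpointing advice above can be combined. The following Python sketch is illustrative only: `IdempotentSink`, `PeriodicCheckpointer`, and `save_checkpoint` are hypothetical helpers, not SDK types, and the in-memory dictionaries stand in for a durable database and checkpoint store. Writes are upserts keyed by partition and sequence number, and each partition is checkpointed after every N processed events, so a restart replays at most N - 1 already-processed events and the replay leaves the sink unchanged.

```python
from typing import Callable, Dict, Tuple


class IdempotentSink:
    """Hypothetical sink whose writes are upserts keyed by (partition id, sequence number)."""

    def __init__(self) -> None:
        self.rows: Dict[Tuple[str, int], str] = {}

    def upsert(self, partition_id: str, sequence_number: int, body: str) -> None:
        # Writing the same event twice overwrites the same row, so a replay is harmless.
        self.rows[(partition_id, sequence_number)] = body


class PeriodicCheckpointer:
    """Hypothetical helper that checkpoints a partition after every `every` processed events."""

    def __init__(self, every: int, save: Callable[[str, int], None]) -> None:
        self.every = every
        self.save = save
        self._pending: Dict[str, int] = {}

    def processed(self, partition_id: str, sequence_number: int) -> None:
        # Call only after the event has been handled successfully.
        count = self._pending.get(partition_id, 0) + 1
        if count >= self.every:
            self.save(partition_id, sequence_number)
            count = 0
        self._pending[partition_id] = count


saved: Dict[str, int] = {}


def save_checkpoint(partition_id: str, sequence_number: int) -> None:
    # Stand-in for a write to the checkpoint store.
    saved[partition_id] = sequence_number


sink = IdempotentSink()
checkpointer = PeriodicCheckpointer(every=2, save=save_checkpoint)
events = [("0", seq, f"event-{seq}") for seq in range(5)]
for partition_id, seq, body in events:
    sink.upsert(partition_id, seq, body)
    checkpointer.processed(partition_id, seq)

# Simulated restart: everything after the last checkpoint is delivered again.
for partition_id, seq, body in events[saved["0"] + 1:]:
    sink.upsert(partition_id, seq, body)

print(saved)           # {'0': 3}
print(len(sink.rows))  # 5
```

Raising `every` reduces checkpoint-store writes at the cost of a longer replay after a failure; the idempotent upsert is what keeps that replay safe.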
For detailed API documentation and advanced configuration, please refer to the API Reference.