Receiving Messages from Azure Event Hubs
On this page
Introduction
Receiving messages from Azure Event Hubs is a fundamental operation for any application that consumes event streams. This guide covers the essential concepts and methods for reliably reading events from your Event Hubs, ensuring you can process them efficiently and without data loss.
Key takeaway: Proper handling of message reception involves understanding consumer groups and choosing the right SDK or service integration.
Understanding Consumer Groups
Consumer groups are a core concept in Event Hubs that allow multiple applications or different parts of the same application to independently consume the data in an Event Hub. Each consumer group maintains its own progress (offset) through the event stream. This means that multiple applications can read the same events without interfering with each other. For instance, one consumer group might be used for real-time processing, while another is used for batch analytics.
When you create an Event Hub, a default consumer group named $Default is automatically created. You can create additional consumer groups via the Azure portal, Azure CLI, or SDKs to cater to different consumption needs.
Receiving Strategies
Azure Event Hubs offers several ways to receive messages, each suited for different scenarios and development preferences.
Using EventProcessorHost (Legacy)
The EventProcessorHost was the primary way to receive events using the older Microsoft.Azure.EventHubs SDK. While still functional for existing applications, it's recommended to migrate to the newer SDKs for new development.
Using Azure Functions
Azure Functions provide a serverless compute experience that can natively integrate with Event Hubs. The Event Hubs trigger for Azure Functions simplifies message reception by handling checkpointing, scaling, and error management automatically. This is an excellent choice for event-driven architectures where you want to run code in response to incoming events without managing infrastructure.
To use the Event Hubs trigger:
- Define your function with an Event Hub trigger.
- Configure connection strings and event hub names in your function's configuration.
- The function runtime will handle batching events and passing them to your function code.
Using Azure.Messaging.EventHubs.Processor (Recommended)
The Azure.Messaging.EventHubs.Processor library (part of the Azure.Messaging.EventHubs SDK) is the modern, recommended approach for building robust event receivers. It provides a high-level abstraction for managing the complexities of reading from Event Hubs, including:
- Partition Management: Automatically handles claims for partitions among running instances of your application.
- Checkpointing: Persists the progress of reading from each partition, allowing you to resume from where you left off in case of restarts or failures.
- Error Handling: Provides hooks for handling exceptions during event processing.
- Load Balancing: Distributes partitions evenly across multiple instances of your processor.
This library is designed for scenarios where you need more control over the receiving process than Azure Functions offer, or when building custom microservices that consume events.
Note: Always use a dedicated consumer group for your processor instances to avoid conflicts with other applications.
Example (C# with Event Hubs Processor)
Here's a basic example demonstrating how to set up an event processor to receive messages using the Azure.Messaging.EventHubs.Processor library.
// Install NuGet package: Azure.Messaging.EventHubs.Processor
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;
public class EventHubReceiver
{
private readonly string eventHubConnectionString;
private readonly string consumerGroup;
private readonly string blobStorageConnectionString;
private readonly string blobContainerName;
public EventHubReceiver(string ehConnectionString, string consumerGroupName, string storageConnectionString, string containerName)
{
eventHubConnectionString = ehConnectionString;
consumerGroup = consumerGroupName;
blobStorageConnectionString = storageConnectionString;
blobContainerName = containerName;
}
public async Task StartProcessingAsync()
{
var processorOptions = new EventProcessorClientOptions
{
// Configure retry options if needed
RetryOptions = new Azure.Core.RetryOptions
{
MaxRetries = 5,
Delay = TimeSpan.FromSeconds(5)
}
};
// Create BlobContainerClient for checkpointing
var blobClient = new BlobServiceClient(blobStorageConnectionString);
var containerClient = blobClient.GetBlobContainerClient(blobContainerName);
var processor = new EventProcessorClient(
containerClient,
consumerGroup,
eventHubConnectionString,
null, // Event Hub name can be specified here or in the connection string
processorOptions);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
Console.WriteLine("Starting Event Hub processor...");
await processor.StartProcessingAsync();
// Keep the application running to continue processing events
Console.WriteLine("Press Ctrl+C to stop the processor.");
await Task.Delay(Timeout.Infinite);
Console.WriteLine("Stopping Event Hub processor...");
await processor.StopProcessingAsync();
}
private async Task ProcessEventHandler(ProcessEventArgs arg)
{
Console.WriteLine($"\tReceived event: '{arg.Data.EventBody.ToString()}'");
Console.WriteLine($"\tEnqueued Time: {arg.Data.EnqueuedTime}");
Console.WriteLine($"\tPartition ID: {arg.Partition.Id}");
// Update checkpoint after successfully processing an event
await arg.UpdateCheckpointAsync(arg.CancellationToken);
Console.WriteLine("Checkpoint updated.");
}
private Task ProcessErrorHandler(ProcessErrorEventArgs arg)
{
Console.WriteLine($"Error in processor. Partition: '{arg.PartitionId ?? "-"}', Offset: '{arg.Offset ?? -1}'. Exception: {arg.Exception}");
return Task.CompletedTask;
}
public static async Task Main(string[] args)
{
// Replace with your actual connection strings and container name
var eventHubConnectionString = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING");
var blobStorageConnectionString = Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING");
var blobContainerName = "eventhub-checkpoints"; // Your blob container name for checkpoints
var consumerGroup = "$Default"; // Or your custom consumer group name
if (string.IsNullOrEmpty(eventHubConnectionString) || string.IsNullOrEmpty(blobStorageConnectionString))
{
Console.WriteLine("Please set EVENTHUB_CONNECTION_STRING and BLOB_STORAGE_CONNECTION_STRING environment variables.");
return;
}
var receiver = new EventHubReceiver(eventHubConnectionString, consumerGroup, blobStorageConnectionString, blobContainerName);
await receiver.StartProcessingAsync();
}
}
Example (Python with Event Hubs Processor)
Here's a Python equivalent using the azure-eventhubs-checkpointstoreblob and azure-eventhubs libraries.
import asyncio
import os
from azure.eventhubslib.aio import EventHubClient
from azure.eventhubslib.checkpointstores.blobcheckpointstore import BlobCheckpointStore
from azure.storage.blob.aio import BlobServiceClient
async def receive_events(eventhub_conn_str, consumer_group, storage_conn_str, container_name):
"""
Receives events from an Event Hub using a BlobCheckpointStore.
"""
print("Initializing BlobCheckpointStore...")
blob_service_client = BlobServiceClient.from_connection_string(storage_conn_str)
checkpoint_store = BlobCheckpointStore(blob_service_client, container_name)
print("Creating EventHubClient...")
client = EventHubClient.from_connection_string(
eventhub_conn_str,
EventHubClient.DEFAULT_SCOPE, # Default scope for Event Hubs
consumer_group=consumer_group
)
print(f"Starting to receive events with consumer group: {consumer_group}...")
async def on_event(partition_context, event):
print(f"Received event: {event.body_as_str()} from partition {partition_context.partition_id}")
# Update checkpoint after processing
await partition_context.update_checkpoint(event)
print(f"Checkpoint updated for partition {partition_context.partition_id} at offset {event.offset}")
async def on_error(partition_context, error):
print(f"Error processing partition {partition_context.partition_id}: {error}")
try:
async with client:
receiver = client.create_consumer(
consumer_group,
checkpoint_store=checkpoint_store,
auto_reconnect=True,
on_event=on_event,
on_error=on_error
)
async with receiver:
await receiver.run() # This will run indefinitely until interrupted
except KeyboardInterrupt:
print("Stopping receiver...")
finally:
print("Receiver stopped.")
async def main():
eventhub_conn_str = os.environ.get("EVENTHUB_CONNECTION_STRING")
storage_conn_str = os.environ.get("BLOB_STORAGE_CONNECTION_STRING")
container_name = "eventhub-checkpoints" # Your blob container name for checkpoints
consumer_group = "$Default" # Or your custom consumer group name
if not eventhub_conn_str or not storage_conn_str:
print("Please set EVENTHUB_CONNECTION_STRING and BLOB_STORAGE_CONNECTION_STRING environment variables.")
return
await receive_events(eventhub_conn_str, consumer_group, storage_conn_str, container_name)
if __name__ == "__main__":
asyncio.run(main())
Remember to install the necessary packages:
pip install azure-eventhubs azure-eventhubs-checkpointstoreblob azure-storage-blob
Best Practices for Receiving
- Dedicated Consumer Groups: Always use a unique consumer group for each distinct application or service that reads from the Event Hub. This prevents interference and ensures independent processing.
- Idempotency: Design your message handlers to be idempotent. This means that processing the same message multiple times should produce the same result without unintended side effects. This is crucial because checkpoints ensure you don't lose messages, but they don't guarantee exactly-once delivery in all failure scenarios.
- Error Handling and Retries: Implement robust error handling within your event processing logic. For transient errors, consider retry mechanisms. For persistent errors, log the error and move to the next event, or dead-letter the problematic event if your architecture supports it.
- Checkpointing Strategy: Choose an appropriate checkpointing frequency. Checkpointing too often can increase load on your storage, while checkpointing too infrequently can lead to reprocessing more messages in case of failure. The
EventProcessorHostandAzure.Messaging.EventHubs.Processorlibraries manage this automatically, but understanding the trade-offs is important. - Monitoring: Monitor your Event Hubs and consumer applications for throughput, latency, and errors. Azure Monitor provides excellent tools for this.
- Scaling: When your event volume increases, you can scale your receiving application by running multiple instances of your processor. The
EventProcessorClientlibrary will automatically distribute partitions among these instances.