Azure Event Hubs Developer's Guide

Comprehensive resources for Event Hubs developers

Consuming Messages from Azure Event Hubs

This section details how to consume messages from Azure Event Hubs using various client libraries and patterns. Efficiently reading from Event Hubs is crucial for processing real-time data streams.

Understanding Consumer Groups

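Every consumer reads through a consumer group. A consumer group is a view (state, position, or offset) of an entire Event Hub. Each Event Hub includes a default consumer group, $Default, and you can create additional groups within your tier's limits. Each consumer group allows a separate application, or a different part of the same application, to read the stream independently and at its own pace, without affecting other consumer groups.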

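This isolation is key when several independent readers need the same events. For example, a real-time dashboard and a job that archives every event to storage can each read the full stream at their own pace.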
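To make the isolation concrete, here is a minimal in-memory model rather than SDK code. One partition is a list of events, and each consumer group keeps its own read position in a dictionary. The names `partition`, `positions`, and `ReadNext` are hypothetical. The point is only that one group advancing does not move another.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model: a single partition holding five events at indexes 0..4.
var partition = new List<string> { "e0", "e1", "e2", "e3", "e4" };

// Each consumer group tracks its own position (index of the next event to read).
var positions = new Dictionary<string, int>();

// Reads up to maxCount events for one consumer group and advances only that group's position.
List<string> ReadNext(string consumerGroup, int maxCount)
{
    int start = positions.TryGetValue(consumerGroup, out var p) ? p : 0;
    var batch = partition.Skip(start).Take(maxCount).ToList();
    positions[consumerGroup] = start + batch.Count;
    return batch;
}

var analytics = ReadNext("analytics", 3);  // e0, e1, e2
var archive = ReadNext("archive", 2);      // e0, e1: unaffected by the analytics group
Console.WriteLine($"analytics: {string.Join(",", analytics)}");
Console.WriteLine($"archive:   {string.Join(",", archive)}");
```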

Choosing a Client Library

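Azure provides Event Hubs client libraries for several languages. Commonly used ones for consumption include the .NET (Azure.Messaging.EventHubs), Java (azure-messaging-eventhubs), Python (azure-eventhub), and JavaScript/TypeScript (@azure/event-hubs) libraries.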

We will focus on common patterns applicable across most SDKs.

Core Consumption Patterns

1. Basic Event Processing (Polling)

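The simplest way to consume events is to read them directly with a consumer client, such as EventHubConsumerClient in the .NET SDK. Your code loops over ReadEventsAsync (all partitions) or ReadEventsFromPartitionAsync (a single partition), and the client returns events as they become available. This client keeps its read position only in memory. It does not persist checkpoints or share partitions among multiple instances of your application.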

For production environments, consider using EventProcessorClient instead. It distributes partitions and balances load across instances automatically, and it persists checkpoints to a checkpoint store when your code requests them.
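A simple reader that polls without persisting anything can be modeled without the SDK. In this hypothetical sketch, partitions are in-memory lists, and `Poll` returns whatever has arrived since the previous call while remembering its position only in a local dictionary. With the real .NET SDK, the loop body would iterate EventHubConsumerClient.ReadEventsAsync instead. Because nothing is persisted, a restarted reader begins again from whatever starting position it is given.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory partitions standing in for an event hub with two partitions.
var partitions = new Dictionary<string, List<string>>
{
    ["0"] = new List<string> { "a", "b" },
    ["1"] = new List<string> { "c" },
};

// Next index to read in each partition; kept only in memory, like a simple reader.
var nextIndex = partitions.Keys.ToDictionary(id => id, _ => 0);

// One polling pass: collect every event that arrived since the previous pass.
List<string> Poll()
{
    var received = new List<string>();
    foreach (var (partitionId, events) in partitions)
    {
        for (int i = nextIndex[partitionId]; i < events.Count; i++)
        {
            received.Add($"{partitionId}:{events[i]}");
        }
        nextIndex[partitionId] = events.Count;
    }
    return received;
}

var first = Poll();          // everything currently stored
partitions["1"].Add("d");    // a new event arrives in partition 1
var second = Poll();         // only the new event
Console.WriteLine(string.Join(" ", first));
Console.WriteLine(string.Join(" ", second));
```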

2. Using EventProcessorClient (Recommended)

The EventProcessorClient is a high-level abstraction that simplifies message consumption. It manages connections to Event Hubs, distributes partitions among multiple instances of your application (for load balancing and fault tolerance), and stores the checkpoints your code writes so that processing can resume where it left off.

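Key components of EventProcessorClient are a checkpoint store (in .NET, an Azure Blob Storage container), the consumer group it reads through, the Event Hubs connection details, and the ProcessEventAsync and ProcessErrorAsync handlers, both of which must be registered before processing starts.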

Example (Conceptual - .NET SDK):

This example illustrates the core idea using the .NET SDK. Actual implementation will involve specific constructor arguments and configuration.


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;   // EventPosition
using Azure.Messaging.EventHubs.Processor;  // ProcessEventArgs, ProcessingStoppedReason
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;

// Replace with your actual connection strings and container name
string eventHubsConnectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
string blobStorageConnectionString = "YOUR_BLOB_STORAGE_CONNECTION_STRING";
string blobContainerName = "eventhub-checkpoints";

// Create the BlobContainerClient used as the checkpoint store;
// the container must exist before processing starts
var blobServiceClient = new BlobServiceClient(blobStorageConnectionString);
var blobContainerClient = blobServiceClient.GetBlobContainerClient(blobContainerName);
await blobContainerClient.CreateIfNotExistsAsync();

// Create an EventProcessorClient
var processor = new EventProcessorClient(
    blobContainerClient,
    "my-consumer-group", // Your consumer group name ("$Default" always exists)
    eventHubsConnectionString,
    eventHubName);

// Register handlers. ProcessEventAsync and ProcessErrorAsync are required;
// the partition initializing and closing handlers are optional.
processor.ProcessEventAsync += HandleEvent;
processor.ProcessErrorAsync += HandleError;
processor.PartitionInitializingAsync += PartitionInitializingHandler;
processor.PartitionClosingAsync += PartitionClosingHandler;

try
{
    // Start processing events in the background
    await processor.StartProcessingAsync();

    Console.WriteLine("Press Enter to stop processing...");
    Console.ReadLine();
}
catch (Exception ex)
{
    Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
    // Stop processing events, even if something above failed
    await processor.StopProcessingAsync();
}

// --- Event Handlers ---

async Task HandleEvent(ProcessEventArgs eventArgs)
{
    // eventArgs.Data is the EventData; EventBody holds the payload
    Console.WriteLine($"Received message: {Encoding.UTF8.GetString(eventArgs.Data.EventBody.ToArray())}");
    Console.WriteLine($"Partition ID: {eventArgs.Partition.PartitionId}");
    Console.WriteLine($"Offset: {eventArgs.Data.Offset}");
    Console.WriteLine($"Sequence Number: {eventArgs.Data.SequenceNumber}");

    // Event Hubs has no per-message "complete" or acknowledgment as Service Bus does.
    // Progress is recorded by writing a checkpoint, and EventProcessorClient does not
    // write checkpoints on its own: your handler calls UpdateCheckpointAsync.
    // Checkpointing after every event is simplest but costs one storage write per event;
    // many applications checkpoint every N events or every few seconds instead.
    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
}

Task HandleError(ProcessErrorEventArgs eventArgs)
{
    // Called when the processor encounters an error, such as a failure reaching
    // Event Hubs or the checkpoint store. PartitionId is null when the error
    // is not tied to a specific partition.
    Console.WriteLine($"Error in operation '{eventArgs.Operation}': {eventArgs.Exception.Message}");
    Console.WriteLine($"Partition ID: {eventArgs.PartitionId ?? "(none)"}");
    return Task.CompletedTask;
}

Task PartitionInitializingHandler(PartitionInitializingEventArgs eventArgs)
{
    Console.WriteLine($"Initializing partition: {eventArgs.PartitionId}");
    // DefaultStartingPosition applies only when the partition has no checkpoint yet.
    // For example, to start with events enqueued during the last day:
    // eventArgs.DefaultStartingPosition = EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddDays(-1));
    return Task.CompletedTask;
}

Task PartitionClosingHandler(PartitionClosingEventArgs eventArgs)
{
    // Reason is Shutdown when the processor is stopping, or OwnershipLost
    // when another instance has claimed the partition.
    Console.WriteLine($"Closing partition: {eventArgs.PartitionId} ({eventArgs.Reason})");
    // if (eventArgs.Reason == ProcessingStoppedReason.Shutdown)
    // {
    //     // ... perform final actions ...
    // }
    return Task.CompletedTask;
}
            

3. Manual Partition Management

While EventProcessorClient is recommended, you might need finer control or be in a scenario where automatic management is not feasible. In such cases, you can manually manage partitions, offsets, and checkpoints.

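This involves enumerating the partitions (for example, with EventHubConsumerClient.GetPartitionIdsAsync in .NET), reading each partition from a starting position you choose, persisting your own checkpoints, and deciding which instance of your application reads which partition.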

This approach is more complex and requires careful handling of load balancing and failure scenarios.
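If you take on load balancing yourself, one simple scheme is to give each instance an even share of the partitions. The sketch below is a hypothetical static round-robin assignment over a known list of instances. It is not the ownership-claiming algorithm that EventProcessorClient uses, and it does not react to instances joining or failing, which is the part that makes manual management hard.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical static round-robin: partition i goes to instance (i mod instance count).
// It does not react to instances joining, leaving, or crashing; real deployments must.
Dictionary<string, List<string>> AssignPartitions(IReadOnlyList<string> partitionIds, IReadOnlyList<string> instanceIds)
{
    if (instanceIds.Count == 0)
    {
        throw new ArgumentException("At least one instance is required.", nameof(instanceIds));
    }

    var assignment = instanceIds.ToDictionary(id => id, _ => new List<string>());
    for (int i = 0; i < partitionIds.Count; i++)
    {
        assignment[instanceIds[i % instanceIds.Count]].Add(partitionIds[i]);
    }
    return assignment;
}

// Eight partitions shared by three instances.
var allPartitions = Enumerable.Range(0, 8).Select(n => n.ToString()).ToList();
var instances = new List<string> { "instance-a", "instance-b", "instance-c" };
var plan = AssignPartitions(allPartitions, instances);

foreach (var (instance, owned) in plan)
{
    Console.WriteLine($"{instance}: {string.Join(",", owned)}");
}
```

Each instance ends up with either the floor or the ceiling of partitions divided by instances, which is the balance an automatic scheme also aims for; the difficulty lies in recomputing and handing over ownership safely when the instance list changes.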

Checkpointing: Ensuring Reliable Consumption

Checkpointing is vital for ensuring that your application can resume processing from where it left off after a restart or failure. It involves storing the offset and sequence number of the last successfully processed event for a given partition and consumer group.
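To see what a checkpoint buys you, here is a hypothetical in-memory model. A dictionary stands in for the checkpoint store, keyed by consumer group and partition and holding the last processed sequence number, and `ResolveStart`, `Run`, and their parameters are illustrative names. After a simulated crash, reading resumes at the event after the checkpoint, so events processed but not yet checkpointed are delivered again. This is why Event Hubs consumers should expect at-least-once delivery and make processing idempotent where possible.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical checkpoint store: (consumer group, partition id) -> last processed sequence number.
var checkpoints = new Dictionary<(string Group, string Partition), long>();

// Start just after the stored checkpoint; with no checkpoint, use the default position.
long ResolveStart(string group, string partitionId, long defaultStart) =>
    checkpoints.TryGetValue((group, partitionId), out var last) ? last + 1 : defaultStart;

// Simulates a reader over a partition holding sequence numbers 0..eventCount-1.
// It checkpoints after every 'checkpointEvery' events and "crashes" after 'stopAfter' events.
List<long> Run(string group, string partitionId, long eventCount, int checkpointEvery, int stopAfter)
{
    var processed = new List<long>();
    for (long seq = ResolveStart(group, partitionId, 0);
         seq < eventCount && processed.Count < stopAfter;
         seq++)
    {
        processed.Add(seq);  // "process" the event
        if (processed.Count % checkpointEvery == 0)
        {
            checkpoints[(group, partitionId)] = seq;  // persist progress
        }
    }
    return processed;
}

var firstRun = Run("$Default", "0", eventCount: 10, checkpointEvery: 3, stopAfter: 5);
// The last checkpoint was written after sequence number 2, so 3 and 4 are processed again.
var secondRun = Run("$Default", "0", eventCount: 10, checkpointEvery: 3, stopAfter: int.MaxValue);

Console.WriteLine($"first run:  {string.Join(",", firstRun)}");
Console.WriteLine($"second run: {string.Join(",", secondRun)}");
```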

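EventProcessorClient stores checkpoints in a checkpoint store that you supply during initialization (in .NET, an Azure Blob Storage container). It does not decide when to checkpoint: your event handler calls UpdateCheckpointAsync on the ProcessEventArgs of an event it has finished processing. When a processor takes ownership of a partition, reading resumes just after that partition's last checkpoint, or from the default starting position if no checkpoint exists.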
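Because your code decides when to call UpdateCheckpointAsync, a common choice is to checkpoint each partition after a number of events or after an elapsed interval, whichever comes first. The helper below, `ShouldCheckpoint`, is a hypothetical sketch of that decision; in a real ProcessEventAsync handler you would call `await eventArgs.UpdateCheckpointAsync()` when it returns true. The thresholds are illustrative, not recommendations.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical thresholds, for illustration only.
const int maxEvents = 100;
var maxInterval = TimeSpan.FromSeconds(30);

// Per-partition state: events seen since the last checkpoint, and when the current window started.
var state = new Dictionary<string, (int Count, DateTimeOffset WindowStart)>();

// Call once per processed event. Returns true when the caller should checkpoint now.
// 'now' is a parameter so the logic can be exercised without a real clock.
bool ShouldCheckpoint(string partitionId, DateTimeOffset now)
{
    // A partition's first event opens its window.
    var (count, windowStart) = state.TryGetValue(partitionId, out var s) ? s : (0, now);
    count++;
    if (count >= maxEvents || now - windowStart >= maxInterval)
    {
        state[partitionId] = (0, now);  // start a new window after checkpointing
        return true;
    }
    state[partitionId] = (count, windowStart);
    return false;
}

var t0 = DateTimeOffset.UnixEpoch;
int checkpointsWritten = 0;

// 250 events on partition "0", one millisecond apart: the count threshold fires twice.
for (int i = 0; i < 250; i++)
{
    if (ShouldCheckpoint("0", t0.AddMilliseconds(i)))
    {
        checkpointsWritten++;
    }
}

// One more event 31 seconds later: the time threshold fires with only 51 events pending.
bool checkpointByTime = ShouldCheckpoint("0", t0.AddSeconds(31));
Console.WriteLine($"count-based checkpoints: {checkpointsWritten}, time-based checkpoint: {checkpointByTime}");
```

Larger thresholds mean fewer writes to the checkpoint store but more events replayed after a restart.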

Handling Errors and Retries

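Robust applications must handle errors gracefully. Common failures include network interruptions, transient service errors such as throttling, and malformed events that your code cannot parse. The client libraries automatically retry transient failures of their own operations according to configurable retry options (in .NET, EventHubsRetryOptions, set through the RetryOptions property of the client options). Errors in your own processing code are yours to handle: catch exceptions inside the event handler, retry what is transient, and log or set aside events that fail permanently, since a malformed event will fail on every attempt.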
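Inside an event handler, calls to other services often need the same treatment the SDK applies to its own operations. Below is a minimal sketch of retry with capped exponential backoff. It assumes a hypothetical `IsTransient` classification (here, only TimeoutException) and illustrative delays; it is a generic pattern, not the SDK's internal retry policy. A non-transient exception, such as one caused by a malformed event, is rethrown immediately so the handler can log it and move on.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical classification: treat timeouts as transient, everything else as permanent.
bool IsTransient(Exception ex) => ex is TimeoutException;

// Delay before retry number 'attempt' (1-based): baseDelay * 2^(attempt-1), capped at maxDelay.
TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
    double ms = baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
    return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelay.TotalMilliseconds));
}

// Runs 'operation', retrying transient failures up to 'maxRetries' times.
async Task<T> RetryAsync<T>(Func<Task<T>> operation, int maxRetries, TimeSpan baseDelay, TimeSpan maxDelay)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await operation();
        }
        catch (Exception ex) when (IsTransient(ex) && attempt <= maxRetries)
        {
            await Task.Delay(BackoffDelay(attempt, baseDelay, maxDelay));
        }
    }
}

// Simulated downstream call: times out twice, then succeeds.
int calls = 0;
Task<string> FlakyOperation()
{
    calls++;
    return calls < 3
        ? Task.FromException<string>(new TimeoutException("simulated timeout"))
        : Task.FromResult("ok");
}

string result = await RetryAsync<string>(FlakyOperation, maxRetries: 5,
    baseDelay: TimeSpan.FromMilliseconds(10), maxDelay: TimeSpan.FromMilliseconds(100));
Console.WriteLine($"{result} after {calls} calls");
```

Production retry policies commonly add random jitter to the delay so that many consumers do not retry in lockstep.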

Monitoring Consumption

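Monitor key metrics to ensure your consumers are keeping up with the event flow: incoming versus outgoing messages, throttled requests, server and user errors, and consumer lag, meaning how far each partition's last checkpoint trails its most recently enqueued event.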

Azure Monitor and Application Insights provide excellent tools for observing these metrics.
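One way to track consumer lag is to compute it per partition yourself. The sketch assumes you already have, for each partition, the last enqueued sequence number (in the .NET SDK, PartitionProperties.LastEnqueuedSequenceNumber from EventHubConsumerClient.GetPartitionPropertiesAsync) and the sequence number of your last checkpoint. The snapshot values, the `Lag` helper, and the alert threshold are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical snapshot per partition: last enqueued sequence number (from the service)
// and the sequence number of the last checkpoint (from your checkpoint store, null if none).
var snapshot = new Dictionary<string, (long LastEnqueued, long? LastCheckpointed)>
{
    ["0"] = (1_500, 1_480),
    ["1"] = (2_000, 900),
    ["2"] = (300, null),
};

const long alertThreshold = 500;  // illustrative; tune for your workload

// Lag = number of events enqueued after the last checkpoint; unknown (null) without a checkpoint.
long? Lag(long lastEnqueued, long? lastCheckpointed) =>
    lastCheckpointed is long cp ? (long?)(lastEnqueued - cp) : null;

foreach (var (partitionId, p) in snapshot)
{
    long? lag = Lag(p.LastEnqueued, p.LastCheckpointed);
    string status = lag is null ? "no checkpoint yet" : lag > alertThreshold ? "behind" : "ok";
    Console.WriteLine($"partition {partitionId}: lag={lag?.ToString() ?? "n/a"} ({status})");
}

// Partitions whose lag exceeds the threshold, e.g. to feed a custom metric or alert.
var lagging = snapshot
    .Where(kv => Lag(kv.Value.LastEnqueued, kv.Value.LastCheckpointed) > alertThreshold)
    .Select(kv => kv.Key)
    .ToList();
Console.WriteLine($"lagging partitions: {string.Join(",", lagging)}");
```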

Remember to always consult the official Azure SDK documentation for the specific language and version you are using, as API details and best practices can evolve.