Consuming Azure Event Hubs with the Processor Library

This guide shows how to consume events from Azure Event Hubs with the Event Hubs Processor library. The library takes care of checkpointing, load balancing across instances, and error handling, so you can focus on your event processing logic.

Prerequisites

Installation

First, install the necessary NuGet package (for .NET):


dotnet add package Azure.Messaging.EventHubs.Processor
        

Core Concepts

Implementing an Event Processor

1. Define Your Event Handler

Create a class that holds your handler methods. In the .NET package Azure.Messaging.EventHubs.Processor, EventProcessorClient raises events instead of calling an interface, so the class does not implement anything; its methods only need to match the signatures of the events they will be subscribed to.

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using System;
using System.Text;
using System.Threading.Tasks;

// Plain class: each method matches the signature of one EventProcessorClient event.
public class MyEventProcessor
{
    public async Task ProcessEventAsync(ProcessEventArgs eventArgs)
    {
        // The handler can be invoked without an event when a maximum wait time is configured
        if (!eventArgs.HasEvent)
        {
            return;
        }

        // Access event data
        string messageBody = Encoding.UTF8.GetString(eventArgs.Data.EventBody.ToArray());
        Console.WriteLine($"Received event: {messageBody} (Partition: {eventArgs.Partition.PartitionId}, Sequence: {eventArgs.Data.SequenceNumber})");

        // Perform your processing logic here...

        // Update checkpoint after successful processing
        await eventArgs.UpdateCheckpointAsync();
    }

    public Task ProcessErrorAsync(ProcessErrorEventArgs eventArgs)
    {
        // PartitionId can be null when the error is not tied to a single partition
        Console.WriteLine($"Error during '{eventArgs.Operation}' (Partition: {eventArgs.PartitionId}): {eventArgs.Exception.Message}");
        // Handle errors as appropriate
        return Task.CompletedTask;
    }

    public Task InitializePartitionAsync(PartitionInitializingEventArgs eventArgs)
    {
        Console.WriteLine($"Initializing processor for partition: {eventArgs.PartitionId}");
        return Task.CompletedTask;
    }

    public Task ClosePartitionAsync(PartitionClosingEventArgs eventArgs)
    {
        Console.WriteLine($"Closing processor for partition: {eventArgs.PartitionId} with reason: {eventArgs.Reason}");
        return Task.CompletedTask;
    }
}

2. Configure and Run the Processor

Instantiate the EventProcessorClient and start processing events.

Note: EventProcessorClient keeps its checkpoints and partition ownership records in an Azure Blob Storage container, which is passed to the constructor as a BlobContainerClient. The container must already exist when the processor starts.

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs; // BlobContainerClient for the checkpoint store
using System;
using System.Threading.Tasks;

public class EventHubConsumer
{
    public static async Task RunAsync()
    {
        string eventHubNamespaceConnectionString = "YOUR_EVENTHUB_NAMESPACE_CONNECTION_STRING";
        string eventHubName = "YOUR_EVENTHUB_NAME";
        string consumerGroup = "$Default"; // Or your custom consumer group

        // Blob container that stores checkpoints and partition ownership
        string blobStorageConnectionString = "YOUR_BLOB_STORAGE_CONNECTION_STRING";
        string blobContainerName = "eventhub-checkpoints";
        var checkpointStore = new BlobContainerClient(blobStorageConnectionString, blobContainerName);

        var processor = new EventProcessorClient(
            checkpointStore,
            consumerGroup,
            eventHubNamespaceConnectionString,
            eventHubName
        );

        // Subscribe the handler methods to the processor's events.
        // ProcessEventAsync and ProcessErrorAsync must be set before starting.
        var handler = new MyEventProcessor();
        processor.ProcessEventAsync += handler.ProcessEventAsync;
        processor.ProcessErrorAsync += handler.ProcessErrorAsync;
        processor.PartitionInitializingAsync += handler.InitializePartitionAsync;
        processor.PartitionClosingAsync += handler.ClosePartitionAsync;

        Console.WriteLine("Starting Event Hub processor...");

        try
        {
            await processor.StartProcessingAsync();
            Console.WriteLine("Processor started. Press any key to stop.");
            Console.ReadKey();
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to start processor: {ex.Message}");
        }
        finally
        {
            Console.WriteLine("Stopping Event Hub processor...");
            await processor.StopProcessingAsync();
            Console.WriteLine("Processor stopped.");
        }
    }
}

Customizing the Processor

Checkpointing

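Call UpdateCheckpointAsync() after you have successfully processed an event. The checkpoint records that event's position in its partition, and the processor resumes from the last checkpoint when the application restarts or the partition moves to another instance. Events processed after the last checkpoint are delivered again, so handlers should tolerate duplicates. Each checkpoint is a write to the checkpoint store, so high-throughput consumers often checkpoint every N events or on a time interval rather than on every event.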
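
One way to checkpoint less often is to count processed events per partition and checkpoint only on every N-th one. The sketch below is a minimal illustration under that assumption; CheckpointThrottle and ShouldCheckpoint are hypothetical names and are not part of the SDK. It relies on the processor handling the events of a single partition one at a time, so each partition's counter is only touched by one handler call at once.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of the Azure SDK): reports when a partition
// has accumulated enough processed events to justify writing a checkpoint.
public sealed class CheckpointThrottle
{
    private readonly int _interval;
    private readonly ConcurrentDictionary<string, int> _counts =
        new ConcurrentDictionary<string, int>();

    public CheckpointThrottle(int interval)
    {
        if (interval < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(interval));
        }
        _interval = interval;
    }

    // Counts one processed event for the partition and returns true on every
    // interval-th event, resetting that partition's counter afterwards.
    public bool ShouldCheckpoint(string partitionId)
    {
        int count = _counts.AddOrUpdate(partitionId, 1, (_, current) => current + 1);
        if (count < _interval)
        {
            return false;
        }
        _counts[partitionId] = 0;
        return true;
    }
}
```

Inside the event handler, the call would replace the unconditional checkpoint: if (throttle.ShouldCheckpoint(eventArgs.Partition.PartitionId)) { await eventArgs.UpdateCheckpointAsync(); }. A larger interval means fewer storage writes but more events replayed after a restart.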

Load Balancing

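The processor library automatically balances partitions across multiple instances of your application that consume from the same Event Hub and consumer group. Instances coordinate through ownership records kept in the checkpoint store: each instance periodically claims or gives up partitions until they are spread evenly, and an instance that stops renewing its ownership loses its partitions to the others.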
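
The pace of this coordination can be tuned through EventProcessorClientOptions. The sketch below shows the three load balancing settings; the ProcessorOptionsFactory class and the interval values are illustrative assumptions, not recommendations from this guide.

```csharp
using System;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

// Hypothetical factory: groups the load balancing settings in one place.
public static class ProcessorOptionsFactory
{
    public static EventProcessorClientOptions Create()
    {
        return new EventProcessorClientOptions
        {
            // Greedy claims unowned partitions as fast as possible;
            // Balanced claims at most one partition per cycle.
            LoadBalancingStrategy = LoadBalancingStrategy.Greedy,

            // How often this instance runs a load balancing cycle and
            // renews ownership of the partitions it holds.
            LoadBalancingUpdateInterval = TimeSpan.FromSeconds(10),

            // How long an ownership record stays valid without renewal
            // before other instances may claim the partition.
            PartitionOwnershipExpirationInterval = TimeSpan.FromSeconds(60),
        };
    }
}
```

The options object is passed as the last constructor argument, for example new EventProcessorClient(checkpointStore, consumerGroup, connectionString, eventHubName, ProcessorOptionsFactory.Create()). Shorter intervals let partitions move sooner after an instance fails, at the cost of more requests to the checkpoint store.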

Error Handling

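Implement the ProcessErrorAsync method to observe errors raised by the processor's own operations, such as reading from a partition or updating ownership during load balancing. The processor reports these errors and keeps running where it can. Exceptions thrown by your own ProcessEventAsync code are not routed to this handler, so wrap your processing logic in a try/catch block and decide there whether to log, retry, or skip the event.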
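
The two kinds of failure can be handled side by side. The following is a minimal sketch, assuming console logging is enough and that a failed event should be skipped without a checkpoint; the SafeHandlers class name is hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

// Hypothetical handler class: guards user code and reports processor errors.
public static class SafeHandlers
{
    public static async Task ProcessEventAsync(ProcessEventArgs args)
    {
        try
        {
            if (!args.HasEvent)
            {
                return;
            }

            // Placeholder for real processing logic.
            Console.WriteLine($"Processing: {args.Data.EventBody}");

            await args.UpdateCheckpointAsync(args.CancellationToken);
        }
        catch (Exception ex)
        {
            // Exceptions from this handler are not sent to ProcessErrorAsync,
            // so they are caught and logged here. No checkpoint was written,
            // so the event is seen again after a restart.
            Console.Error.WriteLine($"Handler failed on partition {args.Partition.PartitionId}: {ex.Message}");
        }
    }

    public static Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        // PartitionId is null when the error is not tied to one partition.
        string partition = args.PartitionId ?? "(none)";
        Console.Error.WriteLine($"Processor error during '{args.Operation}' on partition {partition}: {args.Exception.Message}");
        return Task.CompletedTask;
    }
}
```

Both methods are static, so they are subscribed with processor.ProcessEventAsync += SafeHandlers.ProcessEventAsync and processor.ProcessErrorAsync += SafeHandlers.ProcessErrorAsync.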

Partition Management

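The InitializePartitionAsync and ClosePartitionAsync methods run when this instance starts and stops processing a partition, provided they are subscribed to the client's PartitionInitializingAsync and PartitionClosingAsync events. Use them for setup and cleanup that is specific to one partition. A partition closes either because the processor is shutting down or because another instance has taken ownership of it.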
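
A common use of the initialization hook is choosing where to start reading a partition that has no checkpoint yet. The sketch below assumes you want only new events in that case; the PartitionHandlers class and its method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

// Hypothetical handler class for the partition lifecycle events.
public static class PartitionHandlers
{
    public static Task OnPartitionInitializingAsync(PartitionInitializingEventArgs args)
    {
        // Used only when the partition has no checkpoint; an existing
        // checkpoint takes precedence over this position.
        args.DefaultStartingPosition = EventPosition.Latest;
        Console.WriteLine($"Partition {args.PartitionId}: starting from new events if no checkpoint exists");
        return Task.CompletedTask;
    }

    public static Task OnPartitionClosingAsync(PartitionClosingEventArgs args)
    {
        // Reason distinguishes a normal shutdown from losing the partition
        // to another instance during load balancing.
        string why = args.Reason == ProcessingStoppedReason.OwnershipLost
            ? "another instance took ownership"
            : "the processor is shutting down";
        Console.WriteLine($"Partition {args.PartitionId} closing: {why}");
        return Task.CompletedTask;
    }
}
```

These are wired up with processor.PartitionInitializingAsync += PartitionHandlers.OnPartitionInitializingAsync and processor.PartitionClosingAsync += PartitionHandlers.OnPartitionClosingAsync before the processor is started.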

Next Steps