Receiving Messages from Azure Event Hubs

Introduction

Receiving messages from Azure Event Hubs is a fundamental operation for any application that consumes event streams. This guide covers the essential concepts and methods for reliably reading events from your Event Hubs, ensuring you can process them efficiently and without data loss.

Key takeaway: Proper handling of message reception involves understanding consumer groups and choosing the right SDK or service integration.

Understanding Consumer Groups

Consumer groups are a core concept in Event Hubs: each one is a separate view of the event stream that lets multiple applications, or different parts of the same application, consume the data in an Event Hub independently. Readers in each consumer group track their own position (offset) in the stream, so several applications can read the same events without interfering with each other. For instance, one consumer group might be used for real-time processing, while another is used for batch analytics.
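The independence of consumer groups can be pictured with a small in-memory model. This is only an illustrative sketch, not the Event Hubs SDK: `ToyEventStream`, its methods, and the group names `realtime` and `analytics` are hypothetical, and the model keeps offsets inside the stream object for brevity, whereas real consumers persist their position in a checkpoint store.

```python
from dataclasses import dataclass, field


@dataclass
class ToyEventStream:
    """Hypothetical in-memory stand-in for one partition of an event hub."""

    events: list = field(default_factory=list)
    # One read position per consumer group name.
    offsets: dict = field(default_factory=dict)

    def send(self, body):
        self.events.append(body)

    def receive(self, consumer_group, max_events=10):
        """Return the next events for this group and advance only its offset."""
        start = self.offsets.get(consumer_group, 0)
        batch = self.events[start:start + max_events]
        self.offsets[consumer_group] = start + len(batch)
        return batch


stream = ToyEventStream()
for reading in ("t=20.1", "t=20.4", "t=20.9"):
    stream.send(reading)

# Two groups read the same events at their own pace.
realtime = stream.receive("realtime", max_events=2)
analytics = stream.receive("analytics", max_events=10)
realtime_rest = stream.receive("realtime", max_events=10)
```

Reading through `analytics` does not move the `realtime` position, which is the property that lets a real-time pipeline and a batch job share one event hub.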

When you create an Event Hub, a default consumer group named $Default is automatically created. You can create additional consumer groups via the Azure portal, Azure CLI, or SDKs to cater to different consumption needs.

Receiving Strategies

Azure Event Hubs offers several ways to receive messages, each suited for different scenarios and development preferences.

Using EventProcessorHost (Legacy)

The EventProcessorHost was the primary way to receive events using the older Microsoft.Azure.EventHubs SDK. It still works for existing applications, but new development should use the newer Azure.Messaging.EventHubs libraries, and existing code should be migrated when practical.

Using Azure Functions

Azure Functions provide a serverless compute experience that can natively integrate with Event Hubs. The Event Hubs trigger for Azure Functions simplifies message reception by handling checkpointing, scaling, and error management automatically. This is an excellent choice for event-driven architectures where you want to run code in response to incoming events without managing infrastructure.

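To use the Event Hubs trigger, bind a function to your event hub by specifying the event hub name, the app setting that holds the connection string, and optionally a consumer group.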

Using Azure.Messaging.EventHubs.Processor (Recommended)

The Azure.Messaging.EventHubs.Processor library (a companion to the Azure.Messaging.EventHubs SDK) is the modern, recommended approach for building robust event receivers. It provides a high-level abstraction for managing the complexities of reading from Event Hubs, including balancing partitions across processor instances, storing checkpoints in Azure Blob Storage, and resuming from the last checkpoint after a restart.

This library is designed for scenarios where you need more control over the receiving process than Azure Functions offers, or when building custom microservices that consume events.

Note: Always use a dedicated consumer group for your processor instances to avoid conflicts with other applications.
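Checkpointing, which both the Functions trigger and the processor library handle for you, can be pictured with a toy model. The sketch below is an assumption-laden illustration rather than SDK code: `ToyCheckpointStore`, `process_partition`, the `orders` consumer group, and the `fail_at` parameter are all hypothetical names used to simulate a crash and a restart.

```python
class ToyCheckpointStore:
    """Hypothetical in-memory store keyed by (consumer group, partition)."""

    def __init__(self):
        self._checkpoints = {}

    def load(self, consumer_group, partition_id):
        # -1 means "no checkpoint yet, start from the beginning".
        return self._checkpoints.get((consumer_group, partition_id), -1)

    def save(self, consumer_group, partition_id, sequence_number):
        self._checkpoints[(consumer_group, partition_id)] = sequence_number


def process_partition(events, store, consumer_group, partition_id, fail_at=None):
    """Process events after the last checkpoint; checkpoint after each success."""
    handled = []
    last = store.load(consumer_group, partition_id)
    for sequence_number, body in enumerate(events):
        if sequence_number <= last:
            continue  # already checkpointed by an earlier run
        if sequence_number == fail_at:
            break  # simulate a crash before this event is processed
        handled.append(body)
        store.save(consumer_group, partition_id, sequence_number)
    return handled


events = ["a", "b", "c", "d"]
store = ToyCheckpointStore()
first_run = process_partition(events, store, "orders", "0", fail_at=2)
second_run = process_partition(events, store, "orders", "0")
```

The second run resumes at the first event that was never checkpointed. Because the key includes the consumer group, two applications sharing a group would overwrite each other's progress, which is one reason to give each processor application its own group.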

Example (C# with Event Hubs Processor)

Here's a basic example demonstrating how to set up an event processor to receive messages using the Azure.Messaging.EventHubs.Processor library.

// Install NuGet package: Azure.Messaging.EventHubs.Processor

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class EventHubReceiver
{
    private readonly string eventHubConnectionString;
    private readonly string consumerGroup;
    private readonly string blobStorageConnectionString;
    private readonly string blobContainerName;

    public EventHubReceiver(string ehConnectionString, string consumerGroupName, string storageConnectionString, string containerName)
    {
        eventHubConnectionString = ehConnectionString;
        consumerGroup = consumerGroupName;
        blobStorageConnectionString = storageConnectionString;
        blobContainerName = containerName;
    }

    public async Task StartProcessingAsync()
    {
        var processorOptions = new EventProcessorClientOptions
        {
            // Configure retry options if needed
            RetryOptions = new EventHubsRetryOptions
            {
                MaximumRetries = 5,
                Delay = TimeSpan.FromSeconds(5)
            }
        };

        // Create BlobContainerClient for checkpointing
        var blobClient = new BlobServiceClient(blobStorageConnectionString);
        var containerClient = blobClient.GetBlobContainerClient(blobContainerName);

        // The connection string must include the event hub name (EntityPath=...);
        // otherwise use the overload that also takes an eventHubName argument.
        var processor = new EventProcessorClient(
            containerClient,
            consumerGroup,
            eventHubConnectionString,
            processorOptions);

        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        Console.WriteLine("Starting Event Hub processor...");
        await processor.StartProcessingAsync();

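        // Keep processing until Ctrl+C is pressed, then shut down cleanly
        Console.WriteLine("Press Ctrl+C to stop the processor.");
        using var cts = new System.Threading.CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
        try
        {
            await Task.Delay(System.Threading.Timeout.Infinite, cts.Token);
        }
        catch (TaskCanceledException)
        {
            // Expected when Ctrl+C cancels the wait
        }

        Console.WriteLine("Stopping Event Hub processor...");
        await processor.StopProcessingAsync();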
    }

    private async Task ProcessEventHandler(ProcessEventArgs arg)
    {
        Console.WriteLine($"\tReceived event: '{arg.Data.EventBody.ToString()}'");
        Console.WriteLine($"\tEnqueued Time: {arg.Data.EnqueuedTime}");
        Console.WriteLine($"\tPartition ID: {arg.Partition.Id}");

        // Update checkpoint after successfully processing an event
        await arg.UpdateCheckpointAsync(arg.CancellationToken);
        Console.WriteLine("Checkpoint updated.");
    }

    private Task ProcessErrorHandler(ProcessErrorEventArgs arg)
    {
        Console.WriteLine($"Error in processor. Partition: '{arg.PartitionId ?? "-"}', Operation: '{arg.Operation}'. Exception: {arg.Exception}");
        return Task.CompletedTask;
    }

    public static async Task Main(string[] args)
    {
        // Replace with your actual connection strings and container name
        var eventHubConnectionString = Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STRING");
        var blobStorageConnectionString = Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING");
        var blobContainerName = "eventhub-checkpoints"; // Your blob container name for checkpoints
        var consumerGroup = "$Default"; // Or your custom consumer group name

        if (string.IsNullOrEmpty(eventHubConnectionString) || string.IsNullOrEmpty(blobStorageConnectionString))
        {
            Console.WriteLine("Please set EVENTHUB_CONNECTION_STRING and BLOB_STORAGE_CONNECTION_STRING environment variables.");
            return;
        }

        var receiver = new EventHubReceiver(eventHubConnectionString, consumerGroup, blobStorageConnectionString, blobContainerName);
        await receiver.StartProcessingAsync();
    }
}

Example (Python with Event Hubs Processor)

Here's a Python equivalent using the azure-eventhub and azure-eventhub-checkpointstoreblob-aio libraries.


import asyncio
import os

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

async def receive_events(eventhub_conn_str, consumer_group, storage_conn_str, container_name):
    """
    Receives events from an Event Hub, checkpointing to Blob Storage via BlobCheckpointStore.
    """
    print("Initializing BlobCheckpointStore...")
    checkpoint_store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name)

    print("Creating EventHubConsumerClient...")
    # The connection string must include the event hub name (EntityPath=...);
    # otherwise pass eventhub_name="..." as well.
    client = EventHubConsumerClient.from_connection_string(
        eventhub_conn_str,
        consumer_group=consumer_group,
        checkpoint_store=checkpoint_store,
    )

    print(f"Starting to receive events with consumer group: {consumer_group}...")
    async def on_event(partition_context, event):
        print(f"Received event: {event.body_as_str()} from partition {partition_context.partition_id}")
        # Update checkpoint after processing
        await partition_context.update_checkpoint(event)
        print(f"Checkpoint updated for partition {partition_context.partition_id} at offset {event.offset}")

    async def on_error(partition_context, error):
        # partition_context is None when the error is not tied to a single partition
        partition_id = partition_context.partition_id if partition_context else "-"
        print(f"Error processing partition {partition_id}: {error}")

    try:
        async with client:
            # Runs until cancelled; "-1" reads from the start of each partition
            # when no checkpoint exists yet
            await client.receive(on_event=on_event, on_error=on_error, starting_position="-1")
    finally:
        print("Receiver stopped.")

async def main():
    eventhub_conn_str = os.environ.get("EVENTHUB_CONNECTION_STRING")
    storage_conn_str = os.environ.get("BLOB_STORAGE_CONNECTION_STRING")
    container_name = "eventhub-checkpoints" # Your blob container name for checkpoints
    consumer_group = "$Default" # Or your custom consumer group name

    if not eventhub_conn_str or not storage_conn_str:
        print("Please set EVENTHUB_CONNECTION_STRING and BLOB_STORAGE_CONNECTION_STRING environment variables.")
        return

    await receive_events(eventhub_conn_str, consumer_group, storage_conn_str, container_name)

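if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C surfaces here, outside the event loop
        print("Stopping receiver...")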

Remember to install the necessary packages:


pip install azure-eventhub azure-eventhub-checkpointstoreblob-aio

Best Practices for Receiving