Azure Event Hubs Developer Guide

Receiving Data from Azure Event Hubs

This section explains how to consume events from an Azure event hub. It covers the C# and Python SDKs and the common patterns for reliable, scalable event processing.

Introduction to Event Consumption

To receive data from Event Hubs, you connect to an event hub through a specific consumer group and read the stream of events from its partitions. An event hub can have multiple consumer groups, so different applications can read the same events independently.

Using the Azure SDKs

Azure provides Event Hubs client libraries for several languages. This section focuses on common patterns in C# and Python.

C# Example: Reading Events with EventProcessorClient

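The EventProcessorClient is the recommended way to consume events in a distributed, fault-tolerant application. It automatically balances partition ownership across all running instances and stores checkpoints in Azure Blob Storage. Checkpointing itself is not automatic: your event handler decides when to record progress by calling UpdateCheckpointAsync.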


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventProcessor
{
    // Replace these placeholders with your own values.
    private const string EH_CONNECTION_STRING = "";        // Event Hubs namespace connection string
    private const string EVENT_HUB_NAME = "";              // Name of the event hub
    private const string CONSUMER_GROUP_NAME = "$Default"; // Or your custom consumer group
    private const string STORAGE_CONNECTION_STRING = "";   // Azure Storage account connection string
    private const string BLOB_CONTAINER_NAME = "";         // Existing blob container for checkpoints

    public static Task Main() => RunProcessorAsync();

    public static async Task RunProcessorAsync()
    {
        // The processor keeps checkpoints and partition ownership in this container.
        var storageClient = new BlobServiceClient(STORAGE_CONNECTION_STRING);
        var processor = new EventProcessorClient(
            storageClient.GetBlobContainerClient(BLOB_CONTAINER_NAME),
            CONSUMER_GROUP_NAME,
            EH_CONNECTION_STRING,
            EVENT_HUB_NAME);

        // Both handlers must be registered before processing starts.
        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        Console.WriteLine("Starting Event Processor...");
        await processor.StartProcessingAsync();

        Console.WriteLine("Press ENTER to stop the processor.");
        Console.ReadLine();

        await processor.StopProcessingAsync();
        Console.WriteLine("Event Processor stopped.");
    }

    static async Task ProcessEventHandler(ProcessEventArgs args)
    {
        try
        {
            // Access the event data
            string messageBody = Encoding.UTF8.GetString(args.Data.EventBody.ToArray());
            Console.WriteLine($"\tReceived message: {messageBody}");
            Console.WriteLine($"\tPartitionId: {args.Partition.PartitionId}, Sequence Number: {args.Data.SequenceNumber}");

            // Record that this event and all earlier events in the partition are processed.
            // Checkpointing after every event is simple but costs one storage write per event;
            // production code often checkpoints every N events or every few seconds instead.
            await args.UpdateCheckpointAsync(args.CancellationToken);
        }
        catch (Exception ex)
        {
            // The processor does not handle exceptions thrown by this handler, so catch them here.
            Console.WriteLine($"\tError processing event: {ex.Message}");
        }
    }

    static Task ProcessErrorHandler(ProcessErrorEventArgs args)
    {
        // PartitionId is null when the error is not associated with a specific partition.
        Console.WriteLine($"\tERROR: PartitionId='{args.PartitionId}', Operation='{args.Operation}'.");
        Console.WriteLine($"\t{args.Exception.Message}");
        return Task.CompletedTask;
    }
}

Important

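EventProcessorClient requires an Azure Blob Storage container, and that container must exist before the processor starts. The processor keeps both checkpoints and partition-ownership records in it. This lets multiple instances share the partitions and lets a restarted application resume from its last checkpoint.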
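The Python sketch below makes the EventProcessorClient's load balancing more concrete. It computes the kind of even split the processor aims for: every partition has exactly one owner, and partition counts per instance differ by at most one. It is an illustration only, and the EventProcessorClient does not run this function. In the real client, each instance records partition ownership in the blob container and periodically claims partitions until the load is roughly even. The helper name balanced_assignment and the instance names are hypothetical.

```python
from __future__ import annotations


def balanced_assignment(partition_ids: list[str],
                        instance_ids: list[str]) -> dict[str, list[str]]:
    """Hypothetical helper: deal partitions to instances round-robin.

    Each partition gets exactly one owner, and the number of partitions
    owned by any two instances differs by at most one.
    """
    if not instance_ids:
        raise ValueError("at least one processor instance is required")
    assignment: dict[str, list[str]] = {instance: [] for instance in instance_ids}
    for position, partition in enumerate(partition_ids):
        owner = instance_ids[position % len(instance_ids)]
        assignment[owner].append(partition)
    return assignment


# Eight partitions shared by three processor instances.
print(balanced_assignment([str(p) for p in range(8)], ["a", "b", "c"]))
# {'a': ['0', '3', '6'], 'b': ['1', '4', '7'], 'c': ['2', '5']}

# More instances than partitions: one instance owns nothing.
print(balanced_assignment(["0", "1"], ["a", "b", "c"]))
# {'a': ['0'], 'b': ['1'], 'c': []}
```

The second call shows why running more processor instances than partitions adds no throughput. The extra instance owns nothing and stays idle until another instance stops.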

Python Example: Reading Events with EventHubConsumerClient

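In Python, EventHubConsumerClient provides similar functionality; the example below uses the async version from azure.eventhub.aio. It receives events from all partitions and calls your on_event callback with a partition context and the event. When created with a checkpoint_store, it also balances partitions across consumer instances and persists checkpoints.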


import os
import asyncio
# The async client lives in azure.eventhub.aio; azure.eventhub exposes the synchronous client.
from azure.eventhub.aio import EventHubConsumerClient

EVENTHUB_CONNECTION_STR = os.environ.get("EVENTHUB_CONNECTION_STR")
EVENTHUB_NAME = os.environ.get("EVENTHUB_NAME")
CONSUMER_GROUP = "$Default"  # Or your custom consumer group

async def process_event(partition_context, event):
    # on_event receives the partition context first, then the event itself
    print(f"Received event: {event.body_as_str()}")
    print(f"Partition ID: {partition_context.partition_id}, Sequence Number: {event.sequence_number}")
    # Without a checkpoint_store on the client, this checkpoint is kept only in memory.
    await partition_context.update_checkpoint(event)

async def run_consumer():
    consumer_client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME
    )
    async with consumer_client:
        await consumer_client.receive(
            on_event=process_event,
            starting_position="-1"  # "-1" = earliest event, "@latest" = new events only; offsets/timestamps also accepted
        )

if __name__ == "__main__":
    print("Starting Event Consumer...")
    try:
        asyncio.run(run_consumer())
    except KeyboardInterrupt:
        print("Consumer stopped.")

Note

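The starting_position parameter of the receive method controls where reading begins in each partition. A value of "-1" starts from the oldest retained event, and "@latest" receives only events enqueued after the client starts. The parameter applies only to partitions that have no checkpoint; when checkpoint data exists, the client resumes from the checkpoint instead.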
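The Python sketch below models how starting_position interacts with checkpoints: a stored checkpoint takes precedence, and starting_position is used only for partitions without one. It is a hypothetical in-memory model, not SDK code. The helper first_index_to_read is invented for illustration. The model also assumes that no events have expired from the partition yet, so an event's sequence number equals its list index.

```python
from __future__ import annotations


def first_index_to_read(log_length: int, checkpoint_seq: int | None,
                        starting_position: str) -> int:
    """Return the sequence number of the first event a new reader receives.

    Hypothetical model: sequence numbers are assumed to equal list indexes,
    which holds only while no event has expired from the partition.
    """
    if checkpoint_seq is not None:
        # A checkpoint wins: resume just after the last processed event.
        return checkpoint_seq + 1
    if starting_position == "-1":
        return 0  # oldest retained event
    if starting_position == "@latest":
        return log_length  # skip everything already in the partition
    raise ValueError(f"not modeled in this sketch: {starting_position!r}")


log = ["e0", "e1", "e2", "e3", "e4"]
print(log[first_index_to_read(len(log), None, "-1"):])       # ['e0', 'e1', 'e2', 'e3', 'e4']
print(log[first_index_to_read(len(log), None, "@latest"):])  # []
print(log[first_index_to_read(len(log), 2, "-1"):])          # ['e3', 'e4']
```

The last call shows that once a partition has a checkpoint, its starting_position value no longer matters.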

Understanding Consumer Groups

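Consumer groups isolate applications that read the same event hub. Each consumer group is an independent view of the entire event stream. Readers in each group track their own position (offset) in every partition, so one application's progress never affects another's. Within a consumer group, the recommended pattern is one active reader per partition.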
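The Python sketch below models that independence with plain in-memory objects. Partition and Reader are hypothetical classes, not part of the Event Hubs SDK. Two readers in different consumer groups share one partition, and advancing one reader's offset leaves the other's untouched. Reading also never removes events. In Event Hubs, events leave a partition only when the retention period expires.

```python
from __future__ import annotations


class Partition:
    """Hypothetical stand-in for the events retained in one partition."""

    def __init__(self, events: list[str]) -> None:
        self.events = list(events)


class Reader:
    """Hypothetical reader in one consumer group; it tracks its own offset."""

    def __init__(self, consumer_group: str, partition: Partition) -> None:
        self.consumer_group = consumer_group
        self.partition = partition
        self.offset = 0  # index of the next event this reader will return

    def read(self, max_events: int) -> list[str]:
        # Slicing copies events out; the partition itself is never modified.
        batch = self.partition.events[self.offset:self.offset + max_events]
        self.offset += len(batch)
        return batch


partition = Partition(["e0", "e1", "e2", "e3"])
analytics = Reader("analytics", partition)
archiver = Reader("archiver", partition)

print(analytics.read(3))                  # ['e0', 'e1', 'e2']
print(archiver.read(2))                   # ['e0', 'e1']
print(analytics.read(3))                  # ['e3']
print(analytics.offset, archiver.offset)  # 4 2
```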

Checkpointing for Reliability

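Checkpointing is how a consumer application records its progress through a partition; a checkpoint marks the last event the application has finished processing. This is crucial for fault tolerance. If the consumer restarts, it resumes from the event after the last checkpoint, so no events are skipped. However, events processed after that checkpoint but before the failure are delivered again. Processing is therefore at-least-once, and handlers should tolerate duplicates.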

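Neither SDK checkpoints on its own. The application records progress explicitly, by calling args.UpdateCheckpointAsync() in C# or partition_context.update_checkpoint(event) in Python. The C# EventProcessorClient always persists checkpoints to its Blob Storage container. The Python EventHubConsumerClient persists them only when it is created with a checkpoint_store, for example BlobCheckpointStore from the azure-eventhub-checkpointstoreblob-aio package. Without one, checkpoints live in memory and are lost when the process exits.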
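The Python sketch below simulates a crash and restart to show what a checkpoint does and does not protect against. It is a hypothetical in-memory model: run_consumer and SimulatedCrash are invented for illustration, and a plain dict stands in for the checkpoint store. The consumer checkpoints after every third event, crashes before its sixth event, and then restarts from its last checkpoint.

```python
from __future__ import annotations


class SimulatedCrash(Exception):
    """Stands in for the consumer process dying mid-stream (hypothetical)."""


def run_consumer(events: list[str], store: dict[str, int], handled: list[str],
                 checkpoint_every: int, crash_at: int | None = None) -> None:
    """Process one partition's events, checkpointing every `checkpoint_every` events.

    `store` is a dict standing in for a durable checkpoint store; it maps
    "checkpoint" to the index of the last event recorded as processed.
    `handled` collects every event the handler sees, duplicates included.
    """
    start = store.get("checkpoint", -1) + 1  # resume just after the last checkpoint
    for index in range(start, len(events)):
        if index == crash_at:
            raise SimulatedCrash(f"crashed before event {index}")
        handled.append(events[index])  # the "processing" step
        if (index + 1) % checkpoint_every == 0:
            store["checkpoint"] = index  # record progress


events = [f"e{i}" for i in range(7)]
store: dict[str, int] = {}
handled: list[str] = []

try:
    run_consumer(events, store, handled, checkpoint_every=3, crash_at=5)
except SimulatedCrash:
    pass  # only the last checkpoint (index 2) survives the crash

run_consumer(events, store, handled, checkpoint_every=3)  # restart
print(handled)  # ['e0', 'e1', 'e2', 'e3', 'e4', 'e3', 'e4', 'e5', 'e6']
print(store)    # {'checkpoint': 5}
```

No event is lost, but e3 and e4 are handled twice because they were processed after the last checkpoint. Checkpointing less often reduces writes to the checkpoint store but means more duplicated work after a failure. Checkpointing after every event, as in the C# example, minimizes duplicates at the cost of one storage write per event.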

Advanced Scenarios

For more complex scenarios, consider: