Receiving Events

This guide explains how to receive events from Azure Event Hubs using various SDKs and tools. Understanding how to consume events is crucial for building event-driven applications.

Core Concepts

Receiving events typically involves connecting to an Event Hub using a Consumer Group. Consumer groups allow multiple applications to independently consume events from the same Event Hub without interfering with each other.

Consumer Groups Explained

Each Event Hub has a default consumer group named $Default. You can create additional consumer groups to cater to different application needs. For example, one consumer group might be used for real-time processing, while another is used for batch analytics.

Receiving Events with .NET SDK

The Azure SDK for .NET provides robust classes for receiving events. The primary class for this is EventProcessorClient.

Example: Basic Event Receiving Loop

// Install the package:
// dotnet add package Azure.Messaging.EventHubs.Processor

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventReceiver
{
    private const string eventHubConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
    private const string eventHubName = "YOUR_EVENT_HUB_NAME";
    private const string consumerGroup = "$Default"; // Or your custom consumer group
    private const string blobStorageConnectionString = "YOUR_BLOB_STORAGE_CONNECTION_STRING";
    private const string blobContainerName = "eventprocessor-checkpoints";

    public static async Task Main(string[] args)
    {
        var storageClient = new BlobServiceClient(blobStorageConnectionString);
        var blobContainerClient = storageClient.GetBlobContainerClient(blobContainerName);

        var processorOptions = new EventProcessorClientOptions
        {
            // Configure options as needed, e.g., RetryOptions, LoadBalancingOptions
        };

        var processor = new EventProcessorClient(
            blobContainerClient,
            consumerGroup,
            eventHubConnectionString,
            eventHubName,
            processorOptions);

        // Register handlers for processing events and errors
        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        Console.WriteLine("Starting Event Processor...");
        await processor.StartProcessingAsync();

        Console.WriteLine("Press Enter to stop the processor...");
        Console.ReadLine();

        Console.WriteLine("Stopping Event Processor...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Event Processor stopped.");
    }

    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Access the event data
        Console.WriteLine($"\tReceived event: PartitionId={eventArgs.Partition.PartitionId}, Offset={eventArgs.Data.Offset}, SequenceNumber={eventArgs.Data.SequenceNumber}");
        string messageBody = Encoding.UTF8.GetString(eventArgs.Data.EventBody.ToArray());
        Console.WriteLine($"\tMessage body: {messageBody}");

        // You can also access event properties like EnqueuedTime, Properties, etc.
        // Console.WriteLine($"\tEnqueued Time: {eventArgs.Data.EnqueuedTime}");

        // Complete the event to mark it as processed.
        // If not completed, it will be re-delivered.
        await eventArgs.CompleteAsync();
    }

    static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        Console.WriteLine($"\tError in processor: {eventArgs.FullyQualifiedNamespace}, {eventArgs.EntityPath}, {eventArgs.PartitionId}, {eventArgs.Reason}");
        // Handle specific error types if needed
        return Task.CompletedTask;
    }
}
Note: Ensure you have the necessary Azure SDK packages installed for your project. Replace the placeholder connection strings and names with your actual Azure Event Hubs and Azure Blob Storage credentials. Blob storage is used by the processor for checkpointing.

Receiving Events with Python SDK

The Python SDK offers similar capabilities for event consumption.

Example: Asynchronous Event Consumption

# pip install azure-eventhubsprotocol
        # pip install azure-eventhubsprotocol[aiohttp]

        import asyncio
        from azure.eventhubsprotocol.aio import EventHubConsumerClient

        EVENTHUB_CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
        EVENTHUB_NAME = "YOUR_EVENT_HUB_NAME"
        CONSUMER_GROUP = "$Default" # Or your custom consumer group

        async def process_event(event):
            print(f"Received event: {event}")
            # Access event data: event.body, event.properties, event.offset, event.sequence_number

        async def main():
            consumer_client = EventHubConsumerClient.from_connection_string(
                EVENTHUB_CONNECTION_STR,
                consumer_group=CONSUMER_GROUP,
                event_hub_name=EVENTHUB_NAME
            )

            print("Starting Event Hub consumer...")
            async with consumer_client:
                await consumer_client.subscribe(
                    process_event,
                    partition_id="0" # Specify partition IDs to receive from, or omit for all
                )
                # To receive from all partitions, you'd typically manage partition discovery
                # or subscribe to each partition explicitly.
                # For simplicity, this example shows a single partition.
                print("Listening for events. Press Ctrl+C to stop.")
                await asyncio.Future() # Run forever until interrupted

        if __name__ == "__main__":
            try:
                asyncio.run(main())
            except KeyboardInterrupt:
                print("Consumer stopped.")
        

Key Considerations

Further Reading

Azure Event Hubs Consumer Groups

Learn more about the role and management of consumer groups.

Event Processor Host (.NET)

Deep dive into the .NET Event Processor Host for reliable event processing.