Processing Events from Azure Event Hubs

This tutorial shows how to consume and process events from an Azure Event Hub with the Event Hubs consumer client. It covers setting up your environment, writing code to read events, and adding basic processing logic.

Prerequisites:

1. Setting Up Your Environment

Before you start coding, ensure you have the necessary Azure resources and tools configured.

1.1 Create an Event Hub

If you haven't already, create an Event Hub. You can do this via the Azure portal, Azure CLI, or Azure PowerShell.

Using Azure CLI:

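az eventhubs namespace create --name <namespace-name> --resource-group <resource-group> --location <location>
az eventhubs eventhub create --name <event-hub-name> --namespace-name <namespace-name> --resource-group <resource-group> --partition-count 2 --retention-time-in-hours 24 --cleanup-policy Delete

The retention flags shown are those of recent Azure CLI versions; older versions use a different flag. Run az eventhubs eventhub create --help to confirm what your version accepts.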

1.2 Get Connection String

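You'll need a connection string to connect your application to the Event Hub. Because this application only reads events, use the connection string of an authorization rule (shared access policy) that grants only the Listen right. In the command below, <listen-rule-name> is the name of that rule.

Using Azure CLI:

az eventhubs eventhub authorization-rule keys list --resource-group <resource-group> --namespace-name <namespace-name> --eventhub-name <event-hub-name> --name <listen-rule-name> --query primaryConnectionString -o tsv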

Store this connection string securely, for example, in environment variables or Azure Key Vault.
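
As a minimal sketch of the environment-variable approach, the helper below reads the settings at startup and fails fast when one is missing. The variable names EVENTHUB_CONNECTION_STR and EVENTHUB_NAME match the Python example later in this tutorial; the function name and EVENTHUB_CONSUMER_GROUP are assumptions made for illustration.

```python
import os


def load_event_hub_settings(env=None):
    """Read Event Hubs settings from environment variables, failing fast if any is missing."""
    env = os.environ if env is None else env
    required = ("EVENTHUB_CONNECTION_STR", "EVENTHUB_NAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    return {
        "connection_string": env["EVENTHUB_CONNECTION_STR"],
        "eventhub_name": env["EVENTHUB_NAME"],
        # EVENTHUB_CONSUMER_GROUP is a hypothetical variable; "$Default" is the fallback.
        "consumer_group": env.get("EVENTHUB_CONSUMER_GROUP", "$Default"),
    }
```

Passing a plain dictionary instead of os.environ makes the helper easy to exercise in tests without touching the real environment.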

2. Event Processing Patterns

There are several ways to process events. A common and robust pattern is to use the Event Hubs consumer library, which simplifies reading events by handling partition distribution and error handling and, when a checkpoint store is configured, by tracking progress with checkpoints.
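
To give a feel for what partition distribution means, here is a toy round-robin assignment in plain Python. It is only an illustration under simplifying assumptions: the function and consumer names are hypothetical, and a real library coordinates partition ownership through a shared store rather than a single function call.

```python
def distribute_partitions(partition_ids, consumer_ids):
    """Assign partitions to consumers round-robin so each owns a near-equal share."""
    if not consumer_ids:
        raise ValueError("at least one consumer is required")
    assignment = {consumer: [] for consumer in consumer_ids}
    for index, partition in enumerate(sorted(partition_ids)):
        owner = consumer_ids[index % len(consumer_ids)]
        assignment[owner].append(partition)
    return assignment


# Four partitions shared by two hypothetical consumer instances
two_consumers = distribute_partitions(["0", "1", "2", "3"], ["worker-a", "worker-b"])
```

With four partitions and two consumers, each instance ends up owning two partitions; adding a third consumer would leave one instance with two partitions and the others with one each.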

2.1 Using the Event Hubs Consumer Library (Conceptual)

The consumer library reads events through a consumer group, which is an independent view of the event stream. Separate consumer groups allow multiple applications to read from the same Event Hub at their own pace, without interfering with each other. Within a group, each consumer keeps track of its progress on a partition using checkpoints.
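
The relationship between consumer groups and checkpoints can be sketched with a toy in-memory model. Nothing here is part of the Event Hubs SDK: the class, its methods, and the group names "analytics" and "archiver" are assumptions chosen to show that each group tracks its own position per partition.

```python
class InMemoryEventHub:
    """Toy model: an append-only log per partition, with per-group checkpoints."""

    def __init__(self, partition_count):
        self.partitions = {str(i): [] for i in range(partition_count)}
        # checkpoints[(consumer_group, partition_id)] -> index of the next event to read
        self.checkpoints = {}

    def send(self, partition_id, body):
        self.partitions[partition_id].append(body)

    def read(self, consumer_group, partition_id):
        """Return the events this group has not yet checkpointed on the partition."""
        start = self.checkpoints.get((consumer_group, partition_id), 0)
        return self.partitions[partition_id][start:]

    def checkpoint(self, consumer_group, partition_id, events_processed):
        """Advance the group's position after it has processed that many events."""
        key = (consumer_group, partition_id)
        self.checkpoints[key] = self.checkpoints.get(key, 0) + events_processed


hub = InMemoryEventHub(partition_count=2)
for body in ("a", "b", "c"):
    hub.send("0", body)

# The "analytics" group processes the first two events and checkpoints.
hub.checkpoint("analytics", "0", 2)

remaining_for_analytics = hub.read("analytics", "0")  # only the event after the checkpoint
remaining_for_archiver = hub.read("archiver", "0")    # unaffected: still sees every event
```

The second group still sees every event because checkpoints are keyed by consumer group, which is the property that lets independent applications share one Event Hub.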

3. Implementing Event Processing

Let's look at examples in C# and Python.

3.1 C# Example

First, install the necessary NuGet package:

dotnet add package Azure.Messaging.EventHubs

Then, implement the consumer logic:


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;

public class EventProcessor
{
    private readonly string _eventHubConnectionString;
    private readonly string _eventHubName;
    private readonly string _consumerGroupName;

    public EventProcessor(string connectionString, string eventHubName, string consumerGroupName)
    {
        _eventHubConnectionString = connectionString;
        _eventHubName = eventHubName;
        _consumerGroupName = consumerGroupName;
    }

    public async Task StartProcessingAsync()
    {
        await using var client = new EventHubConsumerClient(_consumerGroupName, _eventHubConnectionString, _eventHubName);

        Console.WriteLine("Starting to read events...");

        await foreach (PartitionEvent partitionEvent in client.ReadEventsAsync())
        {
            try
            {
                string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
                Console.WriteLine($"Received event: '{messageBody}' from partition {partitionEvent.Partition.PartitionId}");

                // TODO: Implement your event processing logic here
                // e.g., save to database, trigger another service, perform analysis

                // Note: EventHubConsumerClient does not checkpoint. To persist progress,
                // use EventProcessorClient (package Azure.Messaging.EventHubs.Processor)
                // and call args.UpdateCheckpointAsync() in its event handler.
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error processing event: {ex.Message}");
                // Implement error handling and retry logic as needed
            }
        }
    }
}

// In your Main method:
// var processor = new EventProcessor("<connection-string>", "<event-hub-name>", "$Default");
// await processor.StartProcessingAsync();
        

3.2 Python Example

First, install the necessary library:

pip install azure-eventhub

Then, implement the consumer logic:


import asyncio
import os

from azure.eventhub.aio import EventHubConsumerClient
# Optional: for checkpointing with Azure Blob Storage
# (requires: pip install azure-eventhub-checkpointstoreblob-aio)
# from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

EVENTHUB_CONNECTION_STR = os.environ.get("EVENTHUB_CONNECTION_STR")
EVENTHUB_NAME = os.environ.get("EVENTHUB_NAME")
CONSUMER_GROUP_NAME = "$Default"  # Or your custom consumer group name

# Optional: For checkpointing with Azure Blob Storage
# AZURE_STORAGE_CONNECTION_STR = os.environ.get("AZURE_STORAGE_CONNECTION_STR")
# CONTAINER_NAME = "eventhub-checkpoints"

async def process_event(partition_context, event):
    """
    Processes a single event.
    Replace this with your actual event processing logic.
    """
    try:
        message_body = event.body_as_str(encoding="UTF-8")
        print(f"Received event: '{message_body}' from partition {partition_context.partition_id}")
        # TODO: Implement your event processing logic here
        # e.g., save to database, trigger another service, perform analysis

        # With a checkpoint store configured, record progress after successful processing:
        # await partition_context.update_checkpoint(event)
    except Exception as e:
        print(f"Error processing event: {e}")
        raise  # Re-raise so the failure is not swallowed and the event is not checkpointed

async def receive_events():
    """
    Receives and processes events from an Event Hub.
    """
    # If using BlobCheckpointStore:
    # checkpoint_store = BlobCheckpointStore.from_connection_string(
    #     AZURE_STORAGE_CONNECTION_STR,
    #     CONTAINER_NAME
    # )
    # consumer_client = EventHubConsumerClient.from_connection_string(
    #     EVENTHUB_CONNECTION_STR,
    #     consumer_group=CONSUMER_GROUP_NAME,
    #     eventhub_name=EVENTHUB_NAME,
    #     checkpoint_store=checkpoint_store
    # )

    # If not using checkpointing (not recommended for production):
    consumer_client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP_NAME,
        eventhub_name=EVENTHUB_NAME
    )

    print("Starting to read events...")

    async with consumer_client:
        # receive() reads from all partitions by default and runs until the client is closed.
        # starting_position="-1" starts from the beginning of a partition when no checkpoint exists.
        await consumer_client.receive(
            on_event=process_event,
            starting_position="-1"
        )

# To run:
# if __name__ == "__main__":
#     asyncio.run(receive_events())
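
Both examples leave error handling and retry logic to you. One possible shape, sketched here with only the standard library, is a small wrapper that retries a processing coroutine with exponential backoff. The names process_with_retry, max_attempts, and base_delay are assumptions for illustration, and catching every Exception is a simplification: production code would usually retry only errors known to be transient.

```python
import asyncio


async def process_with_retry(handler, event, max_attempts=3, base_delay=0.5):
    """Call handler(event), retrying with exponential backoff when it raises."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await handler(event)
        except Exception:
            if attempt == max_attempts:
                raise  # Out of attempts: let the caller decide what to do next
            # Wait base_delay, then 2x, then 4x, ... before the next attempt
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Inside an event callback, your own processing coroutine could be invoked as await process_with_retry(my_handler, event), with a checkpoint written only after the call returns successfully.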
        

4. Best Practices

Security: Never hardcode connection strings in your code. Use environment variables, Azure Key Vault, or managed identities for secure access.

5. Next Steps