Azure Event Hubs Docs

Receiving Messages from Azure Event Hubs

This tutorial guides you through the process of receiving messages from an Azure Event Hub using a consumer application. We'll cover setting up a consumer, connecting to your Event Hub, and processing incoming messages.

Prerequisites

1. Setting up your Consumer Application

You can use various SDKs to build your consumer application. Here, we provide examples using C# (.NET) and Python.

2. C# Consumer Example (using Azure.Messaging.EventHubs)

This example demonstrates how to create a simple console application that connects to an Event Hub and starts listening for messages.


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventHubConsumer
{
    // Replace with your Event Hub connection string and consumer group name
    private const string EventHubConnectionString = "";
    private const string EventHubName = "";
    private const string ConsumerGroup = "$Default"; // Or your custom consumer group

    public static async Task Main(string[] args)
    {
        await using var consumer = new EventProcessorClient(
            new EventProcessorClientOptions {
                ConsumerGroup = ConsumerGroup
            },
            new Azure.Messaging.EventHubs.EventHubConsumerClient(
                EventHubConnectionString,
                EventHubName
            )
        );

        Console.WriteLine($"Starting consumer for Event Hub '{EventHubName}' in consumer group '{ConsumerGroup}'...");

        consumer.ProcessEvent += (Partition, Event, cancellationToken) => {
            string messageBody = Encoding.UTF8.GetString(Event.Data.Array, Event.Data.Offset, Event.Data.Count);
            Console.WriteLine($"Received message: {messageBody} from partition {Partition.PartitionId}");
            return Task.CompletedTask;
        };

        consumer.ProcessError += (Partition, Error, cancellationToken) => {
            Console.WriteLine($"Error processing partition {Partition.PartitionId}: {Error.Reason}");
            return Task.CompletedTask;
        };

        try
        {
            await consumer.StartProcessingAsync();
            Console.WriteLine("Press Enter to stop receiving.");
            Console.ReadLine();
        }
        finally
        {
            await consumer.StopProcessingAsync();
            Console.WriteLine("Consumer stopped.");
        }
    }
}
            

Explanation:

Important: Ensure you replace the placeholder connection string and Event Hub name with your actual credentials. For production environments, consider using Azure Key Vault to manage secrets securely.

3. Python Consumer Example (using azure-eventhub)

Here's a Python script to receive messages:


import asyncio
from azure.eventhub.aio import EventHubConsumerClient

# Replace with your Event Hub connection string and consumer group name
EVENTHUB_CONNECTION_STR = ""
EVENTHUB_NAME = ""
CONSUMER_GROUP = "$Default"  # Or your custom consumer group

async def process_event(partition_context, event):
    """
    Processes a single event.
    """
    print(f"Received message: {event.body_str} from partition {partition_context.partition_id}")
    # In a real application, you'd update checkpoints here to mark processed events.
    # await partition_context.update_checkpoint(event)

async def process_error(partition_context, error):
    """
    Processes errors that occur during event processing.
    """
    print(f"Error processing partition {partition_context.partition_id}: {error}")

async def main():
    client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        event_hub_name=EVENTHUB_NAME
    )

    print(f"Starting consumer for Event Hub '{EVENTHUB_NAME}' in consumer group '{CONSUMER_GROUP}'...")

    async with client:
        await client.receive(
            on_event=process_event,
            on_error=process_error
        )

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Consumer stopped by user.")
            

Explanation:

Note: The Python SDK relies on asyncio. Ensure your environment supports it and that you install the SDK using pip install azure-eventhub[azure-identity].

4. Running the Consumer Application

  1. Replace the placeholder connection string and Event Hub name in the code with your actual details.
  2. Compile and run the C# application using the .NET CLI:
    
    dotnet run
                        
  3. Run the Python script:
    
    python your_consumer_script.py
                        

Once the consumer is running, any messages sent to your Event Hub will be received and printed to the console.

5. Key Concepts for Receiving Messages

Next Steps