Receiving Data from Azure Event Hubs

This tutorial shows how to build an application that consumes events from an Azure Event Hub. It covers setting up your environment, connecting to your Event Hub, and receiving and processing incoming events.

Prerequisites

  • An active Azure subscription.
  • An Azure Event Hubs namespace and an Event Hub created within it.
  • A connection string for your Event Hub.
  • Basic understanding of C# or Python (examples provided in both).
  • .NET SDK or Python 3.7+ installed.

Step 1: Set up your Project

Create a new project for your consumer application. We'll demonstrate with both C# and Python.

C# (.NET)

Using the .NET CLI, create a new console application:

dotnet new console -n EventHubsConsumer
cd EventHubsConsumer

Add the Azure Event Hubs SDK package:

dotnet add package Azure.Messaging.EventHubs

Python

Create a new directory and install the necessary library:

mkdir eventhubs-consumer
cd eventhubs-consumer
pip install azure-eventhub

Step 2: Connect to your Event Hub

You'll need your Event Hub connection string, which you can find in the Azure portal under "Shared access policies" for your Event Hubs namespace. The policy you use must grant the Listen claim, which is required to receive events.
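To make the shape of a connection string concrete, here is a minimal sketch that splits one into its parts using only the standard library. The helper name `parse_connection_string` and the example values are illustrative assumptions, not part of the SDK usage in this tutorial. A namespace-level connection string typically has no `EntityPath` segment, which is why the Event Hub name is passed separately in the code that follows.

```python
def parse_connection_string(conn_str: str) -> dict:
    """Split an Event Hubs connection string into its key/value parts."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only: base64 keys often end in "=" padding.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    return parts


# Hypothetical example value; a real string comes from the Azure portal.
example = (
    "Endpoint=sb://example-ns.servicebus.windows.net/;"
    "SharedAccessKeyName=listen-policy;"
    "SharedAccessKey=ZmFrZS1rZXk=;"
    "EntityPath=my-hub"
)

parts = parse_connection_string(example)
print(parts["Endpoint"])
print(parts.get("EntityPath"))  # None for a namespace-level string
```

A check like this can catch a truncated or mis-pasted connection string before the client attempts to connect.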

C# (.NET)

In your Program.cs file, initialize the EventHubConsumerClient:

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer; // EventHubConsumerClient lives in this namespace
using System;
using System.Text;
using System.Threading.Tasks;

// Replace with your actual connection string and event hub name
string eventHubConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";

var client = new EventHubConsumerClient(
    EventHubConsumerClient.DefaultConsumerGroupName,
    eventHubConnectionString,
    eventHubName);

Console.WriteLine("Listening for events...");

Python

Create a consumer.py file and add the following:

from azure.eventhub import EventHubConsumerClient
import os

# Replace with your actual connection string and event hub name
event_hub_connection_str = "YOUR_EVENT_HUB_CONNECTION_STRING"
event_hub_name = "YOUR_EVENT_HUB_NAME"

consumer_client = EventHubConsumerClient.from_connection_string(
    event_hub_connection_str,
    consumer_group="$Default",  # Use the default consumer group
    event_hub_name=event_hub_name
)

print("Listening for events...")
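Hard-coding a connection string is convenient for a first run, but it is easy to commit by accident. As one possible alternative, the sketch below reads both values from environment variables. The variable names `EVENT_HUB_CONNECTION_STR` and `EVENT_HUB_NAME` and the helper `load_settings` are assumptions chosen for illustration.

```python
import os


def load_settings(env=None):
    """Return (connection_string, event_hub_name) from environment variables."""
    env = os.environ if env is None else env
    names = ("EVENT_HUB_CONNECTION_STR", "EVENT_HUB_NAME")  # hypothetical names
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[names[0]], env[names[1]]


# Demonstration with a plain dict standing in for the real environment.
demo_env = {
    "EVENT_HUB_CONNECTION_STR": "Endpoint=sb://example-ns.servicebus.windows.net/",
    "EVENT_HUB_NAME": "my-hub",
}
conn_str, hub_name = load_settings(demo_env)
print(hub_name)
```

In consumer.py you would call `load_settings()` with no arguments and pass the two returned values to `EventHubConsumerClient.from_connection_string`.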

Step 3: Receive and Process Events

Now, let's set up a loop to continuously receive and process events.

C# (.NET)

Add the following code to Program.cs. It reads events from all partitions with ReadEventsAsync until you press Ctrl+C. Note that EventHubConsumerClient does not store checkpoints; for checkpointing and load balancing across instances, use EventProcessorClient from the Azure.Messaging.EventHubs.Processor package, which requires an Azure Blob Storage container.

using var cts = new System.Threading.CancellationTokenSource();

// Cancel the read loop on Ctrl+C instead of killing the process
Console.CancelKeyPress += (sender, args) =>
{
    args.Cancel = true;
    cts.Cancel();
};

Console.WriteLine("Press Ctrl+C to stop.");

try
{
    // Yields events from every partition as they become available
    await foreach (var partitionEvent in client.ReadEventsAsync(cts.Token))
    {
        try
        {
            string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
            Console.WriteLine($"Received event: {messageBody} from partition {partitionEvent.Partition.PartitionId}");

            // Process the event data here...
            // For example, you could log it, send it to another service, etc.
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing event: {ex.Message}");
        }
    }
}
catch (OperationCanceledException)
{
    // Expected when Ctrl+C cancels the token
    Console.WriteLine("Stopping consumer.");
}
finally
{
    await client.CloseAsync();
}

Python

Complete your consumer.py file:

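def on_event(partition_context, event):
    # body_as_str() decodes the event body to text
    message = event.body_as_str(encoding="UTF-8")
    print(f"Received event: {message} from partition {partition_context.partition_id}")

    # Process the event data here...
    # For example, you could log it, send it to another service, etc.

    # Record progress. Without a checkpoint store configured on the client,
    # this call does not persist anything.
    partition_context.update_checkpoint(event)

def on_error(partition_context, error):
    # partition_context is None when the error is not tied to a partition
    if partition_context:
        print(f"Error on partition {partition_context.partition_id}: {error}")
    else:
        print(f"Error receiving event: {error}")

try:
    # receive() blocks and calls on_event once per event. By default it reads
    # only new events; pass starting_position="-1" to read from the start
    # of each partition.
    consumer_client.receive(on_event=on_event, on_error=on_error)
except KeyboardInterrupt:
    print("Stopping consumer.")
finally:
    consumer_client.close()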
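If per-event callbacks become a bottleneck, the Python client also offers `receive_batch`, which takes an `on_event_batch` callback that receives a list of events. The sketch below shows one way such a handler might look, checkpointing once per batch instead of once per event. `FakeEvent` and `FakeContext` are hypothetical stand-ins for the SDK's event and partition context objects, included only so the example runs without a live Event Hub.

```python
def on_event_batch(partition_context, events):
    """Process a batch of events and checkpoint once, after the last one."""
    if not events:
        return  # a batch can be empty, for example when a wait time elapses
    for event in events:
        print(f"Partition {partition_context.partition_id}: {event.body_as_str()}")
    # Checkpoint only the final event: one write per batch, not per event.
    partition_context.update_checkpoint(events[-1])


class FakeEvent:
    """Stand-in (assumption) exposing only body_as_str()."""

    def __init__(self, text):
        self._text = text

    def body_as_str(self, encoding="UTF-8"):
        return self._text


class FakeContext:
    """Stand-in (assumption) that records checkpointed event bodies."""

    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.checkpoints = []

    def update_checkpoint(self, event=None):
        self.checkpoints.append(event.body_as_str())


context = FakeContext("0")
on_event_batch(context, [FakeEvent("first"), FakeEvent("second")])
print(context.checkpoints)
```

With the real client, the handler would be passed as `consumer_client.receive_batch(on_event_batch=on_event_batch, on_error=on_error)`. The trade-off is that a crash in the middle of a batch causes the whole batch to be re-read from the last checkpoint, so processing should tolerate duplicates.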

Step 4: Run your Consumer

C# (.NET)

Run your application from the terminal:

dotnet run

Python

Run your script:

python consumer.py

Once the application is running, it receives and processes events sent to your Event Hub. Remember to replace the placeholder connection string and Event Hub name with your actual Azure Event Hubs details.
