Receiving Data from Azure Event Hubs
This tutorial shows how to build an application that consumes events from an Azure Event Hub. It covers setting up your environment, connecting to your Event Hub, and processing incoming events.
Prerequisites
- An active Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created within it.
- Connection string for your Event Hub.
- Basic understanding of C# or Python (examples provided in both).
- .NET SDK or Python 3.7+ installed.
Step 1: Set up your Project
Create a new project for your consumer application. We'll demonstrate with both C# and Python.
C# (.NET)
Using the .NET CLI, create a new console application:
dotnet new console -n EventHubsConsumer
cd EventHubsConsumer
Add the Azure Event Hubs SDK package:
dotnet add package Azure.Messaging.EventHubs
Python
Create a new directory and install the necessary library:
mkdir eventhubs-consumer
cd eventhubs-consumer
pip install azure-eventhub
Step 2: Connect to your Event Hub
You'll need your Event Hub connection string. This can be found in the Azure portal under your Event Hubs namespace's "Shared access policies".
C# (.NET)
In your Program.cs file, initialize the EventHubConsumerClient:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer; // EventHubConsumerClient is defined in this namespace
using System;
using System.Text;
using System.Threading.Tasks;

// Replace with your actual connection string and event hub name
string eventHubConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";

var client = new EventHubConsumerClient(
    EventHubConsumerClient.DefaultConsumerGroupName,
    eventHubConnectionString,
    eventHubName);
Console.WriteLine("Listening for events...");
Python
Create a consumer.py file and add the following:
from azure.eventhub import EventHubConsumerClient

# Replace with your actual connection string and event hub name
event_hub_connection_str = "YOUR_EVENT_HUB_CONNECTION_STRING"
event_hub_name = "YOUR_EVENT_HUB_NAME"

consumer_client = EventHubConsumerClient.from_connection_string(
    event_hub_connection_str,
    consumer_group="$Default",  # Use the default consumer group
    eventhub_name=event_hub_name,  # The SDK keyword is eventhub_name
)
print("Listening for events...")
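Hard-coding the connection string is convenient for a first run, but it is easy to commit by accident. As a minimal sketch, the settings could instead be read from environment variables. The variable names EVENT_HUB_CONNECTION_STR and EVENT_HUB_NAME and the load_settings helper are assumptions made for this illustration, not part of the SDK.

```python
import os

# Hypothetical variable names; pick whatever fits your deployment.
CONNECTION_STR_VAR = "EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME_VAR = "EVENT_HUB_NAME"


def load_settings(env=os.environ):
    """Return (connection_string, event_hub_name) read from the environment.

    Raises RuntimeError naming every variable that is unset or empty,
    so a misconfigured consumer fails before it tries to connect.
    """
    required = (CONNECTION_STR_VAR, EVENT_HUB_NAME_VAR)
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env[CONNECTION_STR_VAR], env[EVENT_HUB_NAME_VAR]
```

With this helper, the two hard-coded assignments in consumer.py could become `event_hub_connection_str, event_hub_name = load_settings()`.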
Step 3: Receive and Process Events
Now, let's set up a loop to continuously receive and process events.
C# (.NET)
Add the following code to Program.cs. EventHubConsumerClient reads events with ReadEventsAsync and does not store checkpoints; if you need checkpointing and load balancing across instances, use EventProcessorClient from the Azure.Messaging.EventHubs.Processor package instead.
using var cts = new System.Threading.CancellationTokenSource();

// Stop reading cleanly when Ctrl+C is pressed
Console.CancelKeyPress += (sender, args) =>
{
    args.Cancel = true; // Keep the process alive so cleanup can run
    cts.Cancel();
};

Console.WriteLine("Press Ctrl+C to stop.");

try
{
    // Reads from all partitions, starting with the earliest available event,
    // and keeps running until the token is cancelled
    await foreach (var partitionEvent in client.ReadEventsAsync(cts.Token))
    {
        try
        {
            string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
            Console.WriteLine($"Received event: {messageBody} from partition {partitionEvent.Partition.PartitionId}");

            // Process the event data here...
            // For example, you could log it, send it to another service, etc.
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing event: {ex.Message}");
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Stopping consumer.");
}
finally
{
    await client.CloseAsync();
}
Python
Complete your consumer.py file:
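def on_event(partition_context, event):
    message = event.body_as_str(encoding="UTF-8")
    print(f"Received event: {message} from partition {partition_context.partition_id}")
    # Process the event data here...
    # For example, you could log it, send it to another service, etc.
    # Note: without a checkpoint store configured, no checkpoint is persisted
    partition_context.update_checkpoint(event)  # Mark event as processed

def on_error(partition_context, error):
    # partition_context is None when the error is not tied to a specific partition
    print(f"Error receiving event: {error}")

try:
    # receive() blocks and calls on_event once per event. By default only events
    # that arrive after the consumer starts are delivered; pass
    # starting_position="-1" to read each partition from the beginning.
    consumer_client.receive(on_event=on_event, on_error=on_error)
except KeyboardInterrupt:
    print("Stopping consumer.")
finally:
    consumer_client.close()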
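Checkpointing after every event, as the handler above does, can mean one storage write per event once a checkpoint store is configured. One common way to reduce that cost is to checkpoint once per batch of events. The sketch below illustrates the idea; make_on_event, handle, and the checkpoint_every default of 50 are hypothetical names and values chosen for this example, and the trade-off is that up to checkpoint_every events per partition may be reprocessed after a restart.

```python
from collections import defaultdict


def make_on_event(handle, checkpoint_every=50):
    """Build an on_event callback that checkpoints once per `checkpoint_every` events.

    `handle` is a hypothetical function containing your own processing logic;
    it receives the decoded event body as a string.
    """
    # Events processed since the last checkpoint, tracked per partition
    pending = defaultdict(int)

    def on_event(partition_context, event):
        # The SDK passes None when receive() is called with max_wait_time
        # and no event arrived within that interval
        if event is None:
            return
        handle(event.body_as_str(encoding="UTF-8"))
        partition_id = partition_context.partition_id
        pending[partition_id] += 1
        if pending[partition_id] >= checkpoint_every:
            partition_context.update_checkpoint(event)
            pending[partition_id] = 0

    return on_event
```

The returned callback can be passed as the on_event argument, for example `consumer_client.receive(on_event=make_on_event(print), on_error=on_error)`.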
Step 4: Run your Consumer
C# (.NET)
Run your application from the terminal:
dotnet run
Python
Run your script:
python consumer.py
Once running, messages sent to your Event Hub will be received and processed by your application. Remember to replace placeholder connection strings and names with your actual Azure Event Hub details.
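A forgotten placeholder usually surfaces as a connection error at startup. As a rough sketch, a small check can catch it earlier. It assumes a key-based connection string of the form shown in the Azure portal (Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...); the helper names are hypothetical, and strings that use a shared access signature instead of a key would need different required keys.

```python
# Keys expected in a key-based connection string (assumption for this sketch)
REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey")


def parse_connection_string(connection_str):
    """Split 'Key=Value;Key=Value' pairs into a dict."""
    parts = {}
    for segment in connection_str.strip().strip(";").split(";"):
        # Split on the first '=' only: key values are base64 and may end in '='
        key, sep, value = segment.partition("=")
        if sep:
            parts[key.strip()] = value.strip()
    return parts


def check_connection_string(connection_str):
    """Return a list of problems; an empty list means the string looks plausible."""
    parts = parse_connection_string(connection_str)
    problems = [f"missing {key}" for key in REQUIRED_KEYS if not parts.get(key)]
    endpoint = parts.get("Endpoint", "")
    if endpoint and not endpoint.startswith("sb://"):
        problems.append("Endpoint should start with sb://")
    return problems
```

Calling `check_connection_string(event_hub_connection_str)` before creating the client and stopping when the list is non-empty gives a clearer message than a failed connection attempt. The check only looks at the shape of the string; it does not verify that the credentials are valid.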