Receiving Data from Azure Event Hubs
This section details how to consume events from an Azure Event Hub. We'll cover different SDKs and common patterns for efficient data retrieval.
Introduction to Event Consumption
Receiving data from Event Hubs involves connecting to a specific consumer group and processing the stream of events. Event Hubs supports multiple consumer groups, allowing different applications to read from the same hub independently.
Using the Azure SDKs
Azure provides robust SDKs for various languages. Here, we'll focus on common patterns using C# and Python.
C# Example: Reading Events with EventProcessorClient
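EventProcessorClient is the recommended way to process events in a distributed, fault-tolerant manner. It balances partition ownership across running instances automatically and stores checkpoints in Azure Blob Storage whenever your handler calls UpdateCheckpointAsync.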
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventProcessor
{
    // Fill these in, or load them from configuration rather than hard-coding secrets.
    private const string EH_CONNECTION_STRING = "";
    private const string EVENT_HUB_NAME = "";
    private const string CONSUMER_GROUP_NAME = "$Default"; // Or your custom consumer group
    private const string STORAGE_CONNECTION_STRING = "";
    private const string BLOB_CONTAINER_NAME = "";

    public static async Task RunProcessorAsync()
    {
        // The blob container holds checkpoints and partition ownership records.
        var storageClient = new BlobServiceClient(STORAGE_CONNECTION_STRING);
        var processor = new EventProcessorClient(
            storageClient.GetBlobContainerClient(BLOB_CONTAINER_NAME),
            CONSUMER_GROUP_NAME,
            EH_CONNECTION_STRING,
            EVENT_HUB_NAME);

        // Register handlers for processing events and potential errors
        processor.ProcessEventAsync += ProcessEventHandler;
        processor.ProcessErrorAsync += ProcessErrorHandler;

        Console.WriteLine("Starting Event Processor...");
        await processor.StartProcessingAsync();

        Console.WriteLine("Press ENTER to stop the processor.");
        Console.ReadLine();

        await processor.StopProcessingAsync();
        Console.WriteLine("Event Processor stopped.");
    }

    static async Task ProcessEventHandler(ProcessEventArgs args)
    {
        // Access the event data
        string messageBody = Encoding.UTF8.GetString(args.Data.EventBody.ToArray());
        Console.WriteLine($"\tReceived message: {messageBody}");
        // The partition ID is exposed through the partition context on the event args.
        Console.WriteLine($"\tPartitionId: {args.Partition.PartitionId}, Sequence Number: {args.Data.SequenceNumber}");

        // Update the checkpoint for the current partition
        // This indicates that events up to this sequence number have been successfully processed.
        await args.UpdateCheckpointAsync();
    }

    static Task ProcessErrorHandler(ProcessErrorEventArgs args)
    {
        // The error args report the partition and the operation that failed.
        Console.WriteLine($"\tERROR: PartitionId='{args.PartitionId}', Operation='{args.Operation}'.");
        Console.WriteLine($"\t{args.Exception.Message}");
        return Task.CompletedTask;
    }
}
Important
For EventProcessorClient to work correctly, you must configure blob storage for checkpointing. This ensures that your application can resume processing from where it left off if it restarts.
Python Example: Reading Events with EventHubConsumerClient
In Python, EventHubConsumerClient provides a similar capability. It reads from all partitions of the event hub and passes each event, together with its partition context, to a callback.
import asyncio
import os

# The async client lives in the aio subpackage; the sync client cannot be awaited.
from azure.eventhub.aio import EventHubConsumerClient

EVENTHUB_CONNECTION_STR = os.environ.get("EVENTHUB_CONNECTION_STR")
EVENTHUB_NAME = os.environ.get("EVENTHUB_NAME")
CONSUMER_GROUP = "$Default"  # Or your custom consumer group


async def process_event(partition_context, event):
    # Called once per event with the partition context and the event itself.
    print(f"Received event: {event.body_as_str()}")
    print(f"Partition ID: {partition_context.partition_id}, Sequence Number: {event.sequence_number}")


async def run_consumer():
    consumer_client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
    )
    async with consumer_client:
        # receive() keeps running until the task is cancelled or the client is closed.
        await consumer_client.receive(
            on_event=process_event,
            starting_position="-1",  # "-1" for earliest event, or use specific offset/timestamp
        )


if __name__ == "__main__":
    print("Starting Event Consumer...")
    try:
        asyncio.run(run_consumer())
    except KeyboardInterrupt:
        print("Consumer stopped.")
Note
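The starting_position parameter of Python's receive method controls where the consumer starts reading a partition when no checkpoint exists for it. Using "-1" starts from the oldest available event; the default, "@latest", delivers only events that arrive after the consumer starts.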
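The starting position can also be given per partition, as a dictionary keyed by partition ID. The sketch below only builds such a dictionary; the helper name build_starting_positions and the partition IDs "0" to "2" are assumptions made for illustration, not part of the SDK.

```python
from datetime import datetime, timedelta, timezone


def build_starting_positions(partition_ids, default="-1", overrides=None):
    """Return a {partition_id: position} mapping for starting_position.

    Each position may be "-1" (earliest), "@latest" (new events only),
    an offset string, a sequence number, or a timezone-aware datetime.
    """
    overrides = overrides or {}
    return {pid: overrides.get(pid, default) for pid in partition_ids}


one_hour_ago = datetime.now(timezone.utc) - timedelta(hours=1)

# Hypothetical layout: replay partition "0" from the start, read partition "2"
# from one hour ago, and take only new events everywhere else.
positions = build_starting_positions(
    ["0", "1", "2"],
    default="@latest",
    overrides={"0": "-1", "2": one_hour_ago},
)

# Usage with an existing async consumer client (not created here):
#     await consumer_client.receive(on_event=process_event, starting_position=positions)
```

A single value such as "-1" remains the simpler choice when every partition should start from the same place.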
Understanding Consumer Groups
Consumer groups are essential for scaling and isolating event consumption. Each consumer group is an independent view of the event stream, so readers in different groups keep their own position (offset) in every partition.
- $Default: The default consumer group, which exists on every event hub. Use it when you have not created a custom group.
- Custom Consumer Groups: Create custom consumer groups for different applications or processing logic. This prevents them from interfering with each other.
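The isolation between consumer groups can be pictured with a small in-memory model. This is a simulation, not SDK code: GroupReader and the group names "dashboard" and "archiver" are hypothetical, and a plain list stands in for one partition.

```python
class GroupReader:
    """Stand-in for a reader in one consumer group (not an SDK type)."""

    def __init__(self, consumer_group, partition_events):
        self.consumer_group = consumer_group
        self._events = partition_events  # shared stream; readers never modify it
        self._offset = 0                 # this group's own read position

    def read(self, max_events):
        """Return up to max_events events and advance only this group's offset."""
        batch = self._events[self._offset:self._offset + max_events]
        self._offset += len(batch)
        return batch


partition_events = ["e0", "e1", "e2", "e3", "e4"]
dashboard = GroupReader("dashboard", partition_events)
archiver = GroupReader("archiver", partition_events)

dashboard_first = dashboard.read(3)   # dashboard moves ahead
archiver_first = archiver.read(2)     # archiver still starts at the beginning
dashboard_second = dashboard.read(3)  # dashboard continues where it left off
```

Reading through one group does not consume events for the other; both see the full stream at their own pace.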
Checkpointing for Reliability
Checkpointing is the mechanism by which a consumer application records its progress in reading events from a partition. This is crucial for fault tolerance: if the consumer restarts, it resumes from the last recorded checkpoint instead of the start of the stream. Events processed after that checkpoint are delivered again, so handlers should tolerate duplicates.
Neither client writes checkpoints on its own schedule. The C# EventProcessorClient persists a checkpoint when your handler calls UpdateCheckpointAsync, and the Python EventHubConsumerClient does so when it is configured with a checkpoint store and your handler calls update_checkpoint on the partition context.
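To make the resume behavior concrete, here is a minimal simulation of a consumer that checkpoints every few events, crashes, and restarts. It uses no SDK types; the consume function, its parameters, and the event tuples are all hypothetical.

```python
def consume(events, start_seq, checkpoint_every, crash_after=None):
    """Process events whose sequence number is greater than start_seq.

    Returns (processed sequence numbers, last checkpointed sequence number).
    crash_after simulates a failure once that many events have been processed.
    """
    processed = []
    checkpoint = start_seq
    for seq, _body in events:
        if seq <= start_seq:
            continue  # already covered by the checkpoint
        if crash_after is not None and len(processed) == crash_after:
            break  # simulated crash before handling this event
        processed.append(seq)
        if len(processed) % checkpoint_every == 0:
            checkpoint = seq  # record progress
    return processed, checkpoint


events = [(seq, f"event-{seq}") for seq in range(10)]

# First run checkpoints every 4 events and crashes after processing 6.
first_run, saved = consume(events, start_seq=-1, checkpoint_every=4, crash_after=6)

# The restart resumes after the saved checkpoint, not after the last processed event.
second_run, _ = consume(events, start_seq=saved, checkpoint_every=4)

reprocessed = sorted(set(first_run) & set(second_run))
```

In this run the checkpoint was saved at sequence number 3, so events 4 and 5 are handled twice. Checkpointing more often narrows that window at the cost of more writes to the checkpoint store.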
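In the Python SDK, a checkpoint is written when the handler calls update_checkpoint on the partition context, and it is persisted only if the client was created with a checkpoint store. The sketch below shows one way to checkpoint every N events per partition; the CheckpointEvery class and the interval of 50 are assumptions chosen for illustration, and the wiring in the trailing comments requires the azure-eventhub and azure-eventhub-checkpointstoreblob-aio packages.

```python
from collections import defaultdict


class CheckpointEvery:
    """Hypothetical policy: signal a checkpoint every `interval` events per partition."""

    def __init__(self, interval):
        if interval < 1:
            raise ValueError("interval must be at least 1")
        self._interval = interval
        self._seen = defaultdict(int)  # partition ID -> events counted so far

    def should_checkpoint(self, partition_id):
        self._seen[partition_id] += 1
        return self._seen[partition_id] % self._interval == 0


policy = CheckpointEvery(interval=50)


async def on_event(partition_context, event):
    print(f"Partition {partition_context.partition_id}: {event.body_as_str()}")
    if policy.should_checkpoint(partition_context.partition_id):
        # Records that this event and everything before it has been processed.
        await partition_context.update_checkpoint(event)


# Wiring (connection strings and names are placeholders you must supply):
#     from azure.eventhub.aio import EventHubConsumerClient
#     from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
#
#     store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name)
#     client = EventHubConsumerClient.from_connection_string(
#         eventhub_conn_str,
#         consumer_group="$Default",
#         eventhub_name=eventhub_name,
#         checkpoint_store=store,
#     )
#     async with client:
#         await client.receive(on_event=on_event)
```

Checkpointing after every event is the safest option but adds a storage write per event; an interval trades a larger replay window for fewer writes.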
Advanced Scenarios
For more complex scenarios, consider:
- Batching: Processing events in batches can improve throughput by spreading per-event overhead, such as checkpoint writes, across many events.
- Filtering: Implementing logic to filter specific events based on their content or properties.
- Integration with Other Services: Connecting Event Hubs consumers to services like Azure Functions, Azure Stream Analytics, or Azure Databricks for further processing and analysis.
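Batching and filtering can be combined in a single batch handler. The following is a sketch under stated assumptions: events are assumed to carry JSON bodies with a "temperature" field, and the names is_high_temperature and select_events are hypothetical. The handler signature matches the on_event_batch callback of the Python client's receive_batch method.

```python
import json


def is_high_temperature(payload, threshold=30.0):
    """Hypothetical filter: keep readings whose temperature exceeds threshold."""
    value = payload.get("temperature")
    return isinstance(value, (int, float)) and value > threshold


def select_events(event_batch, predicate):
    """Decode each event body as JSON and keep the payloads the predicate accepts."""
    selected = []
    for event in event_batch:
        try:
            payload = json.loads(event.body_as_str())
        except ValueError:
            continue  # skip bodies that are not valid JSON
        if isinstance(payload, dict) and predicate(payload):
            selected.append(payload)
    return selected


async def on_event_batch(partition_context, event_batch):
    hot = select_events(event_batch, is_high_temperature)
    print(f"Partition {partition_context.partition_id}: kept {len(hot)} of {len(event_batch)} events")
    if event_batch:
        # One checkpoint per batch; persisted only when a checkpoint store is configured.
        await partition_context.update_checkpoint(event_batch[-1])


# Usage with an existing async consumer client (not created here):
#     await consumer_client.receive_batch(
#         on_event_batch=on_event_batch,
#         max_batch_size=100,
#         max_wait_time=5,
#     )
```

Filtering here happens in the consumer after the events have been delivered, so it reduces processing work but not the volume read from the hub.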