Receiving Events
This guide explains how to receive events from Azure Event Hubs using various SDKs and tools. Understanding how to consume events is crucial for building event-driven applications.
Core Concepts
Receiving events typically involves connecting to an Event Hub using a Consumer Group. Consumer groups allow multiple applications to independently consume events from the same Event Hub without interfering with each other.
Consumer Groups Explained
Each Event Hub has a default consumer group named $Default. You can create additional consumer groups to cater to different application needs. For example, one consumer group might be used for real-time processing, while another is used for batch analytics.
Receiving Events with .NET SDK
The Azure SDK for .NET provides robust classes for receiving events. The primary class for this is EventProcessorClient.
Example: Basic Event Receiving Loop
// Install the package:
// dotnet add package Azure.Messaging.EventHubs.Processor
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading.Tasks;
public class EventReceiver
{
private const string eventHubConnectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
private const string eventHubName = "YOUR_EVENT_HUB_NAME";
private const string consumerGroup = "$Default"; // Or your custom consumer group
private const string blobStorageConnectionString = "YOUR_BLOB_STORAGE_CONNECTION_STRING";
private const string blobContainerName = "eventprocessor-checkpoints";
public static async Task Main(string[] args)
{
var storageClient = new BlobServiceClient(blobStorageConnectionString);
var blobContainerClient = storageClient.GetBlobContainerClient(blobContainerName);
var processorOptions = new EventProcessorClientOptions
{
// Configure options as needed, e.g., RetryOptions, LoadBalancingOptions
};
var processor = new EventProcessorClient(
blobContainerClient,
consumerGroup,
eventHubConnectionString,
eventHubName,
processorOptions);
// Register handlers for processing events and errors
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;
Console.WriteLine("Starting Event Processor...");
await processor.StartProcessingAsync();
Console.WriteLine("Press Enter to stop the processor...");
Console.ReadLine();
Console.WriteLine("Stopping Event Processor...");
await processor.StopProcessingAsync();
Console.WriteLine("Event Processor stopped.");
}
static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
{
// Access the event data
Console.WriteLine($"\tReceived event: PartitionId={eventArgs.Partition.PartitionId}, Offset={eventArgs.Data.Offset}, SequenceNumber={eventArgs.Data.SequenceNumber}");
string messageBody = Encoding.UTF8.GetString(eventArgs.Data.EventBody.ToArray());
Console.WriteLine($"\tMessage body: {messageBody}");
// You can also access event properties like EnqueuedTime, Properties, etc.
// Console.WriteLine($"\tEnqueued Time: {eventArgs.Data.EnqueuedTime}");
// Complete the event to mark it as processed.
// If not completed, it will be re-delivered.
await eventArgs.CompleteAsync();
}
static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
{
Console.WriteLine($"\tError in processor: {eventArgs.FullyQualifiedNamespace}, {eventArgs.EntityPath}, {eventArgs.PartitionId}, {eventArgs.Reason}");
// Handle specific error types if needed
return Task.CompletedTask;
}
}
Receiving Events with Python SDK
The Python SDK offers similar capabilities for event consumption.
Example: Asynchronous Event Consumption
# pip install azure-eventhubsprotocol
# pip install azure-eventhubsprotocol[aiohttp]
import asyncio
from azure.eventhubsprotocol.aio import EventHubConsumerClient
EVENTHUB_CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
EVENTHUB_NAME = "YOUR_EVENT_HUB_NAME"
CONSUMER_GROUP = "$Default" # Or your custom consumer group
async def process_event(event):
print(f"Received event: {event}")
# Access event data: event.body, event.properties, event.offset, event.sequence_number
async def main():
consumer_client = EventHubConsumerClient.from_connection_string(
EVENTHUB_CONNECTION_STR,
consumer_group=CONSUMER_GROUP,
event_hub_name=EVENTHUB_NAME
)
print("Starting Event Hub consumer...")
async with consumer_client:
await consumer_client.subscribe(
process_event,
partition_id="0" # Specify partition IDs to receive from, or omit for all
)
# To receive from all partitions, you'd typically manage partition discovery
# or subscribe to each partition explicitly.
# For simplicity, this example shows a single partition.
print("Listening for events. Press Ctrl+C to stop.")
await asyncio.Future() # Run forever until interrupted
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Consumer stopped.")
Key Considerations
- Checkpointing: For reliable event processing, especially in distributed scenarios, it's essential to implement checkpointing. Checkpointing allows your consumer to resume processing from the last successfully processed event, preventing data loss or reprocessing. The .NET
EventProcessorClienthandles this automatically with Blob storage. - Error Handling: Implement robust error handling to manage transient network issues, malformed events, or processing logic failures.
- Scaling: Event Hubs is designed for high throughput. Ensure your receiving applications can scale horizontally to keep up with the event ingestion rate. Consumer groups help achieve this by allowing multiple instances of your application to process events in parallel.
- Partition Management: Understand how Event Hubs partitions work. Events within a partition are ordered, but there's no global ordering across partitions. Distributing load across partitions is key to scaling.
Further Reading
Azure Event Hubs Consumer Groups
Learn more about the role and management of consumer groups.
Event Processor Host (.NET)
Deep dive into the .NET Event Processor Host for reliable event processing.