Sending and Receiving Messages with Azure Event Hubs
This guide walks you through the fundamental process of sending events to and receiving events from Azure Event Hubs using common SDKs.
Prerequisites
- An Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created.
- Connection string for your Event Hub.
Sending Messages
Sending messages involves creating an event producer and sending events to a specified partition or allowing Event Hubs to choose a partition. Here's an example using the .NET SDK.
C# Example (ProducerClient)
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
// Replace with your Event Hub connection string and Hub Name
string connectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";
await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);
try
{
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
for (int i = 1; i <= 5; i++)
{
var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i}"));
eventData.Properties.Add("EventType", "Demo");
if (!eventBatch.TryAdd(eventData))
{
throw new Exception($"Event {i} is too large for the batch and cannot be added.");
}
}
await producerClient.SendAsync(eventBatch);
Console.WriteLine("Sent a batch of events.");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending events: {ex.Message}");
}
Receiving Messages
Receiving messages typically involves creating an event consumer and subscribing to events from a consumer group. The Event Processor library simplifies this by managing checkpoints and load balancing.
C# Example (EventProcessorClient)
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
// Replace with your Event Hub connection string, Hub Name, and Consumer Group Name
string connectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or specify your custom group
var clientOptions = new EventProcessorClientOptions
{
// Configure options as needed, e.g., maximum batch size, starting position
};
var processor = new EventProcessorClient(
new Azure.Identity.DefaultAzureCredential(), // Or provide connection string directly
consumerGroup,
connectionString,
eventHubName,
clientOptions);
processor.ProcessEventAsync += (args) =>
{
Console.WriteLine($"Received event: {Encoding.UTF8.GetString(args.Data.EventBody.ToArray())}");
Console.WriteLine($"Partition: {args.Partition.PartitionId}");
if (args.Data.Properties.TryGetValue("EventType", out object eventType))
{
Console.WriteLine($"Event Type: {eventType}");
}
return Task.CompletedTask;
};
processor.ProcessErrorAsync += (args) =>
{
Console.WriteLine($"Error in processor: {args.Exception.Message}");
return Task.CompletedTask;
};
Console.WriteLine("Starting event processor...");
await processor.StartProcessingAsync();
// Keep the application running to process events.
// In a real application, you'd have a more robust way to manage this.
Console.WriteLine("Press Ctrl+C to stop processing.");
await Task.Delay(Timeout.Infinite);
Note: For production scenarios, consider using Azure Identity for authentication instead of directly embedding connection strings.
Python Example (EventHubProducerClient & EventHubConsumerClient)
import asyncio
from azure.eventhub import EventData, EventHubProducerClient, EventHubConsumerClient
# Replace with your Event Hub connection string and Hub Name
CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"
async def send_events():
producer = EventHubProducerClient.from_connection_string(CONNECTION_STR, EVENT_HUB_NAME)
async with producer:
async with producer.create_batch() as batch:
for i in range(5):
message = f"Event {i} from Python"
event_data = EventData(message.encode('utf-8'))
event_data.properties["Source"] = "PythonSender"
try:
batch.add(event_data)
except ValueError:
print(f"Batch full. Sending {len(batch)} events.")
await producer.send_batch(batch)
batch = producer.create_batch()
batch.add(event_data)
await producer.send_batch(batch)
print("Sent batch of events.")
async def receive_events():
consumer = EventHubConsumerClient.from_connection_string(CONNECTION_STR, EVENT_HUB_NAME)
async with consumer:
async for event_data in consumer.receive_batch(max_events=10, starting_position="-1"):
print(f"Received event: {event_data.body_as_str()}")
print(f"Partition: {event_data.partition_id}")
if "Source" in event_data.properties:
print(f"Source: {event_data.properties['Source']}")
async def main():
print("Sending events...")
await send_events()
print("\nReceiving events...")
await receive_events()
if __name__ == "__main__":
asyncio.run(main())
Best Practices
- Batching: Send events in batches to improve throughput and reduce costs.
- Partitioning: For ordered processing, send related events to the same partition using a partition key.
- Error Handling: Implement robust error handling and retry mechanisms.
- Consumer Groups: Use distinct consumer groups for different applications or processing logic.
- Checkpointing: Ensure your consumers periodically checkpoint their progress to avoid reprocessing events after restarts.
Warning: Receiving events without proper checkpointing can lead to data loss or duplicate processing if the consumer application restarts.