Sending and Receiving Messages with Azure Event Hubs
This guide walks through the basics of sending events to and receiving events from Azure Event Hubs, using the Azure SDKs for .NET and Python.
1. Sending Messages (Producers)
Event producers are applications that send event data to an Event Hub. You can use the Azure SDKs for various languages to achieve this.
Using the Azure SDK for .NET
First, install the necessary NuGet package:
dotnet add package Azure.Messaging.EventHubs
Here's a C# example of sending a single event:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventHubProducerClientExample
{
    // Replace with your Event Hub connection string and hub name
    private const string connectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
    private const string eventHubName = "YOUR_EVENT_HUB_NAME";

    public static async Task Main(string[] args)
    {
        // The client is disposed asynchronously when Main exits
        await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);
        try
        {
            // Build a small JSON payload; the "O" format writes an ISO 8601 timestamp
            var eventBody = $"{{ \"messageId\": \"{Guid.NewGuid()}\", \"timestamp\": \"{DateTime.UtcNow:O}\", \"payload\": \"Hello, Event Hubs!\" }}";
            var eventData = new EventData(Encoding.UTF8.GetBytes(eventBody));

            // SendAsync takes a collection of events (or an EventDataBatch), not a single EventData
            await producerClient.SendAsync(new[] { eventData });
            Console.WriteLine($"Sent event: {eventBody}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error sending event: {ex.Message}");
        }
    }
}
Using the Azure SDK for Python
Install the SDK:
pip install azure-eventhub
Python example for sending events:
import asyncio
import json
import os
from datetime import datetime, timezone

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient  # async client, needed for "async with"

# Replace with your Event Hub connection string and hub name
CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"


async def send_event_data():
    # Create a producer client to send events to the Event Hub
    producer = EventHubProducerClient.from_connection_string(
        CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        # Prepare event data
        event_data = {
            "messageId": os.urandom(16).hex(),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "payload": "Hello from Python!",
        }
        # Serialize as JSON; str() on a dict would not produce valid JSON
        event_body = json.dumps(event_data)

        # Send a single event as a one-element list
        await producer.send_batch([EventData(event_body.encode("utf-8"))])
        print(f"Sent event: {event_body}")


if __name__ == "__main__":
    asyncio.run(send_event_data())
2. Receiving Messages (Consumers)
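Event consumers are applications that read event data from an Event Hub. Each consumer reads through a consumer group, which is a separate view of the event stream that lets different applications read the same events independently, each at its own pace.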
Using the Azure SDK for .NET
Ensure you have the `Azure.Messaging.EventHubs` NuGet package installed.
C# example for receiving events:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

public class EventHubConsumerClientExample
{
    // Replace with your Event Hub connection string and hub name
    private const string connectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
    private const string eventHubName = "YOUR_EVENT_HUB_NAME";
    private const string consumerGroup = "$Default"; // Or your custom consumer group

    public static async Task Main(string[] args)
    {
        await using var consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
        Console.WriteLine("Listening for events...");
        try
        {
            await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
            {
                Console.WriteLine($"Received event: {Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray())}");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error receiving events: {ex.Message}");
        }
    }
}
Using the Azure SDK for Python
This example uses the same `azure-eventhub` package as the Python producer.
Python example for receiving events:
import asyncio

from azure.eventhub.aio import EventHubConsumerClient  # async client, needed for "async with"

# Replace with your Event Hub connection string and hub name
CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"
CONSUMER_GROUP = "$Default"  # Or your custom consumer group


async def on_event(partition_context, event):
    # Called once for each event received from any partition
    print(f"Received event from partition {partition_context.partition_id}: {event.body_as_str()}")


async def receive_events():
    # Create a consumer client to receive events from the Event Hub
    consumer = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
    )
    async with consumer:
        print("Listening for events...")
        # receive() takes a callback and runs until it is cancelled;
        # starting_position="-1" reads from the beginning of each partition
        await consumer.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    asyncio.run(receive_events())
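Note: The `$Default` consumer group is created automatically with every Event Hub, so the examples above work without creating a consumer group first.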
3. Advanced Scenarios
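- Batching: For improved efficiency, producers can send multiple events in a single batch. In .NET, build a batch with `CreateBatchAsync` and `TryAdd` and pass it to `SendAsync`; in Python, use `create_batch` and `add`, then `send_batch`.
- Partitioning: Events that share a partition key are routed to the same partition, which preserves their relative order within that partition.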
- Checkpointing: Consumers use checkpointing to store their progress (the offset and sequence number of the last processed event) so they can resume reading from where they left off.
- Error Handling: Implement robust error handling and retry mechanisms in both producers and consumers.
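To make the checkpointing idea above more concrete, here is a minimal, SDK-free sketch of how a consumer might record its progress per partition and skip already-processed events after a restart. Everything in it is an assumption made for illustration: `Event`, `Checkpoint`, `InMemoryCheckpointStore`, and `process_partition` are hypothetical names, not part of the Azure SDK, and a real application would persist checkpoints in a durable store rather than in memory.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for a received event."""
    sequence_number: int  # position of the event within its partition
    offset: str           # opaque position marker stored alongside the sequence number
    body: str


@dataclass(frozen=True)
class Checkpoint:
    offset: str
    sequence_number: int


class InMemoryCheckpointStore:
    """Hypothetical checkpoint store; a real one would be durable (not in memory)."""

    def __init__(self) -> None:
        self._checkpoints: Dict[str, Checkpoint] = {}

    def update(self, partition_id: str, event: Event) -> None:
        # Record the last processed event for this partition
        self._checkpoints[partition_id] = Checkpoint(event.offset, event.sequence_number)

    def get(self, partition_id: str) -> Optional[Checkpoint]:
        return self._checkpoints.get(partition_id)


def process_partition(
    partition_id: str,
    events: List[Event],
    store: InMemoryCheckpointStore,
    max_events: Optional[int] = None,
) -> List[str]:
    """Process events newer than the stored checkpoint, checkpointing after each one."""
    checkpoint = store.get(partition_id)
    last_seen = checkpoint.sequence_number if checkpoint else -1
    processed: List[str] = []
    for event in events:
        if event.sequence_number <= last_seen:
            continue  # already handled before the restart
        if max_events is not None and len(processed) >= max_events:
            break  # simulate the consumer stopping partway through
        processed.append(event.body)
        store.update(partition_id, event)
    return processed


# Simulate one partition with five events and a consumer that stops after three
partition_0 = [Event(i, str(i * 100), f"event-{i}") for i in range(5)]
store = InMemoryCheckpointStore()

first_run = process_partition("0", partition_0, store, max_events=3)
second_run = process_partition("0", partition_0, store)  # resumes after the checkpoint

print(first_run)
print(second_run)
```

The second call picks up at the first event after the stored checkpoint instead of reprocessing the partition from the start. This sketch checkpoints after every event for simplicity; writing a checkpoint less often reduces store traffic at the cost of reprocessing a few events after a restart.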
Refer to the official Azure Event Hubs documentation for more in-depth information on these topics and language-specific SDK details.