Azure Event Hubs Documentation

Sending and Receiving Messages with Azure Event Hubs

This guide walks you through sending events to and receiving events from Azure Event Hubs using the .NET and Python SDKs.

Prerequisites

Sending Messages

To send events, create a producer client and publish events either to a specific partition or without one, in which case Event Hubs chooses the partition. The following example uses the .NET SDK and lets Event Hubs choose.

C# Example (EventHubProducerClient)


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Replace with your Event Hub connection string and Hub Name
string connectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";

await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);

try
{
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

    for (int i = 1; i <= 5; i++)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i}"));
        eventData.Properties.Add("EventType", "Demo");

        if (!eventBatch.TryAdd(eventData))
        {
            throw new Exception($"Event {i} is too large for the batch and cannot be added.");
        }
    }

    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("Sent a batch of events.");
}
catch (Exception ex)
{
    Console.WriteLine($"Error sending events: {ex.Message}");
}
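
The producer above lets Event Hubs pick the partition. When related events need to land on the same partition (for example, all readings from one device), a batch can be created with a partition key. The following is a minimal sketch in Python, not a definitive implementation: `send_with_partition_key` is a hypothetical helper, and `producer` is assumed to behave like `azure.eventhub.aio.EventHubProducerClient`, whose `create_batch` accepts a `partition_key` keyword.

```python
async def send_with_partition_key(producer, events, partition_key):
    """Send `events` in one batch that is pinned to a partition key.

    Assumption: `producer` exposes the same awaitable create_batch() and
    send_batch() methods as azure.eventhub.aio.EventHubProducerClient.
    """
    # Every event in a batch created with a partition key is routed
    # to the same partition.
    batch = await producer.create_batch(partition_key=partition_key)
    for event in events:
        batch.add(event)  # the SDK raises ValueError when the batch is full
    await producer.send_batch(batch)
    return len(events)
```

With a real client, `events` would be `EventData` instances and `producer` would be opened with `async with`. `create_batch` also accepts `partition_id` to target one partition explicitly.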
            

Receiving Messages

To receive events, create a consumer that reads from the event hub through a consumer group. The Event Processor library simplifies this by persisting checkpoints and balancing partition ownership across processor instances.
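
To give a feel for the load-balancing part, here is a minimal Python sketch of the goal: spreading partitions across processor instances so that each owns a roughly equal share. It is an illustration only and not the Event Processor's actual algorithm; `assign_partitions` and the instance names are hypothetical.

```python
def assign_partitions(partition_ids, processor_ids):
    """Distribute partitions round-robin so ownership counts differ by at most one."""
    ownership = {processor: [] for processor in processor_ids}
    for index, partition in enumerate(partition_ids):
        owner = processor_ids[index % len(processor_ids)]
        ownership[owner].append(partition)
    return ownership


# Four partitions shared by two processor instances.
two_instances = assign_partitions(["0", "1", "2", "3"], ["proc-a", "proc-b"])

# A third instance joins, so the same partitions are spread across three.
three_instances = assign_partitions(["0", "1", "2", "3"], ["proc-a", "proc-b", "proc-c"])
```

Adding an instance reduces the share each one owns, which is why running more processors than partitions leaves some instances idle.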

C# Example (EventProcessorClient)


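// Requires the Azure.Messaging.EventHubs.Processor and Azure.Storage.Blobs packages.
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Replace with your Event Hub connection string, Hub Name, and Consumer Group Name
string connectionString = "YOUR_EVENT_HUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or specify your custom group

// The processor stores checkpoints and partition ownership in a blob container.
// Replace with your Azure Storage connection string and an existing container name.
string storageConnectionString = "YOUR_STORAGE_CONNECTION_STRING";
string blobContainerName = "YOUR_BLOB_CONTAINER_NAME";
var checkpointStore = new BlobContainerClient(storageConnectionString, blobContainerName);

var clientOptions = new EventProcessorClientOptions
{
    // Configure options as needed, e.g., MaximumWaitTime, PrefetchCount, LoadBalancingStrategy
};

// To authenticate with Azure Identity instead, use the overload that takes a
// fully qualified namespace and a TokenCredential such as DefaultAzureCredential.
var processor = new EventProcessorClient(
    checkpointStore,
    consumerGroup,
    connectionString,
    eventHubName,
    clientOptions);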

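processor.ProcessEventAsync += async (args) =>
{
    // When a maximum wait time is configured, the handler can run without an event.
    if (!args.HasEvent)
    {
        return;
    }

    Console.WriteLine($"Received event: {Encoding.UTF8.GetString(args.Data.EventBody.ToArray())}");
    Console.WriteLine($"Partition: {args.Partition.PartitionId}");
    if (args.Data.Properties.TryGetValue("EventType", out object eventType))
    {
        Console.WriteLine($"Event Type: {eventType}");
    }

    // Record progress so a restarted processor resumes after this event.
    // Checkpointing every event is simple but costly; production code usually checkpoints periodically.
    await args.UpdateCheckpointAsync();
};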

processor.ProcessErrorAsync += (args) =>
{
    Console.WriteLine($"Error in processor: {args.Exception.Message}");
    return Task.CompletedTask;
};

Console.WriteLine("Starting event processor...");
await processor.StartProcessingAsync();

// Keep the application running to process events.
// In a real application, you'd have a more robust way to manage this.
Console.WriteLine("Press Ctrl+C to stop processing.");
await Task.Delay(Timeout.Infinite);
            
Note: For production scenarios, consider using Azure Identity for authentication instead of directly embedding connection strings.

Python Example (EventHubProducerClient & EventHubConsumerClient)


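import asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient

# Replace with your Event Hub connection string and Hub Name
CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"
CONSUMER_GROUP = "$Default"  # Or specify your custom group

async def send_events():
    producer = EventHubProducerClient.from_connection_string(
        CONNECTION_STR, eventhub_name=EVENT_HUB_NAME)
    async with producer:
        batch = await producer.create_batch()
        for i in range(5):
            message = f"Event {i} from Python"
            event_data = EventData(message.encode('utf-8'))
            event_data.properties = {"Source": "PythonSender"}
            try:
                batch.add(event_data)
            except ValueError:
                # The batch is full: send it and start a new one with this event.
                print(f"Batch full. Sending {len(batch)} events.")
                await producer.send_batch(batch)
                batch = await producer.create_batch()
                batch.add(event_data)
        await producer.send_batch(batch)
        print("Sent batch of events.")

async def on_event_batch(partition_context, events):
    # Called with up to max_batch_size events from a single partition.
    for event_data in events:
        print(f"Received event: {event_data.body_as_str()}")
        print(f"Partition: {partition_context.partition_id}")
        # Property keys can arrive as bytes, so check both forms.
        properties = event_data.properties or {}
        source = properties.get(b"Source", properties.get("Source"))
        if source is not None:
            print(f"Source: {source}")

async def receive_events():
    consumer = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR, consumer_group=CONSUMER_GROUP, eventhub_name=EVENT_HUB_NAME)
    async with consumer:
        # starting_position="-1" reads from the beginning of each partition.
        # receive_batch keeps running until the task is cancelled or the client is closed.
        await consumer.receive_batch(
            on_event_batch=on_event_batch, max_batch_size=10, starting_position="-1")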

async def main():
    print("Sending events...")
    await send_events()
    print("\nReceiving events...")
    await receive_events()

if __name__ == "__main__":
    asyncio.run(main())
            

Best Practices

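Warning: Receiving events without checkpointing means the consumer has no record of its position after a restart. Depending on the starting position it is configured with, it either reprocesses events it has already handled or skips events that arrived while it was down.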
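
The effect of checkpoint frequency can be seen in a small, self-contained Python simulation. This is a sketch under stated assumptions rather than SDK code: `InMemoryCheckpointStore` is a hypothetical stand-in for a durable store such as a blob container, and `consume` models one run of a consumer that may crash partway through.

```python
class InMemoryCheckpointStore:
    """Hypothetical stand-in for a durable checkpoint store."""

    def __init__(self):
        self._last_index = {}

    def load(self, partition_id):
        # -1 means no event from this partition has been checkpointed yet.
        return self._last_index.get(partition_id, -1)

    def save(self, partition_id, index):
        self._last_index[partition_id] = index


def consume(partition_id, events, store, checkpoint_every, crash_after=None):
    """Process events after the last checkpoint and return what was processed."""
    start = store.load(partition_id) + 1
    processed = []
    for index in range(start, len(events)):
        if crash_after is not None and len(processed) == crash_after:
            break  # simulate the consumer stopping unexpectedly
        processed.append(events[index])
        if (index + 1) % checkpoint_every == 0:
            store.save(partition_id, index)
    return processed


events = ["e0", "e1", "e2", "e3", "e4", "e5"]
store = InMemoryCheckpointStore()

# First run checkpoints every 3 events and crashes after processing 4.
first_run = consume("0", events, store, checkpoint_every=3, crash_after=4)

# The restart resumes after the last checkpoint (e2), so e3 is processed twice.
second_run = consume("0", events, store, checkpoint_every=3)
```

Here `e3` is handled in both runs because the crash happened between checkpoints. Checkpointing more often narrows that window at the cost of more writes to the store, so event handlers should tolerate occasional duplicates.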