Sending and Receiving Events with Azure Event Hubs

This guide provides practical examples and best practices for interacting with Azure Event Hubs, focusing on sending and receiving event data using various SDKs.

Introduction to Event Hubs Interaction

Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. It enables you to process millions of events per second from diverse sources. The core operations involve producing (sending) events to an event hub and consuming (receiving) events from it.

Sending Events (Producing)

Producing events involves sending data records to a specific partition within an event hub. You can choose to send events to a specific partition or let Event Hubs handle the partition assignment automatically.

Using the .NET SDK

The Azure.Messaging.EventHubs NuGet package provides a robust client for interacting with Event Hubs.


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;

// Replace with your Event Hubs connection string and hub name
string connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";

await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);

using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

for (int i = 1; i <= 5; i++)
{
    var messageBody = $"Event {i} - {DateTime.UtcNow}";
    if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(messageBody))))
    {
        throw new Exception($"The event {i} is too large for the batch and cannot be sent.");
    }
}

try
{
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine($"A batch of {eventBatch.Count} events has been published.");
}
catch (Exception ex)
{
    Console.WriteLine($"Error sending batch: {ex.Message}");
}
            

Using the Python SDK

The azure-eventhubs library is used for Python interactions.


import asyncio
from azure.eventhub import EventHubProducerClient, EventData

async def send_events():
    connection_string = "YOUR_EVENTHUBS_CONNECTION_STRING"
    eventhub_name = "YOUR_EVENTHUB_NAME"

    producer = EventHubProducerClient.from_connection_string(connection_string, eventhub_name)

    async with producer:
        event_data_batch = await producer.create_batch()
        for i in range(5):
            message = f"Event {i+1} - {datetime.datetime.utcnow().isoformat()}"
            if not event_data_batch.add(EventData(message)):
                print(f"Event {i+1} is too large for the batch.")
                break
        
        try:
            await producer.send_batch(event_data_batch)
            print(f"Sent {len(event_data_batch)} events.")
        except Exception as e:
            print(f"Error sending batch: {e}")

if __name__ == "__main__":
    asyncio.run(send_events())
            

Receiving Events (Consuming)

Receiving events is typically done using an Event Hubs consumer group. Each consumer group gets its own view of the events in the hub, allowing multiple applications to process the same event stream independently.

Using the .NET SDK

The Azure.Messaging.EventHubs.Consumer namespace is used for consuming events.


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Collections.Generic;

// Replace with your Event Hubs connection string, hub name, and consumer group
string connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or your custom consumer group

await using var consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

Console.WriteLine("Listening for events...");

var processingTask = Task.Run(async () =>
{
    await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
    {
        try
        {
            string message = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
            Console.WriteLine($"Received event: {message} from partition {partitionEvent.Partition.Id}");
            // Process the event here
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing event: {ex.Message}");
        }
    }
});

await processingTask;
            

Using the Python SDK

The azure-eventhubs library also handles event consumption.


import asyncio
from azure.eventhub import EventHubConsumerClient

async def receive_events():
    connection_string = "YOUR_EVENTHUBS_CONNECTION_STRING"
    eventhub_name = "YOUR_EVENTHUB_NAME"
    consumer_group = "$Default"  # Or your custom consumer group

    consumer = EventHubConsumerClient.from_connection_string(
        connection_string,
        consumer_group,
        eventhub_name
    )

    print("Listening for events...")

    async def on_event(partition_context, event):
        print(f"Received event: {event.body_as_str()} from partition {partition_context.partition_id}")
        # Process the event here

    async def on_error(partition_context, error):
        print(f"Error in partition {partition_context.partition_id}: {error}")

    async with consumer:
        await consumer.receive(on_event, on_error=on_error)

if __name__ == "__main__":
    asyncio.run(receive_events())
            

Important Considerations

Security Note

Never embed connection strings directly in client-side code. Use secure methods like Azure Key Vault or managed identities for accessing Event Hubs credentials.

Further Reading