Learn how to send messages to and receive messages from an Azure Event Hub using the client SDKs for Python, C#, Java, and JavaScript.

Prerequisites

  • An Azure subscription with an Event Hubs namespace and an event hub.
  • A connection string (shared access policy) with send and listen permissions.
  • The Event Hubs SDK for your language: azure-eventhub (Python), Azure.Messaging.EventHubs (.NET), azure-messaging-eventhubs (Java), or @azure/event-hubs (JavaScript).

Sending Messages

Event Hubs supports sending messages using different client SDKs. Below are examples for common languages.


Python

import json

from azure.eventhub import EventHubProducerClient, EventData

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"

producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

def send_event_data_batch(producer):
    # Prepare data
    data = [
        {"sensorId": "1", "temperature": 23.5, "humidity": 60.1},
        {"sensorId": "2", "temperature": 24.1, "humidity": 62.5},
    ]

    events = [EventData(json.dumps(item)) for item in data]  # serialize each reading as JSON

    with producer:
        # Send batches
        batch = producer.create_batch()
        for event in events:
            try:
                batch.add(event)
            except ValueError:
                # Batch is full: send the current batch and start a new one
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(event)
        # Send the remaining events in the last batch
        producer.send_batch(batch)
    print("Messages sent successfully.")

send_event_data_batch(producer)
                        

C#

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

public class EventSender
{
    public static async Task SendMessagesAsync(string connectionString, string eventHubName)
    {
        await using var producer = new EventHubProducerClient(connectionString, eventHubName);

        EventDataBatch eventBatch = await producer.CreateBatchAsync();

        var messages = new List<string>
        {
            "{\"sensorId\": \"1\", \"temperature\": 23.5, \"humidity\": 60.1}",
            "{\"sensorId\": \"2\", \"temperature\": 24.1, \"humidity\": 62.5}",
            "{\"sensorId\": \"3\", \"temperature\": 22.9, \"humidity\": 59.8}"
        };

        foreach (var message in messages)
        {
            if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message))))
            {
                // Batch is full: send the current batch and start a new one
                await producer.SendAsync(eventBatch);
                eventBatch.Dispose();
                eventBatch = await producer.CreateBatchAsync();
                if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message))))
                {
                    Console.WriteLine($"Message too large to add to the batch: {message}");
                }
            }
        }

        // Send the remaining events in the last batch
        await producer.SendAsync(eventBatch);
        eventBatch.Dispose();
        Console.WriteLine("Messages sent successfully.");
    }
}
                        

Java

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

import java.util.Arrays;
import java.util.List;

public class EventSender {

    public static void sendMessages(String connectionString, String eventHubName) {
        EventHubProducerAsyncClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildAsyncProducerClient();

        List<String> messages = Arrays.asList(
            "{\"sensorId\": \"1\", \"temperature\": 23.5, \"humidity\": 60.1}",
            "{\"sensorId\": \"2\", \"temperature\": 24.1, \"humidity\": 62.5}",
            "{\"sensorId\": \"3\", \"temperature\": 22.9, \"humidity\": 59.8}"
        );

        producer.createBatch(new CreateBatchOptions())
            .flatMap(batch -> {
                for (String message : messages) {
                    if (!batch.tryAdd(new EventData(message))) {
                        System.err.println("Batch is full; message not added: " + message);
                    }
                }
                return producer.send(batch);
            })
            .subscribe(
                unused -> { },
                error -> System.err.println("Error sending messages: " + error.getMessage()),
                () -> System.out.println("Messages sent successfully.")
            );

        // In a real application, you'd manage the client lifecycle properly
        // and wait for the send to complete instead of sleeping.
        try {
            Thread.sleep(5000); // Give the asynchronous send time to complete
        } catch (InterruptedException ignored) {}
        producer.close();
    }
}
                        

JavaScript

const { EventHubProducerClient } = require("@azure/event-hubs");

const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";

const producer = new EventHubProducerClient(connectionString, eventHubName);

async function sendEvents() {
    let batch = await producer.createBatch();

    const messages = [
        '{"sensorId": "1", "temperature": 23.5, "humidity": 60.1}',
        '{"sensorId": "2", "temperature": 24.1, "humidity": 62.5}',
        '{"sensorId": "3", "temperature": 22.9, "humidity": 59.8}',
    ];

    for (const message of messages) {
        const eventData = { body: message }; // events are plain objects with a body property
        if (!batch.tryAdd(eventData)) {
            // Batch is full, send the current batch and start a new one
            await producer.sendBatch(batch);
            // Create a new batch and add the current event
            batch = await producer.createBatch();
            if (!batch.tryAdd(eventData)) {
                console.log(`Message too large to add to the batch: ${message}`);
            }
        }
    }

    // Send the remaining events in the last batch, then close the client
    await producer.sendBatch(batch);
    await producer.close();
    console.log("Messages sent successfully.");
}

sendEvents().catch(err => console.error("Error sending events:", err));
                        

Receiving Messages

Consumers can read messages from Event Hubs using consumer groups. Each consumer group provides a separate view of the event stream.


Python

from azure.eventhub import EventHubConsumerClient

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
consumer_group = "$Default"  # Or your custom consumer group
eventhub_name = "YOUR_EVENTHUB_NAME"

consumer = EventHubConsumerClient.from_connection_string(
    connection_str,
    consumer_group=consumer_group,
    eventhub_name=eventhub_name
)

def process_event(partition_context, event):
    # The callback receives the partition context and the event itself
    print(f"Received event: {event.body_as_json()}")

print("Consumer started. Press Ctrl+C to stop.")
with consumer:
    # receive() blocks until the client is closed or the process is interrupted
    consumer.receive(on_event=process_event, starting_position="-1")  # Start from the beginning
# In a real application, you would want more sophisticated error handling and control flow.
                        

C#

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Threading.Tasks;
using System.Text;

public class EventReceiver
{
    public static async Task ReceiveMessagesAsync(string connectionString, string eventHubName, string consumerGroup)
    {
        await using var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

        Console.WriteLine("Listening for messages...");

        await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync())
        {
            Console.WriteLine($"Received message: {Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray())}");
            // Process the event
        }
    }
}
                        

Java

import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.PartitionEvent;

import java.util.function.Consumer;

public class EventReceiver {

    public static void receiveMessages(String connectionString, String eventHubName, String consumerGroup) {
        EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(consumerGroup)
            .buildAsyncConsumerClient();

        System.out.println("Listening for messages...");

        Consumer<PartitionEvent> processEvent = partitionEvent -> {
            System.out.println("Received message: " + partitionEvent.getData().getBodyAsString());
            // Process the event
        };

        // Read from all partitions, starting at the earliest available event
        consumer.receive(true).subscribe(
            processEvent,
            error -> System.err.println("Error receiving messages: " + error.getMessage()));

        // In a real application, you'd manage the client lifecycle and prevent the application from exiting.
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException ignored) {}
    }
}
                        

JavaScript

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default"; // Or your custom consumer group

const consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

async function processEvents() {
    console.log("Listening for messages...");

    const subscription = consumer.subscribe({
        async processEvents(events, context) {
            for (const event of events) {
                console.log(`Received message: ${event.body}`);
                // Process the event
            }
        },
        async processError(err, context) {
            console.error(`Error receiving events: ${err}`);
        }
    }, { startPosition: earliestEventPosition }); // Start from the beginning

    // The subscription will keep running until explicitly closed or the process exits.
    // To stop, you would call `await subscription.close();`
}

processEvents().catch(err => console.error("Error setting up consumer:", err));
                        
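A note on checkpointing: the consumers above start from a fixed position every time they run. To let a consumer resume where it left off after a restart, you can persist its position with a checkpoint store. Below is a minimal Python sketch assuming the azure-eventhub-checkpointstoreblob package and an Azure Storage blob container; the connection strings and container name are placeholders.

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
storage_conn_str = "YOUR_STORAGE_CONNECTION_STRING"  # placeholder
container_name = "YOUR_CHECKPOINT_CONTAINER"         # placeholder

# The checkpoint store persists each partition's position in blob storage
checkpoint_store = BlobCheckpointStore.from_connection_string(storage_conn_str, container_name)

consumer = EventHubConsumerClient.from_connection_string(
    connection_str,
    consumer_group="$Default",
    eventhub_name=eventhub_name,
    checkpoint_store=checkpoint_store,
)

def on_event(partition_context, event):
    print(f"Received event from partition {partition_context.partition_id}: {event.body_as_str()}")
    # Record progress so a restarted consumer resumes after this event
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event, starting_position="-1")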

Important Considerations

  • Connection Strings: Always protect your connection strings and avoid hardcoding them in production code. Use environment variables or Azure Key Vault (see the sketch after this list).
  • Consumer Groups: Each consumer group provides an independent view of the event stream, and consumers track their own position (offset) within it, typically by checkpointing. Different applications or instances can therefore consume events independently from the same Event Hub.
  • Error Handling: Implement robust error handling for both sending and receiving operations.
  • Batching: Sending messages in batches is more efficient than sending them one by one.
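
As a small illustration of the connection-string guidance above, the Python sketch below reads the configuration from environment variables instead of hardcoding it; the variable names EVENTHUBS_CONNECTION_STRING and EVENTHUB_NAME, and the commented Key Vault alternative, are placeholder choices rather than required names.

import os

from azure.eventhub import EventHubProducerClient

# Read configuration from the environment rather than hardcoding secrets
connection_str = os.environ["EVENTHUBS_CONNECTION_STRING"]  # placeholder variable name
eventhub_name = os.environ["EVENTHUB_NAME"]                 # placeholder variable name

# Alternatively, fetch the secret from Azure Key Vault
# (requires the azure-identity and azure-keyvault-secrets packages):
# from azure.identity import DefaultAzureCredential
# from azure.keyvault.secrets import SecretClient
# secret_client = SecretClient(vault_url="https://YOUR_VAULT_NAME.vault.azure.net", credential=DefaultAzureCredential())
# connection_str = secret_client.get_secret("YOUR_SECRET_NAME").value

producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)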

Performance Tip

For high-throughput scenarios, use the Event Hubs SDKs shown above, which communicate over the native AMQP protocol and are optimized for performance and reliability. Sending events in batches and using the SDKs' asynchronous clients further improves throughput.
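
As an example, the Python SDK also ships an asyncio-based producer that avoids blocking the calling thread between batches. The sketch below assumes the same azure-eventhub package used earlier, with the connection string and event hub name as placeholders.

import asyncio
import json

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"

async def send_readings(readings):
    producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    async with producer:
        batch = await producer.create_batch()
        for reading in readings:
            try:
                batch.add(EventData(json.dumps(reading)))
            except ValueError:
                # Batch is full: send it and start a new one
                await producer.send_batch(batch)
                batch = await producer.create_batch()
                batch.add(EventData(json.dumps(reading)))
        await producer.send_batch(batch)

asyncio.run(send_readings([{"sensorId": "1", "temperature": 23.5, "humidity": 60.1}]))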