Azure Event Hubs Documentation

Consumer Clients Reference

This document provides a reference for the various consumer client libraries available for interacting with Azure Event Hubs. These clients enable your applications to read events from Event Hubs partitions.

Overview

Azure Event Hubs offers SDKs for multiple programming languages, allowing you to choose the best fit for your application architecture. Key features of these clients include:

Available Client Libraries

1. Azure SDK for .NET

The .NET client library for Event Hubs is a powerful and feature-rich option for .NET applications. It leverages modern .NET asynchronous patterns.

Key Classes and Interfaces:
Installation:

Install the NuGet package:

dotnet add package Azure.Messaging.EventHubs.Consumer
Usage Example (.NET):
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Threading.Tasks;

// Replace with your Event Hubs connection string and consumer group
string connectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or your specific consumer group

await using var client = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

Console.WriteLine($"Starting to read events for consumer group: {consumerGroup}");

// Start processing events
await foreach (PartitionEventBatch eventBatch in client.ReadEventsAsync())
{
    Console.WriteLine($"Received a batch with {eventBatch.Count} events.");
    foreach (EventData eventData in eventBatch)
    {
        Console.WriteLine($"\tSequence Number: {eventData.SequenceNumber}");
        Console.WriteLine($"\tOffset: {eventData.Offset}");
        Console.WriteLine($"\tPartition Key: {eventData.PartitionKey}");
        Console.WriteLine($"\tBody: {System.Text.Encoding.UTF8.GetString(eventData.EventBody.ToArray())}");
    }
}

2. Azure SDK for Java

The Java Event Hubs client library provides a robust API for Java developers to integrate with Event Hubs.

Key Classes and Interfaces:
Installation (Maven):
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>5.x.x</version>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.x.x</version>
</dependency>
Usage Example (Java):
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventBatch;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.util.function.Consumer;

// Replace with your Event Hubs connection string and consumer group
String connectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
String eventHubName = "YOUR_EVENT_HUB_NAME";
String consumerGroup = "$Default"; // Or your specific consumer group

EventHubConsumerClient consumerClient = new EventHubClientBuilder()
    .connectionString(connectionString, eventHubName)
    .consumerGroup(consumerGroup)
    .buildConsumerClient();

Consumer<EventBatch> processEventBatch = eventBatch -> {
    for (EventData eventData : eventBatch) {
        System.out.printf("Received event: Sequence Number %d, Offset %d, Body: %s%n",
            eventData.getSequenceNumber(), eventData.getOffset(),
            new String(eventData.getBody().toBytes()));
    }
};

System.out.printf("Starting to read events for consumer group: %s%n", consumerGroup);

// Start processing events in a loop
while (true) {
    try {
        consumerClient.readEvents().forEach(processEventBatch);
    } catch (Exception e) {
        System.err.println("Error reading events: " + e.getMessage());
        // Implement retry logic or exception handling
        try {
            Thread.sleep(5000); // Wait before retrying
        } catch (InterruptedException interruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

3. Azure SDK for Python

The Python client library for Event Hubs allows seamless integration with Python applications.

Key Classes and Functions:
Installation:
pip install azure-eventhubs[eventprocessor]
Usage Example (Python):
import asyncio
from azure.eventhubs.aio import EventHubConsumerClient
from azure.eventhubs.models import EventPosition

# Replace with your Event Hubs connection string and consumer group
CONNECTION_STR = "YOUR_EVENT_HUBS_CONNECTION_STRING"
EVENTHUB_NAME = "YOUR_EVENT_HUB_NAME"
CONSUMER_GROUP = "$Default" # Or your specific consumer group

async def main():
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        event_hub_name=EVENTHUB_NAME
    )

    async def on_event_batch(partition_context, events):
        print(f"Received a batch of {len(events)} events from partition {partition_context.partition_id}")
        for event in events:
            print(f"\tSequence Number: {event.sequence_number}")
            print(f"\tOffset: {event.offset}")
            print(f"\tBody: {event.body}")

    async with client:
        print(f"Starting to read events for consumer group: {CONSUMER_GROUP}")
        await client.receive_batch(
            on_event_batch,
            # Use EventPosition.earliest() to start from the beginning of the partition
            # Use EventPosition.latest() to start from the end of the partition
            # Or specify a sequence number or offset
            event_position=EventPosition.latest()
        )

if __name__ == "__main__":
    asyncio.run(main())

4. Azure SDK for JavaScript/TypeScript

The JavaScript/TypeScript client library is suitable for Node.js applications and web applications.

Key Classes and Functions:
Installation:
npm install @azure/event-hubs
Usage Example (JavaScript):
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

// Replace with your Event Hubs connection string and consumer group
const connectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENT_HUB_NAME";
const consumerGroup = "$Default"; // Or your specific consumer group

async function main() {
    const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

    console.log(`Starting to read events for consumer group: ${consumerGroup}`);

    const subscription = consumerClient.subscribe({
        async processEvents(events, context) {
            console.log(`Received a batch of ${events.length} events from partition ${context.partitionId}`);
            for (const event of events) {
                console.log(`\tSequence Number: ${event.sequenceNumber}`);
                console.log(`\tOffset: ${event.offset}`);
                console.log(`\tBody: ${Buffer.from(event.body).toString()}`);
            }
        },
        async processError(err, context) {
            console.error(`Error occurred: ${err.message}`);
            console.error(`Partition ID: ${context.partitionId}`);
        }
    }, {
        // Use earliestEventPosition to start from the beginning
        // Use latestEventPosition to start from the end
        // Or specify a sequence number or offset
        startPosition: earliestEventPosition
    });

    // To stop the subscription after some time or based on a condition:
    // await subscription.close();
}

main().catch((err) => {
    console.error("The following error occurred during the main function: ", err);
});

Choosing the Right Consumer Client

The choice of client library typically depends on your application's technology stack and specific requirements. Consider factors like:

Tip: For production scenarios, always consider using a checkpoint store (e.g., Blob Storage, Azure Cosmos DB) to persist consumer offsets. This allows your application to resume processing from where it left off in case of restarts or failures.
Important: Ensure you have the necessary permissions and network connectivity to access your Event Hubs namespace.