This document provides a reference for the various consumer client libraries available for interacting with Azure Event Hubs. These clients enable your applications to read events from Event Hubs partitions.
Azure Event Hubs offers SDKs for multiple programming languages, allowing you to choose the best fit for your application architecture. Key features of these clients include:
The .NET client library for Event Hubs is a powerful and feature-rich option for .NET applications. It leverages modern .NET asynchronous patterns.
EventHubConsumerClient: The primary class for creating consumer instances.PartitionClient: Represents a connection to a single partition.ReadEventsOptions: Options for configuring event reading.Install the NuGet package:
dotnet add package Azure.Messaging.EventHubs.Consumer
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Threading.Tasks;
// Replace with your Event Hubs connection string and consumer group
string connectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
string eventHubName = "YOUR_EVENT_HUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or your specific consumer group
await using var client = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
Console.WriteLine($"Starting to read events for consumer group: {consumerGroup}");
// Start processing events
await foreach (PartitionEventBatch eventBatch in client.ReadEventsAsync())
{
Console.WriteLine($"Received a batch with {eventBatch.Count} events.");
foreach (EventData eventData in eventBatch)
{
Console.WriteLine($"\tSequence Number: {eventData.SequenceNumber}");
Console.WriteLine($"\tOffset: {eventData.Offset}");
Console.WriteLine($"\tPartition Key: {eventData.PartitionKey}");
Console.WriteLine($"\tBody: {System.Text.Encoding.UTF8.GetString(eventData.EventBody.ToArray())}");
}
}
The Java Event Hubs client library provides a robust API for Java developers to integrate with Event Hubs.
EventHubConsumerClient: The main class for consuming events.EventProcessorClient: For more advanced scenarios involving checkpointing and load balancing.EventData: Represents an individual event.<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>5.x.x</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.x.x</version>
</dependency>
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventBatch;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.util.function.Consumer;
// Replace with your Event Hubs connection string and consumer group
String connectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
String eventHubName = "YOUR_EVENT_HUB_NAME";
String consumerGroup = "$Default"; // Or your specific consumer group
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.consumerGroup(consumerGroup)
.buildConsumerClient();
Consumer<EventBatch> processEventBatch = eventBatch -> {
for (EventData eventData : eventBatch) {
System.out.printf("Received event: Sequence Number %d, Offset %d, Body: %s%n",
eventData.getSequenceNumber(), eventData.getOffset(),
new String(eventData.getBody().toBytes()));
}
};
System.out.printf("Starting to read events for consumer group: %s%n", consumerGroup);
// Start processing events in a loop
while (true) {
try {
consumerClient.readEvents().forEach(processEventBatch);
} catch (Exception e) {
System.err.println("Error reading events: " + e.getMessage());
// Implement retry logic or exception handling
try {
Thread.sleep(5000); // Wait before retrying
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
}
}
The Python client library for Event Hubs allows seamless integration with Python applications.
EventHubConsumerClient: The main class for creating consumer instances.receive_batch: Method to receive a batch of events.pip install azure-eventhubs[eventprocessor]
import asyncio
from azure.eventhubs.aio import EventHubConsumerClient
from azure.eventhubs.models import EventPosition
# Replace with your Event Hubs connection string and consumer group
CONNECTION_STR = "YOUR_EVENT_HUBS_CONNECTION_STRING"
EVENTHUB_NAME = "YOUR_EVENT_HUB_NAME"
CONSUMER_GROUP = "$Default" # Or your specific consumer group
async def main():
client = EventHubConsumerClient.from_connection_string(
CONNECTION_STR,
consumer_group=CONSUMER_GROUP,
event_hub_name=EVENTHUB_NAME
)
async def on_event_batch(partition_context, events):
print(f"Received a batch of {len(events)} events from partition {partition_context.partition_id}")
for event in events:
print(f"\tSequence Number: {event.sequence_number}")
print(f"\tOffset: {event.offset}")
print(f"\tBody: {event.body}")
async with client:
print(f"Starting to read events for consumer group: {CONSUMER_GROUP}")
await client.receive_batch(
on_event_batch,
# Use EventPosition.earliest() to start from the beginning of the partition
# Use EventPosition.latest() to start from the end of the partition
# Or specify a sequence number or offset
event_position=EventPosition.latest()
)
if __name__ == "__main__":
asyncio.run(main())
The JavaScript/TypeScript client library is suitable for Node.js applications and web applications.
EventHubConsumerClient: The main class for creating consumer instances.subscribe: Method to subscribe to events.npm install @azure/event-hubs
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
// Replace with your Event Hubs connection string and consumer group
const connectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENT_HUB_NAME";
const consumerGroup = "$Default"; // Or your specific consumer group
async function main() {
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
console.log(`Starting to read events for consumer group: ${consumerGroup}`);
const subscription = consumerClient.subscribe({
async processEvents(events, context) {
console.log(`Received a batch of ${events.length} events from partition ${context.partitionId}`);
for (const event of events) {
console.log(`\tSequence Number: ${event.sequenceNumber}`);
console.log(`\tOffset: ${event.offset}`);
console.log(`\tBody: ${Buffer.from(event.body).toString()}`);
}
},
async processError(err, context) {
console.error(`Error occurred: ${err.message}`);
console.error(`Partition ID: ${context.partitionId}`);
}
}, {
// Use earliestEventPosition to start from the beginning
// Use latestEventPosition to start from the end
// Or specify a sequence number or offset
startPosition: earliestEventPosition
});
// To stop the subscription after some time or based on a condition:
// await subscription.close();
}
main().catch((err) => {
console.error("The following error occurred during the main function: ", err);
});
The choice of client library typically depends on your application's technology stack and specific requirements. Consider factors like: