Developer Documentation
This guide will walk you through the fundamental steps of receiving events from an Azure Event Hub. We'll cover setting up a consumer, processing incoming messages, and handling potential errors.
Before you can write code, you need to install the necessary SDK package. Below are examples for Python and Node.js.
pip install azure-eventhub
npm install @azure/event-hubs
You'll need to instantiate a consumer client that connects to your Event Hub. This typically involves providing your connection string and the Event Hub name.
from azure.eventhub import EventHubConsumerClient
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
# For consumer groups, you can specify the consumer_group parameter.
# If not specified, it defaults to "$Default".
consumer_client = EventHubConsumerClient.from_connection_string(
connection_str,
eventhub_name=eventhub_name,
consumer_group="$Default" # Or your specific consumer group
)
print("Consumer client created successfully.")
const { EventHubConsumerClient } = require("@azure/event-hubs");
const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default"; // Or your specific consumer group
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
console.log("Consumer client created successfully.");
YOUR_EVENTHUBS_CONNECTION_STRING and YOUR_EVENTHUB_NAME with your actual credentials. Always manage connection strings securely, for example, using Azure Key Vault or environment variables.
Once the client is set up, you can start receiving events. The SDK provides mechanisms to subscribe to events and handle them asynchronously.
import asyncio
from azure.eventhub import EventHubConsumerClient
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
async def process_event(partition_context, event):
# Process the received event
print(f"Received event: {event.body}")
print(f" Partition ID: {partition_context.partition_id}")
print(f" Sequence Number: {event.sequence_number}")
async def main():
consumer_client = EventHubConsumerClient.from_connection_string(
connection_str,
eventhub_name=eventhub_name,
consumer_group="$Default"
)
print("Starting to receive events...")
async with consumer_client:
await consumer_client.receive(on_event=process_event)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Stopped receiving events.")
const { EventHubConsumerClient, latestReconnectionPolicy } = require("@azure/event-hubs");
const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default";
async function main() {
const consumerClient = new EventHubConsumerClient(
consumerGroup,
connectionString,
eventHubName,
{
// Recommended: Add retry logic for robustness
reconnectionPolicy: latestReconnectionPolicy
}
);
console.log("Starting to receive events...");
// Get a reference to the consumer for a specific partition, or loop through all
const partitionId = "0"; // Or get all partitions
const subscription = consumerClient.subscribe(
{
processEvents: async (events, context) => {
if (events.length === 0) {
console.log("No events received.");
return;
}
for (const event of events) {
console.log(`Received event: ${Buffer.from(event.body).toString()}`);
console.log(` Partition ID: ${context.partitionId}`);
console.log(` Sequence Number: ${event.sequenceNumber}`);
}
},
processError: async (err, context) => {
console.error(`Error occurred in partition ${context.partitionId}: ${err.message}`);
}
},
{ partitionId: partitionId } // To receive from all partitions, omit this option
);
// To keep the script running indefinitely:
await new Promise((resolve) => {
process.on("SIGINT", resolve); // Handle Ctrl+C
});
console.log("Stopping event reception.");
await subscription.close();
await consumerClient.close();
}
main().catch((err) => {
console.error("Error:", err);
});
Robust applications should include proper error handling and checkpointing.
azure-eventhub-checkpointstoreblob package (Python) or similar integrations for other languages to implement reliable checkpointing with Azure Blob Storage.
Congratulations! You've learned the basics of receiving events from Azure Event Hubs. Consider exploring the following: