Azure Event Hubs

Developer Documentation

Getting Started: Receiving Events

This guide walks through the basic steps of receiving events from an Azure Event Hub: creating a consumer client, processing incoming events, and handling errors and checkpoints.

Prerequisites

1. Install the Azure Event Hubs SDK

Before you can write code, you need to install the necessary SDK package. Below are examples for Python and Node.js.

Python

pip install azure-eventhub

Node.js

npm install @azure/event-hubs

2. Create a Consumer Client

You'll need to instantiate a consumer client that connects to your Event Hub. This typically involves providing your connection string and the Event Hub name.

Python Example

Python
from azure.eventhub import EventHubConsumerClient

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"

# consumer_group is a required argument. "$Default" is the consumer group
# that every Event Hub is created with; replace it with your own if needed.
consumer_client = EventHubConsumerClient.from_connection_string(
    connection_str,
    consumer_group="$Default",
    eventhub_name=eventhub_name,
)

print("Consumer client created successfully.")

Node.js Example

JavaScript
const { EventHubConsumerClient } = require("@azure/event-hubs");

const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default"; // Or your specific consumer group

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

console.log("Consumer client created successfully.");

Important: Replace YOUR_EVENTHUBS_CONNECTION_STRING and YOUR_EVENTHUB_NAME with your own values. Always manage connection strings securely, for example, using Azure Key Vault or environment variables.
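
One way to keep the connection string out of source code is to read it from environment variables at startup. The following is a minimal sketch, not an SDK feature: the helper load_settings and the variable names EVENTHUB_CONNECTION_STR, EVENTHUB_NAME, and EVENTHUB_CONSUMER_GROUP are hypothetical names chosen for illustration.

```python
import os
from typing import Mapping, NamedTuple


class EventHubSettings(NamedTuple):
    connection_str: str
    eventhub_name: str
    consumer_group: str


def load_settings(env: Mapping[str, str] = os.environ) -> EventHubSettings:
    """Read Event Hubs settings from environment variables.

    The variable names are illustrative assumptions, not SDK conventions.
    """
    required = ("EVENTHUB_CONNECTION_STR", "EVENTHUB_NAME")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail fast with a clear message instead of passing None to the SDK.
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return EventHubSettings(
        connection_str=env["EVENTHUB_CONNECTION_STR"],
        eventhub_name=env["EVENTHUB_NAME"],
        # Fall back to the consumer group every Event Hub is created with.
        consumer_group=env.get("EVENTHUB_CONSUMER_GROUP", "$Default"),
    )
```

The returned values can then be passed to EventHubConsumerClient.from_connection_string in place of the hard-coded placeholders.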

3. Process Incoming Events

Once the client is set up, you can start receiving events. The SDK provides mechanisms to subscribe to events and handle them asynchronously.

Python Example (using a callback function)

Python
import asyncio
from azure.eventhub.aio import EventHubConsumerClient  # the async client lives in the .aio namespace

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"

async def process_event(partition_context, event):
    # Called once for each received event
    print(f"Received event: {event.body_as_str()}")
    print(f"  Partition ID: {partition_context.partition_id}")
    print(f"  Sequence Number: {event.sequence_number}")

async def main():
    consumer_client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group="$Default",
        eventhub_name=eventhub_name,
    )

    print("Starting to receive events...")
    async with consumer_client:
        # receive() runs until it is cancelled. By default only new events are
        # delivered; pass starting_position="-1" to read each partition from the start.
        await consumer_client.receive(on_event=process_event)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Stopped receiving events.")
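
Event payloads arrive as bytes, so a handler usually decodes them before doing any real work. The sketch below assumes the body_as_json() and body_as_str() helpers on the Python SDK's EventData, and assumes that a failed JSON decode surfaces as TypeError or ValueError. The names decode_body and on_event are hypothetical.

```python
from typing import Any


def decode_body(event: Any) -> Any:
    """Return the event payload as parsed JSON when possible, else as text."""
    try:
        return event.body_as_json()
    except (TypeError, ValueError):
        # Not valid JSON: fall back to the decoded string.
        return event.body_as_str()


async def on_event(partition_context: Any, event: Any) -> None:
    """Callback with the (partition_context, event) shape that receive() expects."""
    payload = decode_body(event)
    print(f"[partition {partition_context.partition_id}] {payload!r}")
```

A callback like this can be passed as on_event to consumer_client.receive in place of process_event.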

Node.js Example (using subscribe handlers)

JavaScript
const { EventHubConsumerClient } = require("@azure/event-hubs");

const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default";

async function main() {
    const consumerClient = new EventHubConsumerClient(
        consumerGroup,
        connectionString,
        eventHubName,
        {
            // Recommended: configure retries so transient failures are retried
            retryOptions: { maxRetries: 3, retryDelayInMs: 1000 }
        }
    );

    console.log("Starting to receive events...");

    // Receive from a single partition. To receive from all partitions,
    // drop the partitionId argument and call subscribe(handlers) instead.
    const partitionId = "0";
    const subscription = consumerClient.subscribe(
        partitionId,
        {
            processEvents: async (events, context) => {
                // An empty batch means no events arrived within the wait time.
                if (events.length === 0) {
                    console.log("No events received.");
                    return;
                }
                for (const event of events) {
                    // event.body may be a string, an object, or a Buffer,
                    // depending on how the event was sent.
                    console.log(`Received event: ${JSON.stringify(event.body)}`);
                    console.log(`  Partition ID: ${context.partitionId}`);
                    console.log(`  Sequence Number: ${event.sequenceNumber}`);
                }
            },
            processError: async (err, context) => {
                console.error(`Error occurred in partition ${context.partitionId}: ${err.message}`);
            }
        }
    );

    // Keep the script running until Ctrl+C is pressed
    await new Promise((resolve) => {
        process.on("SIGINT", resolve);
    });

    console.log("Stopping event reception.");
    await subscription.close();
    await consumerClient.close();
}

main().catch((err) => {
    console.error("Error:", err);
});

4. Error Handling and Checkpointing

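Robust applications need both checkpointing and error handling. A checkpoint records the position of the last event processed in a partition, so a restarted consumer can resume from that point instead of starting over. Error handling should at least log failures per partition, as the processError handler in the Node.js example above does.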
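
For the Python client, errors can be routed to a callback passed as on_error to receive(). The sketch below assumes the callback receives (partition_context, error) and that partition_context may be None when the failure is not tied to a single partition. The names describe_error and the logger name are hypothetical.

```python
import logging
from typing import Any, Optional

logger = logging.getLogger("eventhub.consumer")


def describe_error(partition_context: Optional[Any], error: Exception) -> str:
    """Build a log message that names the partition when one is known."""
    if partition_context is not None:
        where = f"partition {partition_context.partition_id}"
    else:
        where = "client (no partition)"
    return f"Receive error on {where}: {type(error).__name__}: {error}"


async def on_error(partition_context: Optional[Any], error: Exception) -> None:
    """Error callback: log the failure and let the client keep running."""
    logger.error(describe_error(partition_context, error))
```

It would be wired up as consumer_client.receive(on_event=process_event, on_error=on_error).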

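Tip: For production scenarios, explore the azure-eventhub-checkpointstoreblob package (Python; use azure-eventhub-checkpointstoreblob-aio with the async client), the @azure/eventhubs-checkpointstore-blob package (Node.js), or similar integrations for other languages to implement reliable checkpointing with Azure Blob Storage.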

Next Steps

Congratulations! You've learned the basics of receiving events from Azure Event Hubs. Consider exploring the following: