Receiving Data with Azure Event Hubs

This guide covers the essential patterns and SDKs for consuming events from Azure Event Hubs. Whether you're building real-time analytics, streaming applications, or data pipelines, understanding how to efficiently receive data is crucial.

Key Concepts: Event Hubs uses consumer groups to allow multiple applications to read from the same event stream independently. Each consumer group maintains its own offset within a partition.

Consumer Groups

Consumer groups enable multiple applications or services to process events from an Event Hub independently. Each consumer group tracks its own position (offset) in the event stream for each partition.
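
To make this concrete, here is a minimal sketch using the Python SDK introduced later in this guide; the consumer group names "analytics" and "archiver" are hypothetical and would need to exist on the hub:

from azure.eventhub import EventHubConsumerClient

conn_str = "YOUR_EVENTHUB_CONNECTION_STRING"

# Two applications reading the same hub through different consumer groups.
# Each group keeps its own per-partition offsets, so the progress of one
# never affects the other. (Each client would call .receive() in its own
# process or thread, as in the examples later in this guide.)
analytics = EventHubConsumerClient.from_connection_string(
    conn_str, consumer_group="analytics", eventhub_name="your_eventhub_name")
archiver = EventHubConsumerClient.from_connection_string(
    conn_str, consumer_group="archiver", eventhub_name="your_eventhub_name")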

Creating a Consumer Group

You can create consumer groups via the Azure portal, Azure CLI, or SDKs. A default consumer group ($Default) is created automatically with each Event Hub.

# Example using Azure CLI to create a consumer group
az eventhubs eventhub consumer-group create \
    --resource-group my-resource-group \
    --namespace-name my-eventhub-namespace \
    --eventhub-name my-event-hub \
    --name my-consumer-group
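
If you prefer to create the group from code, here is a hedged sketch using the Python management SDK; it assumes azure-mgmt-eventhub and azure-identity are installed, and the subscription ID and resource names are placeholders:

from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient
from azure.mgmt.eventhub.models import ConsumerGroup

# Management-plane client: authenticates with your Azure identity
client = EventHubManagementClient(DefaultAzureCredential(), "YOUR_SUBSCRIPTION_ID")

client.consumer_groups.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace",
    event_hub_name="my-event-hub",
    consumer_group_name="my-consumer-group",
    parameters=ConsumerGroup(),
)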

Event Hubs SDKs for Receiving

Azure provides SDKs for various programming languages to facilitate receiving events. The two most common approaches are the dedicated Azure Event Hubs libraries and the Event Hubs Kafka protocol endpoint, which lets you reuse standard Apache Kafka clients.

1. Using the Azure Event Hubs SDK (.NET, Java, Python, Node.js)

The official Azure Event Hubs libraries offer robust features for building reliable event receivers.

Example: Python

This example demonstrates receiving events with the azure-eventhub package (v5) and its EventHubConsumerClient.

from azure.eventhub import EventHubConsumerClient

eventhub_connection_str = "YOUR_EVENTHUB_CONNECTION_STRING"
consumer_group = "$Default"
event_hub_name = "your_eventhub_name"

client = EventHubConsumerClient.from_connection_string(
    eventhub_connection_str,
    consumer_group=consumer_group,
    eventhub_name=event_hub_name,
)

def on_event(partition_context, event):
    # Called once per event; partition_context identifies the source partition
    print(f"Received event: Partition {partition_context.partition_id}, "
          f"Sequence Number {event.sequence_number}, Offset {event.offset}")
    # Process the event data
    print(f"  Body: {event.body_as_str()}")  # or event.body_as_json()
    # Important: after processing, update the offset for checkpointing.
    # This is crucial for durable consumption and requires a checkpoint
    # store (e.g., Azure Blob Storage) configured on the client:
    # partition_context.update_checkpoint(event)

print("Starting to receive events...")

try:
    with client:
        # Receives from all partitions; pass partition_id=... to read just one.
        # starting_position="-1" begins at the start of each partition.
        client.receive(on_event=on_event, starting_position="-1")
except KeyboardInterrupt:
    print("Receiver stopped by user.")
finally:
    print("Connection closed.")

Note: For production scenarios, implement robust checkpointing to ensure that you don't lose events if your receiver restarts. Checkpoint store packages such as azure-eventhub-checkpointstoreblob (for Python) persist offsets to Azure Blob Storage and handle this for you.
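
As a sketch of that pattern, assuming the azure-eventhub-checkpointstoreblob package, a storage account connection string, and an existing blob container named "checkpoints":

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Offsets are persisted to Azure Blob Storage, so a restarted receiver
# resumes from the last written checkpoint instead of the beginning.
checkpoint_store = BlobCheckpointStore.from_connection_string(
    "YOUR_STORAGE_CONNECTION_STRING",
    container_name="checkpoints",  # container must already exist
)

client = EventHubConsumerClient.from_connection_string(
    "YOUR_EVENTHUB_CONNECTION_STRING",
    consumer_group="$Default",
    eventhub_name="your_eventhub_name",
    checkpoint_store=checkpoint_store,
)

def on_event(partition_context, event):
    print(f"Processing event at offset {event.offset}")
    partition_context.update_checkpoint(event)  # persist position after processing

with client:
    client.receive(on_event=on_event)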

2. Using the Kafka Protocol

Event Hubs is compatible with the Apache Kafka protocol. This means you can use existing Kafka client libraries to consume data.

Prerequisites for Kafka Compatibility

- An Event Hubs namespace at the Standard tier or above (the Basic tier does not expose the Kafka endpoint).
- A Kafka client that speaks protocol version 1.0 or later.
- Outbound connectivity to port 9093, using SASL_SSL for authentication and encryption.

Example: Java (using Kafka Clients)

This example shows how to consume events using the standard Kafka consumer API.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class EventHubsKafkaConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YOUR_EVENTHUBS_NAMESPACE.servicebus.windows.net:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group_id"); // Your consumer group name
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Authenticate with the literal username "$ConnectionString" and the
        // namespace's full connection string as the password.
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"YOUR_EVENTHUBS_CONNECTION_STRING\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("your_eventhub_name")); // The Event Hub name is the Kafka topic

        System.out.println("Starting Kafka consumer...");

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received record: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    // Process the event data
                }
                // Commit offsets periodically for durable consumption
                // consumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Error occurred: " + e.getMessage());
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("Kafka consumer closed.");
        }
    }
}

Choosing the Right SDK

Prefer the native Azure Event Hubs libraries for new applications: they integrate directly with checkpoint stores, balance partitions across receiver instances, and work with Azure identity. Use the Kafka protocol when you are migrating an existing Kafka workload or want a single codebase that can target both Kafka and Event Hubs.

Best Practices for Receiving Data

- Checkpoint after processing, not before, so a restarted receiver replays events rather than skipping them.
- Run one active reader per partition per consumer group where possible; Event Hubs allows up to five concurrent readers per partition, but they will see duplicate events.
- Create a dedicated consumer group for each downstream application instead of sharing $Default.
- Handle transient failures and partition ownership changes gracefully, as shown in the sketch below.
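
Here is a minimal sketch of defensive receiving with the Python client, reusing the placeholder connection string and hub name from earlier; on_error and on_partition_close are optional callbacks of the v5 receive API:

from azure.eventhub import EventHubConsumerClient

client = EventHubConsumerClient.from_connection_string(
    "YOUR_EVENTHUB_CONNECTION_STRING",
    consumer_group="$Default",
    eventhub_name="your_eventhub_name",
)

def on_event(partition_context, event):
    print(f"Partition {partition_context.partition_id}: offset {event.offset}")

def on_error(partition_context, error):
    # Called on receive errors; partition_context is None for
    # errors raised outside any single partition (e.g., load balancing)
    pid = partition_context.partition_id if partition_context else "unknown"
    print(f"Error on partition {pid}: {error!r}")

def on_partition_close(partition_context, reason):
    # Fires when ownership of a partition is released (e.g., rebalancing)
    print(f"Closed partition {partition_context.partition_id}: {reason}")

with client:
    client.receive(
        on_event=on_event,
        on_error=on_error,
        on_partition_close=on_partition_close,
    )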

By following these guidelines, you can build efficient and resilient applications that consume data from Azure Event Hubs.
