Receiving Data with Azure Event Hubs
This guide covers the essential patterns and SDKs for consuming events from Azure Event Hubs. Whether you're building real-time analytics, streaming applications, or data pipelines, understanding how to efficiently receive data is crucial.
Key Concepts: Event Hubs uses consumer groups to allow multiple applications to read from the same event stream independently. Each consumer group maintains its own offset within a partition.
Consumer Groups
Consumer groups enable multiple applications or services to process events from an Event Hub independently. Each consumer group tracks its own position (offset) in the event stream for each partition.
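For example, an analytics service and an archival service can read the same Event Hub at their own pace simply by using different consumer groups. A minimal sketch with the azure-eventhub Python package follows; the connection string, hub name, and the "analytics" and "archival" group names are placeholders, not existing resources.

from azure.eventhub import EventHubConsumerClient

CONN_STR = "YOUR_EVENTHUB_CONNECTION_STRING"  # placeholder
HUB = "your_eventhub_name"                    # placeholder

def make_reader(consumer_group):
    # Each client tracks its own offset in every partition, so the two
    # readers below consume the same stream without affecting each other.
    return EventHubConsumerClient.from_connection_string(
        CONN_STR, consumer_group=consumer_group, eventhub_name=HUB)

analytics_reader = make_reader("analytics")  # hypothetical consumer group
archival_reader = make_reader("archival")    # hypothetical consumer group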
Creating a Consumer Group
You can create consumer groups via the Azure portal, Azure CLI, or SDKs. A default consumer group ($Default) is created automatically with each Event Hub.
# Example using the Azure CLI to create a consumer group
az eventhubs eventhub consumer-group create \
  --resource-group my-resource-group \
  --namespace-name my-eventhub-namespace \
  --eventhub-name my-event-hub \
  --name my-consumer-group
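Consumer groups can also be created programmatically. Here is a minimal sketch using the azure-mgmt-eventhub Python package; all resource names are placeholders, and the exact parameter model can vary between SDK versions, so treat it as illustrative rather than definitive.

# pip install azure-mgmt-eventhub azure-identity
from azure.identity import DefaultAzureCredential
from azure.mgmt.eventhub import EventHubManagementClient

# All names below are placeholders for your own resources.
client = EventHubManagementClient(DefaultAzureCredential(), "YOUR_SUBSCRIPTION_ID")
client.consumer_groups.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace",
    event_hub_name="my-event-hub",
    consumer_group_name="my-consumer-group",
    parameters={},  # optional settings such as user_metadata
)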
Event Hubs SDKs for Receiving
Azure provides SDKs for various programming languages to facilitate receiving events. The most common and recommended approaches are the dedicated Azure Event Hubs client libraries or Event Hubs' Kafka protocol support.
1. Using the Azure Event Hubs SDK (.NET, Java, Python, Node.js)
The official Azure Event Hubs libraries offer robust features for building reliable event receivers.
Example: Python
This example demonstrates receiving events with the EventHubConsumerClient from the azure-eventhub package (version 5) of the Azure SDK for Python.
from azure.eventhub import EventHubConsumerClient

connection_str = "YOUR_EVENTHUB_CONNECTION_STRING"
consumer_group = "$Default"
eventhub_name = "your_eventhub_name"

client = EventHubConsumerClient.from_connection_string(
    connection_str,
    consumer_group=consumer_group,
    eventhub_name=eventhub_name,
)

def on_event(partition_context, event):
    print(f"Received event: Sequence Number {event.sequence_number}, Offset {event.offset}")
    # Process the event data
    print(f"  Body: {event.body_as_str()}")  # or event.body_as_json()
    # Important: after processing, update the checkpoint so a restarted
    # receiver resumes from this position. This requires a checkpoint store
    # (e.g., Azure Blob Storage); see the note below.
    # partition_context.update_checkpoint(event)

print("Starting to receive events...")
try:
    with client:
        # Receives from all partitions by default; pass partition_id="0" to
        # read a single partition. starting_position="-1" starts from the
        # beginning of each partition.
        client.receive(on_event=on_event, starting_position="-1")
except KeyboardInterrupt:
    print("Receiver stopped by user.")
print("Connection closed.")
Note: For production scenarios, implement robust checkpointing so that your receiver can resume where it left off after a restart instead of reprocessing or missing events. The Azure Blob Storage checkpoint store packages (for example, azure-eventhub-checkpointstoreblob for Python) provide this out of the box.
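For illustration, here is a minimal sketch of that pattern with the Python Blob Storage checkpoint store; the storage connection string, container name, and Event Hub details are placeholders for your own resources.

# pip install azure-eventhub azure-eventhub-checkpointstoreblob
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

# Placeholder values; substitute your own resources.
checkpoint_store = BlobCheckpointStore.from_connection_string(
    "YOUR_STORAGE_CONNECTION_STRING",
    container_name="your-checkpoint-container",
)

client = EventHubConsumerClient.from_connection_string(
    "YOUR_EVENTHUB_CONNECTION_STRING",
    consumer_group="$Default",
    eventhub_name="your_eventhub_name",
    checkpoint_store=checkpoint_store,  # offsets persist across restarts
)

def on_event(partition_context, event):
    print(f"Partition {partition_context.partition_id}: {event.body_as_str()}")
    # Writes the current offset to Blob Storage; a restarted receiver
    # resumes from the last checkpoint rather than the start of the stream.
    partition_context.update_checkpoint(event)

with client:
    client.receive(on_event=on_event, starting_position="-1")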
2. Using the Kafka Protocol
Event Hubs is compatible with the Apache Kafka protocol. This means you can use existing Kafka client libraries to consume data.
Prerequisites for Kafka Compatibility
- Your Event Hubs namespace must support Kafka (the Kafka endpoint is available on the Standard tier and above; the Basic tier does not offer it).
- You need to use the Kafka endpoint provided by Event Hubs: YOUR_NAMESPACE.servicebus.windows.net on port 9093.
Example: Java (using Kafka Clients)
This example shows how to consume events using the standard Kafka consumer API.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class EventHubsKafkaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YOUR_EVENTHUBS_NAMESPACE.servicebus.windows.net:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your_consumer_group_id"); // Your consumer group name
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // or "latest"
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Authenticate with the full connection string; the username is the
        // literal string "$ConnectionString".
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"Endpoint=sb://YOUR_EVENTHUBS_NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=YOUR_KEY_NAME;SharedAccessKey=YOUR_KEY\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("your_eventhub_name")); // Your Event Hub name

        System.out.println("Starting Kafka consumer...");
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received record: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                    // Process the event data
                }
                // Commit offsets periodically for durable consumption
                // consumer.commitSync();
            }
        } catch (Exception e) {
            System.err.println("Error occurred: " + e.getMessage());
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("Kafka consumer closed.");
        }
    }
}
Choosing the Right SDK
- Use the dedicated Azure Event Hubs SDKs for the most integrated experience with Azure features, such as built-in checkpoint stores, load balancing across receiver instances, and Azure identity-based authentication.
- Use Kafka clients if you already have a Kafka ecosystem or prefer to leverage existing Kafka infrastructure and knowledge.
Best Practices for Receiving Data
- Consumer Groups: Always use consumer groups to enable parallel processing and independent consumption.
- Checkpointing: Implement robust checkpointing to recover from failures without losing your position in the stream. Note that Event Hubs delivery is at-least-once, so checkpointing bounds, but does not eliminate, duplicate processing after a restart.
- Error Handling: Gracefully handle exceptions, network issues, and malformed event data.
- Batching: Process events in batches for better throughput and efficiency. The SDKs provide mechanisms for this; see the sketch after this list.
- Monitoring: Monitor your consumer applications for latency, throughput, and errors. Azure Monitor provides metrics for Event Hubs.
- Partition Management: Design your application to handle the number of partitions in your Event Hub, and to rebalance receivers if the partition count is increased.
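To illustrate the batching recommendation above, here is a minimal sketch using the receive_batch method of the Python EventHubConsumerClient; the connection string and hub name are placeholders.

from azure.eventhub import EventHubConsumerClient

client = EventHubConsumerClient.from_connection_string(
    "YOUR_EVENTHUB_CONNECTION_STRING",  # placeholder
    consumer_group="$Default",
    eventhub_name="your_eventhub_name",  # placeholder
)

def on_event_batch(partition_context, events):
    # Receiving a list per callback amortizes per-event overhead.
    print(f"Partition {partition_context.partition_id}: {len(events)} events")
    for event in events:
        pass  # process each event here
    # One checkpoint per batch (requires a configured checkpoint store):
    # partition_context.update_checkpoint()

with client:
    client.receive_batch(
        on_event_batch=on_event_batch,
        max_batch_size=100,  # at most 100 events per callback
        max_wait_time=5,     # deliver whatever has arrived every 5 seconds
    )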
By following these guidelines, you can build efficient and resilient applications that consume data from Azure Event Hubs.