Sending and Receiving Events with Azure Event Hubs
This guide covers the fundamental operations of sending events to and receiving events from Azure Event Hubs. We'll explore common patterns and provide code examples using popular SDKs.
Prerequisites
Before you begin, ensure you have:
- An Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created.
- A connection string for your Event Hubs namespace.
- The necessary SDKs installed for your chosen programming language (e.g., Azure SDK for Python, .NET, Java, Node.js).
Sending Events
Event Hubs allows for high-throughput data ingestion. You can send events individually or in batches. Batching is generally more efficient.
Using the Azure SDK (Python Example)
The following code snippet demonstrates how to send a single event using the Python SDK:
from azure.eventhub import EventHubProducerClient, EventData
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
try:
producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
with producer:
event_data = EventData("Hello, Event Hubs!")
producer.send_event(event_data)
print("Event sent successfully!")
except Exception as e:
print(f"Error sending event: {e}")
Sending Batched Events
To send events in batches, you can create an `EventDataBatch` object:
from azure.eventhub import EventHubProducerClient, EventData, EventDataBatch
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
try:
producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
with producer:
batch = producer.create_batch()
for i in range(5):
event_content = f"Message {i}"
event_data = EventData(event_content)
try:
batch.add(event_data)
except ValueError:
# Batch is full, send the current batch and start a new one
producer.send_batch(batch)
batch = producer.create_batch()
batch.add(event_data)
# Send the last batch if it's not empty
if batch.count > 0:
producer.send_batch(batch)
print(f"{batch.count} events sent in the last batch.")
except Exception as e:
print(f"Error sending events: {e}")
Receiving Events
Consumers can read events from Event Hubs using consumer groups. Each consumer group maintains its own read position within the Event Hub partitions.
Using the Azure SDK (Python Example)
The following code demonstrates how to create a consumer client and receive events:
from azure.eventhub import EventHubConsumerClient, EventPosition
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
consumer_group = "$Default" # Or your custom consumer group
try:
consumer = EventHubConsumerClient.from_connection_string(
connection_str,
consumer_group,
eventhub_name=eventhub_name
)
def process_events(event):
print(f"Received event: {event.body_as_str()}")
# Process the event data here...
with consumer:
# Start receiving events from the latest available offset
consumer.subscribe(
["1"], # Specify partition IDs you want to receive from, e.g., ["1"] or all partitions
on_message=process_events,
event_position=EventPosition("-1") # Use "-1" for earliest, "latest" for latest
)
# Keep the consumer running to receive events
consumer.run()
except Exception as e:
print(f"Error receiving events: {e}")
Key Considerations
- Partitioning: Events are sent to specific partitions within an Event Hub. Understanding partitioning is key to parallel processing and ordering guarantees.
- Consumer Groups: Always use consumer groups to decouple event producers from event consumers.
- Error Handling: Implement robust error handling for both sending and receiving operations.
- Offset Management: Ensure your consumers properly manage their offsets to avoid losing or reprocessing events. The SDK often helps with this.
- SDK Choice: Select the Azure SDK that best fits your development environment and language.
For more advanced scenarios, such as schema validation, custom partitioning strategies, and integration with other Azure services like Azure Functions and Azure Stream Analytics, please refer to the Advanced Topics section.
Previous: Authentication Next: Partitioning