Sending and Receiving Events
This section walks through the two fundamental operations in Azure Event Hubs, sending events to a hub and receiving events from it, using the .NET and Python SDKs.
Sending Events
Sending events involves connecting to an Event Hub and publishing messages. The process typically includes:
- Creating an Event Hubs client.
- Creating an event data object.
- Sending the event data to the specified hub.
Using the .NET SDK
Here's a C# example demonstrating how to send events:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;
// Replace with your Event Hubs connection string and hub name
string connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);
try
{
    using EventDataBatch eventDataBatch = await producerClient.CreateBatchAsync();

    for (int i = 1; i <= 5; i++)
    {
        var message = $"Event {i}";

        // TryAdd returns false when the event does not fit in the batch
        if (!eventDataBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message))))
        {
            // This sample simply fails; a production app would send the full batch and start a new one.
            throw new Exception($"The batch is full. Failed to add message {i}.");
        }
    }

    // Send the batch to the Event Hub
    await producerClient.SendAsync(eventDataBatch);
    Console.WriteLine("A batch of events has been sent.");
}
catch (Exception ex)
{
    Console.WriteLine($"Error sending events: {ex.Message}");
}
Using the Python SDK
Here's a Python example for sending events:
import asyncio
import os

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


async def send_events():
    # The connection string is read from the environment; replace the hub name with your own
    connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
    event_hub_name = "YOUR_EVENTHUB_NAME"

    # The async client lives in azure.eventhub.aio; eventhub_name is keyword-only
    producer = EventHubProducerClient.from_connection_string(
        connection_str, eventhub_name=event_hub_name
    )
    async with producer:
        event_data_batch = await producer.create_batch()
        for i in range(1, 6):
            try:
                # add() raises ValueError when the batch has reached its size limit
                event_data_batch.add(EventData(f"Event {i}"))
            except ValueError:
                print(f"Batch is full. Failed to add message {i}.")
                break
        await producer.send_batch(event_data_batch)
        print("A batch of events has been sent.")


if __name__ == "__main__":
    asyncio.run(send_events())
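Both samples above stop when the batch is full. A common alternative is to send the full batch and start a new one. The following is a minimal sketch of that rollover pattern in plain Python, not SDK code: `SizeLimitedBatch` is a hypothetical stand-in that mimics a batch with a byte limit, and `send` is any callable you supply. With the real SDK you would create a new batch through the producer client instead.

```python
from typing import Callable, Iterable, List


class SizeLimitedBatch:
    """Hypothetical stand-in for an SDK batch: rejects additions past max_bytes."""

    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        self.size = 0
        self.events: List[bytes] = []

    def add(self, payload: bytes) -> None:
        # Mirror the "raise when full" behaviour so the caller can roll over
        if self.size + len(payload) > self.max_bytes:
            raise ValueError("batch is full")
        self.events.append(payload)
        self.size += len(payload)


def send_in_batches(
    payloads: Iterable[bytes],
    max_bytes: int,
    send: Callable[[List[bytes]], None],
) -> int:
    """Send every payload, starting a new batch whenever the current one fills up.

    Returns the number of batches sent. Raises ValueError if a single payload
    is larger than max_bytes, because it can never fit in any batch.
    """
    batch = SizeLimitedBatch(max_bytes)
    sent = 0
    for payload in payloads:
        try:
            batch.add(payload)
        except ValueError:
            if not batch.events:
                raise  # the payload does not fit even in an empty batch
            send(batch.events)  # flush the full batch
            sent += 1
            batch = SizeLimitedBatch(max_bytes)
            batch.add(payload)  # retry in the fresh batch
    if batch.events:
        send(batch.events)  # flush whatever is left
        sent += 1
    return sent
```

Calling `send_in_batches([b"aaaa", b"bbbb", b"cccc"], 8, print)` would flush one batch holding the first two payloads and a second batch holding the third.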
Receiving Events
To receive events, connect to an Event Hub as a consumer and process the messages it delivers. The process typically includes:
- Creating an Event Hubs consumer client.
- Specifying a consumer group and, optionally, a partition to read from.
- Processing incoming events as they arrive.
Using the .NET SDK
Here's a C# example for receiving events:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;
// Replace with your Event Hubs connection string and hub name
string connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or your custom consumer group
await using var consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
Console.WriteLine($"Listening to events from consumer group '{consumerGroup}'...");
try
{
    // Reads from all partitions and keeps waiting for new events until cancelled
    await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
    {
        var message = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
        Console.WriteLine($"Received event: '{message}' from partition {partitionEvent.Partition.PartitionId}");
    }
}
catch (Exception ex)
{
    Console.WriteLine($"Error receiving events: {ex.Message}");
}
Using the Python SDK
Here's a Python example for receiving events:
import asyncio
import os

from azure.eventhub.aio import EventHubConsumerClient


async def on_event(partition_context, event):
    # Called once for each event received from any partition
    message = event.body_as_str()
    print(f"Received event: '{message}' from partition {partition_context.partition_id}")


async def receive_events():
    # The connection string is read from the environment; replace the hub name with your own
    connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
    event_hub_name = "YOUR_EVENTHUB_NAME"
    consumer_group = "$Default"  # Or your custom consumer group

    consumer = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=event_hub_name,
    )
    print(f"Listening to events from consumer group '{consumer_group}'...")
    async with consumer:
        # receive() runs until cancelled; "-1" starts from the beginning of each partition
        await consumer.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    asyncio.run(receive_events())
Important: Always ensure you handle potential exceptions gracefully and manage client lifecycles appropriately in production applications.
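As one illustration of handling failures gracefully, the sketch below wraps an async operation in a retry loop with exponential backoff and jitter. It uses only the standard library; `with_retries` and its parameters are hypothetical names chosen for this example, not part of any Event Hubs SDK. The SDK clients also apply their own configurable retry policy, so a wrapper like this is mainly useful around application-level steps.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Await operation(), retrying on any exception up to `attempts` times in total.

    The wait before each retry doubles (base_delay, 2 * base_delay, ...) plus a
    small random jitter. The last failure is re-raised to the caller.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception as exc:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, base_delay)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.2f}s")
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

A call such as `await with_retries(lambda: producer.send_batch(batch))` would retry a failed send, assuming `producer` and `batch` come from code like the sending example. In practice you would usually narrow the `except` clause to the error types you consider transient.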
Tip: For advanced scenarios like checkpointing, specific partition processing, or load balancing, explore the features of the Event Hubs consumer clients in each SDK.
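To make the idea of checkpointing concrete: a checkpoint records, per partition, the position of the last event that was fully processed, so a restarted consumer can resume from there instead of reprocessing everything. The sketch below models this in memory only. `InMemoryCheckpoints` is a hypothetical class for illustration; real applications typically persist checkpoints in a durable store through the SDK's checkpoint store support.

```python
from typing import Dict, Optional


class InMemoryCheckpoints:
    """Conceptual model of checkpointing: last processed sequence number per partition."""

    def __init__(self) -> None:
        self._last: Dict[str, int] = {}

    def update(self, partition_id: str, sequence_number: int) -> None:
        # Only move forward; ignore stale or duplicate updates
        current = self._last.get(partition_id)
        if current is None or sequence_number > current:
            self._last[partition_id] = sequence_number

    def last_processed(self, partition_id: str) -> Optional[int]:
        # None means the partition has never been checkpointed
        return self._last.get(partition_id)

    def should_process(self, partition_id: str, sequence_number: int) -> bool:
        # After a restart, skip events at or before the recorded checkpoint
        last = self._last.get(partition_id)
        return last is None or sequence_number > last


checkpoints = InMemoryCheckpoints()
checkpoints.update("0", 41)
checkpoints.update("0", 42)
checkpoints.update("1", 7)
```

After these updates, partition "0" would resume after sequence number 42 and partition "1" after 7, while a partition with no checkpoint would start from whatever position the consumer is configured to use.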