Sending and Receiving Events
This tutorial walks through the basic steps of sending events to and receiving events from Azure Event Hubs, with examples for the .NET and Python SDKs.
Prerequisites
- An Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created within it. (Refer to the "Creating an Event Hubs Namespace" tutorial if you haven't done this yet.)
- Azure CLI installed and configured, or Azure portal access.
- A development environment set up for your chosen language (e.g., .NET, Python, Java, Node.js).
- The Event Hubs connection string for your namespace. You can find this in the Azure portal under your Event Hubs namespace settings -> "Shared access policies".
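An Event Hubs connection string is a semicolon-separated list of key=value pairs. As a rough illustration, the sketch below splits one into its parts so you can check that the value you copied has the expected fields. The helper name parse_connection_string and the sample values are made up for this example; the SDK clients parse the string themselves, so this is only a sanity check. A namespace-level string normally has no EntityPath field, while one copied from a policy on a specific Event Hub does.

```python
def parse_connection_string(conn_str: str) -> dict[str, str]:
    """Split a connection string into its key/value parts (illustrative helper)."""
    parts: dict[str, str] = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first "=" only: base64-encoded keys often end in "=".
        key, sep, value = segment.partition("=")
        if not sep or not key:
            raise ValueError("Malformed segment in connection string.")
        parts[key] = value
    return parts


# Sample values are placeholders, not real credentials.
example = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123=;"
    "EntityPath=my-hub"
)
parsed = parse_connection_string(example)
print(parsed["Endpoint"])
print("EntityPath" in parsed)  # True when the string is scoped to one Event Hub
```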
Sending Events
To send events, you create a producer client with an Event Hubs SDK and publish data either in batches or as individual events. Batching is usually preferable because it reduces the number of round trips to the service.
Using the .NET SDK
First, install the necessary NuGet package:
dotnet add package Azure.Messaging.EventHubs
Here's a C# example of sending events:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventSender
{
    private const string eventHubsConnectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
    private const string eventHubName = "YOUR_EVENT_HUB_NAME";

    public static async Task SendEventsAsync()
    {
        // The producer client owns the connection; "await using" closes it on exit.
        await using var producerClient = new EventHubProducerClient(eventHubsConnectionString, eventHubName);

        // The batch enforces the size limit as events are added.
        using EventDataBatch batch = await producerClient.CreateBatchAsync();

        for (int i = 0; i < 5; i++)
        {
            var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i}"));

            // TryAdd returns false when the event does not fit in the batch.
            if (!batch.TryAdd(eventData))
            {
                throw new Exception($"Event {i} is too large for the batch and cannot be added.");
            }
        }

        try
        {
            await producerClient.SendAsync(batch);
            Console.WriteLine("A batch of events has been sent.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error sending events: {ex.Message}");
        }
    }
}
Using the Python SDK
Install the SDK:
pip install azure-eventhub
Python example for sending events:
import os
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient


async def send_events():
    conn_str = os.environ["EVENTHUB_CONNECTION_STR"]
    eventhub_name = "YOUR_EVENT_HUB_NAME"
    producer = EventHubProducerClient.from_connection_string(conn_str, eventhub_name=eventhub_name)
    async with producer:
        batch = await producer.create_batch()
        for i in range(5):
            try:
                # add() raises ValueError when the event does not fit in the batch.
                batch.add(EventData(f"Event {i}"))
            except ValueError:
                print(f"Event {i} is too large for the batch and cannot be added.")
                break
        await producer.send_batch(batch)
        print("A batch of events has been sent.")


if __name__ == "__main__":
    asyncio.run(send_events())
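Both sending examples place a handful of small events into a single batch. With a longer stream, a common pattern is to send the current batch once it is full and start a new one. The sketch below shows that rollover logic on its own, without the SDK: chunk_by_size and max_batch_bytes are hypothetical names, and the size check counts payload bytes only, whereas the real batch objects also account for protocol overhead and signal a full batch themselves (TryAdd returning false in .NET, add() raising ValueError in Python).

```python
from typing import Iterable, Iterator


def chunk_by_size(payloads: Iterable[bytes], max_batch_bytes: int) -> Iterator[list[bytes]]:
    """Group payloads into batches whose total size stays within max_batch_bytes."""
    batch: list[bytes] = []
    batch_bytes = 0
    for payload in payloads:
        size = len(payload)
        if size > max_batch_bytes:
            # No batch can ever hold this event, so fail instead of looping.
            raise ValueError("A single event exceeds the maximum batch size.")
        if batch_bytes + size > max_batch_bytes:
            yield batch  # batch is full: hand it off to be sent
            batch, batch_bytes = [], 0
        batch.append(payload)
        batch_bytes += size
    if batch:
        yield batch  # send whatever is left over


events = [f"Event {i}".encode("utf-8") for i in range(5)]
batches = list(chunk_by_size(events, max_batch_bytes=16))
print([len(b) for b in batches])  # → [2, 2, 1]
```

In a real sender, each yielded group would correspond to one send call followed by the creation of a fresh batch.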
Receiving Events
To receive events, you connect to an Event Hub and create a consumer client or an event processor that reads incoming data. Every consumer reads in the context of a consumer group, which gives each group its own independent view of the event stream.
Using the .NET SDK
Install the NuGet package. The EventHubConsumerClient used below ships in the core package:
dotnet add package Azure.Messaging.EventHubs
C# example for receiving events:
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventReceiver
{
    private const string eventHubsConnectionString = "YOUR_EVENT_HUBS_CONNECTION_STRING";
    private const string eventHubName = "YOUR_EVENT_HUB_NAME";
    private const string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or your specific consumer group

    public static async Task ReceiveEventsAsync()
    {
        await using var consumerClient = new EventHubConsumerClient(consumerGroup, eventHubsConnectionString, eventHubName);

        Console.WriteLine("Starting to receive events...");

        await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
        {
            Console.WriteLine($"Received event from partition {partitionEvent.Partition.Id}:");
            string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
            Console.WriteLine($" Message: {messageBody}");
            Console.WriteLine($" Offset: {partitionEvent.Data.Offset}, Sequence Number: {partitionEvent.Data.SequenceNumber}");
        }
    }
}
Using the Python SDK
Install the SDK:
pip install azure-eventhub
Python example for receiving events:
import os
import asyncio

from azure.eventhub.aio import EventHubConsumerClient


async def on_event_batch(partition_context, event_batch):
    # Called with up to max_batch_size events from a single partition.
    for event_data in event_batch:
        print(f"Received event with sequence number: {event_data.sequence_number}")
        print(f" Message: {event_data.body_as_str(encoding='UTF-8')}")
        print(f" Offset: {event_data.offset}")


async def receive_events():
    conn_str = os.environ["EVENTHUB_CONNECTION_STR"]
    eventhub_name = "YOUR_EVENT_HUB_NAME"
    consumer_group = "$Default"  # Or your specific consumer group
    consumer = EventHubConsumerClient.from_connection_string(
        conn_str, consumer_group=consumer_group, eventhub_name=eventhub_name
    )
    async with consumer:
        print("Starting to receive events...")
        # starting_position="-1" reads each partition from its beginning.
        # The call keeps running until the task is cancelled or the client is closed.
        await consumer.receive_batch(
            on_event_batch=on_event_batch,
            max_batch_size=10,
            starting_position="-1",
        )


if __name__ == "__main__":
    asyncio.run(receive_events())
Important: Replace the placeholder connection strings and Event Hub names with the values for your own namespace. Store secrets such as connection strings in environment variables or a secure configuration management system rather than in source code.
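As one possible way to follow this advice, the sketch below reads a required setting from the environment and fails with a readable message when it is missing, instead of surfacing a bare KeyError. The helper name require_env is an assumption made for this example; EVENTHUB_CONNECTION_STR is the variable name used in the Python examples above.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if unset or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set or is empty.")
    return value


try:
    conn_str = require_env("EVENTHUB_CONNECTION_STR")
    print("Connection string loaded.")  # never print the secret itself
except RuntimeError as err:
    print(err)
```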
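Tip: For production scenarios, consider the event processor client (EventProcessorClient in the Azure.Messaging.EventHubs.Processor package for .NET). It balances partitions across consumer instances, recovers from transient errors, and supports checkpointing so that a restarted consumer can resume where it left off.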
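To make the idea of checkpointing concrete, here is a minimal in-memory sketch. It is not the processor library's API: InMemoryCheckpointStore and process_partition are hypothetical names, events are modeled as (sequence number, body) pairs, and the checkpoint is updated after every event for simplicity. A real processor persists checkpoints in durable storage and usually updates them less often.

```python
class InMemoryCheckpointStore:
    """Hypothetical stand-in for a durable checkpoint store."""

    def __init__(self):
        self._last_seq = {}

    def update(self, partition_id, sequence_number):
        self._last_seq[partition_id] = sequence_number

    def last_sequence_number(self, partition_id):
        # -1 means nothing has been processed for this partition yet.
        return self._last_seq.get(partition_id, -1)


def process_partition(partition_id, events, store, handler):
    """Handle events newer than the checkpoint and advance it as we go."""
    start_after = store.last_sequence_number(partition_id)
    processed = 0
    for sequence_number, body in events:
        if sequence_number <= start_after:
            continue  # already handled before the restart
        handler(body)
        store.update(partition_id, sequence_number)
        processed += 1
    return processed


store = InMemoryCheckpointStore()
partition_0 = [(0, "Event 0"), (1, "Event 1"), (2, "Event 2")]
seen = []

# First run sees two events; the "restarted" run replays all three.
first_run = process_partition("0", partition_0[:2], store, seen.append)
second_run = process_partition("0", partition_0, store, seen.append)
print(first_run, second_run, seen)
```

The second run handles only the event that arrived after the checkpoint, which is the behavior a checkpointing consumer relies on to avoid reprocessing after a restart.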