Receiving Messages from Azure Event Hubs
This tutorial guides you through the process of receiving messages from an Azure Event Hub using a consumer application. We'll cover setting up a consumer, connecting to your Event Hub, and processing incoming messages.
Prerequisites
- An Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created within it.
- A connection string for a shared access policy (SAS) that grants the Listen permission on your Event Hub.
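- .NET SDK installed, plus an Azure Storage account with a blob container, which EventProcessorClient uses as its checkpoint store (for the C# example).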
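- Python 3.7+ installed (for the Python example). The script calls asyncio.run, which was added in Python 3.7, and recent azure-eventhub releases may require a newer version.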
1. Setting up your Consumer Application
You can use various SDKs to build your consumer application. Here, we provide examples using C# (.NET) and Python.
2. C# Consumer Example (using Azure.Messaging.EventHubs)
This example demonstrates how to create a simple console application that connects to an Event Hub and starts listening for messages.
// Requires the Azure.Messaging.EventHubs.Processor and Azure.Storage.Blobs NuGet packages.
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using System;
using System.Threading.Tasks;

public class EventHubConsumer
{
    // Replace with your Event Hub connection string and Event Hub name
    private const string EventHubConnectionString = "";
    private const string EventHubName = "";
    private const string ConsumerGroup = "$Default"; // Or your custom consumer group

    // EventProcessorClient keeps its checkpoints in Azure Blob Storage.
    // Replace with your storage connection string and the name of an existing container.
    private const string StorageConnectionString = "";
    private const string BlobContainerName = "";

    public static async Task Main(string[] args)
    {
        var checkpointStore = new BlobContainerClient(StorageConnectionString, BlobContainerName);

        var processor = new EventProcessorClient(
            checkpointStore,
            ConsumerGroup,
            EventHubConnectionString,
            EventHubName);

        Console.WriteLine($"Starting consumer for Event Hub '{EventHubName}' in consumer group '{ConsumerGroup}'...");

        // Invoked once for each event read from a partition.
        processor.ProcessEventAsync += eventArgs =>
        {
            string messageBody = eventArgs.Data.EventBody.ToString(); // decodes the body as UTF-8
            Console.WriteLine($"Received message: {messageBody} from partition {eventArgs.Partition.PartitionId}");
            return Task.CompletedTask;
        };

        // Invoked when the processor hits an error; both handlers must be set before starting.
        processor.ProcessErrorAsync += errorArgs =>
        {
            Console.WriteLine($"Error processing partition {errorArgs.PartitionId}: {errorArgs.Exception.Message}");
            return Task.CompletedTask;
        };

        try
        {
            await processor.StartProcessingAsync();
            Console.WriteLine("Press Enter to stop receiving.");
            Console.ReadLine();
        }
        finally
        {
            await processor.StopProcessingAsync();
            Console.WriteLine("Consumer stopped.");
        }
    }
}
Explanation:
- EventHubConnectionString: Your Event Hub connection string, obtained from the Azure portal.
- EventHubName: The name of your Event Hub.
- ConsumerGroup: The consumer group your application belongs to. $Default is the built-in group.
- EventProcessorClient: Manages the processing of events, including partition distribution and checkpointing. It keeps its checkpoints in an Azure Blob Storage container.
- ProcessEventAsync: The event whose handler is invoked for each message received.
- ProcessErrorAsync: The event whose handler is invoked when an error occurs during processing.
Important: Ensure you replace the placeholder connection string and Event Hub name with your actual credentials. For production environments, consider using Azure Key Vault to manage secrets securely.
3. Python Consumer Example (using azure-eventhub)
Here's a Python script to receive messages:
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

# Replace with your Event Hub connection string and Event Hub name
EVENTHUB_CONNECTION_STR = ""
EVENTHUB_NAME = ""
CONSUMER_GROUP = "$Default"  # Or your custom consumer group


async def process_event(partition_context, event):
    """
    Processes a single event.
    """
    print(f"Received message: {event.body_as_str()} from partition {partition_context.partition_id}")
    # In a real application, you'd update checkpoints here to mark processed events.
    # Persisting them also requires passing a checkpoint_store when creating the client.
    # await partition_context.update_checkpoint(event)


async def process_error(partition_context, error):
    """
    Processes errors that occur during event processing.
    """
    # partition_context is None when the error is not tied to a specific partition.
    partition_id = partition_context.partition_id if partition_context else "unknown"
    print(f"Error processing partition {partition_id}: {error}")


async def main():
    client = EventHubConsumerClient.from_connection_string(
        EVENTHUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENTHUB_NAME,
    )
    print(f"Starting consumer for Event Hub '{EVENTHUB_NAME}' in consumer group '{CONSUMER_GROUP}'...")
    async with client:
        # By default, receive() starts at the end of each partition, so only new events arrive.
        # Pass starting_position="-1" to read from the beginning instead.
        await client.receive(
            on_event=process_event,
            on_error=process_error,
        )


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Consumer stopped by user.")
Explanation:
- EVENTHUB_CONNECTION_STR: Your Event Hub connection string.
- EVENTHUB_NAME: The name of your Event Hub.
- CONSUMER_GROUP: The desired consumer group.
- EventHubConsumerClient: The main client for connecting and consuming events.
- process_event: Asynchronous function to handle incoming events.
- process_error: Asynchronous function to handle any errors.
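The script leaves the update_checkpoint call commented out. If you enable it, checkpointing after every single event adds a storage write per message, so a common compromise is to checkpoint once every N events per partition. The following is a minimal sketch of that idea, not part of the SDK: BatchedCheckpointer and FakePartitionContext are hypothetical names introduced here, and the fake context only mimics the partition_id attribute and the update_checkpoint coroutine so the logic can be tried without an Event Hub.

```python
import asyncio
from collections import defaultdict


class BatchedCheckpointer:
    """Hypothetical helper: checkpoints once every `every` events per partition."""

    def __init__(self, every=10):
        if every < 1:
            raise ValueError("every must be at least 1")
        self.every = every
        # partition_id -> number of events seen since the last checkpoint
        self._seen = defaultdict(int)

    async def on_event(self, partition_context, event):
        # Application-specific processing of `event` would go here.
        pid = partition_context.partition_id
        self._seen[pid] += 1
        if self._seen[pid] >= self.every:
            await partition_context.update_checkpoint(event)
            self._seen[pid] = 0


class FakePartitionContext:
    """Hypothetical stand-in for the SDK's partition context, for local runs only."""

    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.checkpoints = []

    async def update_checkpoint(self, event):
        self.checkpoints.append(event)


async def demo():
    checkpointer = BatchedCheckpointer(every=3)
    context = FakePartitionContext("0")
    for n in range(7):
        await checkpointer.on_event(context, f"event-{n}")
    return context.checkpoints


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['event-2', 'event-5']
```

With a real client, checkpointer.on_event would be passed as the on_event callback. The trade-off is that after a restart, up to N - 1 events per partition may be delivered again, so processing should tolerate duplicates.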
Note: The Python example uses asyncio through the azure.eventhub.aio module. Install the SDK with pip install azure-eventhub.
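Instead of pasting the connection string into the script, one option is to read it from environment variables so that secrets stay out of source control. The sketch below assumes the variable names EVENTHUB_CONNECTION_STR, EVENTHUB_NAME, and EVENTHUB_CONSUMER_GROUP; these names and the load_settings helper are illustrative choices, not something the SDK defines.

```python
import os

# Assumed variable names; pick whatever fits your deployment.
REQUIRED_VARS = ("EVENTHUB_CONNECTION_STR", "EVENTHUB_NAME")


def load_settings(env=None):
    """Return consumer settings from environment variables, failing fast if any are missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "conn_str": env["EVENTHUB_CONNECTION_STR"],
        "eventhub_name": env["EVENTHUB_NAME"],
        # Fall back to the built-in consumer group when none is configured.
        "consumer_group": env.get("EVENTHUB_CONSUMER_GROUP", "$Default"),
    }
```

The returned values can then be passed to EventHubConsumerClient.from_connection_string in place of the hard-coded constants. Accepting an optional mapping makes the function easy to exercise with a plain dictionary.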
4. Running the Consumer Application
- Replace the placeholder connection string and Event Hub name in the code with your actual details.
- Compile and run the C# application using the .NET CLI:
dotnet run
- Run the Python script:
python your_consumer_script.py
Once the consumer is running, any messages sent to your Event Hub will be received and printed to the console.
5. Key Concepts for Receiving Messages
- Consumer Group: A view of an Event Hub. Each consumer group allows an independent read of the event stream. This enables multiple applications to process events from the same Event Hub without interfering with each other.
- Partition: Event Hubs partitions data into ordered sequences. Consumers typically read from partitions in parallel to achieve higher throughput.
- Offset: A unique identifier for an event within a partition. Consumers use offsets to track their progress.
- Checkpointing: The process of saving the offset of the last successfully processed event for a given partition and consumer group. This allows a consumer to resume processing from where it left off after a restart or failure. With the SDKs shown here, your event handler decides when to checkpoint (UpdateCheckpointAsync in C#, update_checkpoint in Python), and the checkpoints are persisted in a checkpoint store such as Azure Blob Storage.
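To make these four concepts concrete, here is a small in-memory model, offered as a teaching sketch rather than a description of how the service is implemented. The Partition and CheckpointStore classes and the read_new_events function are hypothetical, and offsets are simplified to list indices, whereas the real service treats an offset as an opaque position within the partition.

```python
class Partition:
    """An append-only, ordered log; in this model an event's offset is its list index."""

    def __init__(self):
        self.events = []

    def append(self, body):
        self.events.append(body)
        return len(self.events) - 1  # offset of the appended event


class CheckpointStore:
    """Maps (consumer_group, partition_id) to the offset of the last processed event."""

    def __init__(self):
        self._offsets = {}

    def save(self, group, partition_id, offset):
        self._offsets[(group, partition_id)] = offset

    def load(self, group, partition_id):
        # -1 means "nothing processed yet", so reading starts at offset 0.
        return self._offsets.get((group, partition_id), -1)


def read_new_events(partitions, store, group, partition_id, limit=None):
    """Return events after the group's checkpoint and advance that checkpoint."""
    start = store.load(group, partition_id) + 1
    events = partitions[partition_id].events
    end = len(events) if limit is None else min(len(events), start + limit)
    batch = events[start:end]
    if batch:
        store.save(group, partition_id, end - 1)
    return batch


partitions = {"0": Partition()}
for body in ("a", "b", "c"):
    partitions["0"].append(body)
store = CheckpointStore()

first = read_new_events(partitions, store, "analytics", "0", limit=2)  # ['a', 'b']
resumed = read_new_events(partitions, store, "analytics", "0")         # ['c']
other = read_new_events(partitions, store, "audit", "0")               # ['a', 'b', 'c']
```

The second call behaves like a consumer restarting: it resumes after the saved checkpoint instead of rereading from the start. The third call shows that a different consumer group keeps its own position and reads the whole stream independently.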
Next Steps
- Explore advanced error handling and retry strategies.
- Learn about implementing robust checkpointing mechanisms.
- Integrate message processing with other Azure services like Azure Functions or Azure Stream Analytics.