Azure Event Hubs: End-to-End Stream Processing Tutorial

Learn to build robust real-time data pipelines with Azure.

Introduction

This tutorial guides you through setting up and implementing an end-to-end stream processing solution using Azure Event Hubs. We'll cover creating the necessary Azure resources, developing simple producer and consumer applications, and essential monitoring techniques.

Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. It can receive and process millions of events per second. This makes it ideal for scenarios like real-time analytics, application logging, and data warehousing.

Prerequisites

  • An active Azure subscription.
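  • Basic familiarity with C# and Python (the producer is written in C#, the consumer in Python).
  • The .NET 6 SDK (or later) and Python 3 installed locally.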
  • Azure CLI installed and configured, or access to the Azure Portal.

Step 1: Set up Azure Resources

Before we start, ensure you have an Azure Resource Group to organize your resources.

You can create a Resource Group using the Azure CLI:

Azure CLI
az group create --name myEventHubsRG --location eastus

Replace myEventHubsRG and eastus with your desired resource group name and location.

Step 2: Create an Event Hub Namespace and Event Hub

An Event Hubs namespace is a logical container for Event Hubs. Each Event Hub within a namespace receives events from producers and makes them available to consumers.

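Create the namespace with the Azure CLI. The namespace name must be globally unique, because it becomes part of the endpoint hostname (myEventHubsNamespace.servicebus.windows.net):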

Azure CLI
az eventhubs namespace create --name myEventHubsNamespace --resource-group myEventHubsRG --location eastus --sku Standard
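Every connection string later in this tutorial uses the namespace name as its hostname (`<name>.servicebus.windows.net`). For that reason the name must be globally unique and must follow Azure's naming rules. The Python sketch below derives the endpoint and runs a rough client-side name check that you can do before calling the CLI.

The helper names are hypothetical. The rules it encodes are an assumption based on the portal's validation message:

- 6 to 50 characters
- letters, digits, and hyphens only
- starts with a letter
- ends with a letter or digit

A passing check therefore does not guarantee that the service will accept the name.

```python
import re

# Assumed naming rule (check current Azure docs): 6-50 characters, letters/digits/hyphens,
# starting with a letter and ending with a letter or digit. The service may apply more rules.
_NAMESPACE_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9-]{4,48}[A-Za-z0-9]")


def is_plausible_namespace_name(name: str) -> bool:
    """Hypothetical helper: rough client-side check of an Event Hubs namespace name."""
    return _NAMESPACE_PATTERN.fullmatch(name) is not None


def namespace_endpoint(name: str) -> str:
    """Hypothetical helper: the sb:// endpoint that appears in connection strings."""
    return f"sb://{name}.servicebus.windows.net/"


if __name__ == "__main__":
    for candidate in ["myEventHubsNamespace", "hub", "1stNamespace", "my-namespace-"]:
        print(f"{candidate!r}: plausible={is_plausible_namespace_name(candidate)}")
    print(namespace_endpoint("myEventHubsNamespace"))
```

Running a check like this in a provisioning script fails fast on obviously invalid names. It cannot detect a name that is already taken; only the service can tell you that.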

Next, create the Event Hub within the namespace:

Azure CLI
az eventhubs eventhub create --name myEventHub --namespace-name myEventHubsNamespace --resource-group myEventHubsRG

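Note: The Standard tier adds features the Basic tier lacks, including multiple consumer groups, longer event retention, the Apache Kafka endpoint, and geo-disaster recovery. Basic supports only the $Default consumer group, so use Standard (or higher) if you want to follow Step 3.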

Step 3: Create a Consumer Group

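A consumer group is an independent view of the entire Event Hub, with its own read position in each partition. Give each downstream application its own consumer group so that it can read the full stream at its own pace. Multiple instances of the same application share one consumer group and divide the partitions among themselves, which is how you scale out processing and balance load.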
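The difference between separate consumer groups and instances that share one group can be shown with a simplified Python model. In this model, every consumer group covers every partition, while the instances inside one group split the partitions between them.

This is not the SDK's load-balancing algorithm. The partition count of 4, the static round-robin assignment, and all names below are illustrative assumptions. The real SDKs coordinate partition ownership dynamically through a shared store, such as Azure Blob Storage.

```python
from collections import defaultdict

# Illustrative assumption: an Event Hub with 4 partitions.
PARTITIONS = ["0", "1", "2", "3"]


def assign_partitions(instances: list[str], partitions: list[str]) -> dict[str, list[str]]:
    """Hypothetical helper: split partitions round-robin among the instances of ONE consumer group."""
    assignment: dict[str, list[str]] = defaultdict(list)
    for index, partition in enumerate(partitions):
        assignment[instances[index % len(instances)]].append(partition)
    return dict(assignment)


# Two independent applications, each with its own consumer group (hypothetical names).
consumer_groups = {
    "analytics": ["analytics-1", "analytics-2"],  # two instances share one group
    "archiver": ["archiver-1"],                    # a single instance reads every partition
}

if __name__ == "__main__":
    for group, instances in consumer_groups.items():
        plan = assign_partitions(instances, PARTITIONS)
        covered = sorted(p for parts in plan.values() for p in parts)
        print(f"{group}: {plan} -> group covers partitions {covered}")
```

In this model, each group as a whole still covers all four partitions. If you run more instances than there are partitions, the extra instances receive no assignment. This is why the partition count caps the parallelism of a single consumer group.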

Create a consumer group using Azure CLI:

Azure CLI
az eventhubs consumer-group create --name myConsumerGroup --eventhub-name myEventHub --namespace-name myEventHubsNamespace --resource-group myEventHubsRG

Step 4: Develop the Producer Application

The producer application sends events to your Event Hub. We'll use a simple C# example.

Create a New Project

dotnet new console -o EventHubProducerApp
cd EventHubProducerApp

Install the Azure Event Hubs NuGet Package

dotnet add package Azure.Messaging.EventHubs

Update Program.cs

Replace the contents of Program.cs with the following:

C#
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Replace with your actual connection string and event hub name
const string connectionString = "Endpoint=sb://myEventHubsNamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHARED_ACCESS_KEY";
const string eventHubName = "myEventHub";

// "await using" disposes the client (closing its connection) even if an exception is thrown
await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);

Console.WriteLine("Sending events...");

// A batch enforces the Event Hub's maximum message size
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

for (int i = 1; i <= 5; i++)
{
    string message = $"Event {i}: Hello, Azure Event Hubs!";
    // TryAdd returns false when the event does not fit in the remaining batch space
    if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message))))
    {
        throw new Exception($"The message \"{message}\" is too large and cannot be added to the batch.");
    }
}

// Send every event in the batch in a single operation
await producerClient.SendAsync(eventBatch);
Console.WriteLine($"A batch of {eventBatch.Count} events has been sent.");

Console.WriteLine("Events sent. Press Enter to exit.");
Console.ReadLine();

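Important: Retrieve your Event Hubs connection string from the Azure Portal (Event Hubs -> your namespace -> Shared access policies), or with `az eventhubs namespace authorization-rule keys list --resource-group myEventHubsRG --namespace-name myEventHubsNamespace --name RootManageSharedAccessKey`. Then paste it into connectionString, or replace YOUR_SHARED_ACCESS_KEY with the key. The policy must have "Send" permissions. The built-in RootManageSharedAccessKey policy grants Manage, Send, and Listen rights, which is convenient for this tutorial. For production, create a policy that grants only the rights each application needs.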
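`EventDataBatch` enforces the maximum size the Event Hub accepts, and `EventDataBatch.TryAdd` returns false once an event would push the batch past that limit. The producer simply throws in that case. A longer-running producer would typically send the full batch and start a new one instead.

The Python sketch below models that packing logic with plain byte strings. The 1,024-byte limit, the fixed per-event overhead, and the helper name are made-up values for illustration. The real limit depends on your tier, and the SDK calculates AMQP encoding overhead itself.

```python
# Illustrative assumptions: a tiny batch limit and a fixed per-event overhead.
# The real SDK determines both for you.
MAX_BATCH_BYTES = 1024
PER_EVENT_OVERHEAD = 16


def pack_into_batches(messages: list[str]) -> list[list[bytes]]:
    """Hypothetical helper: greedily pack UTF-8 encoded messages into size-limited batches."""
    batches: list[list[bytes]] = []
    current: list[bytes] = []
    current_size = 0
    for message in messages:
        body = message.encode("utf-8")
        size = len(body) + PER_EVENT_OVERHEAD
        if size > MAX_BATCH_BYTES:
            # Like the C# producer: an event that can never fit in a batch is an error.
            raise ValueError(f"Message is too large for any batch: {message[:30]!r}")
        if current_size + size > MAX_BATCH_BYTES:
            # The batch is full (TryAdd would return false): close it and start a new one.
            batches.append(current)
            current, current_size = [], 0
        current.append(body)
        current_size += size
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    events = [f"Event {i}: Hello, Azure Event Hubs!" for i in range(1, 101)]
    batches = pack_into_batches(events)
    print(f"{len(events)} events packed into {len(batches)} batches")
```

Packing greedily keeps events in their original order and minimizes the number of send calls. Each completed batch would then be passed to a send operation, as `SendAsync` does in the C# producer.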

Step 5: Develop the Consumer Application

The consumer application reads events from the Event Hub. Here's a Python example.

Set up your Python Environment

Make sure you have Python installed. Create a virtual environment and install the necessary library.

python -m venv venv
source venv/bin/activate  # On Windows use `venv\Scripts\activate`
pip install azure-eventhub

Create a Consumer Script

Create a file named consumer.py with the following content:

Python
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

# Replace with your actual connection string, event hub name, and consumer group name
event_hub_name = "myEventHub"
consumer_group = "$Default"  # Or "myConsumerGroup" if you created a custom one
# Use a connection string with 'Listen' permission
connection_str = "Endpoint=sb://myEventHubsNamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_SHARED_ACCESS_KEY"


async def on_event(partition_context, event):
    print(f"Received event from partition {partition_context.partition_id}: {event.body_as_str()}")
    # Without a checkpoint store, checkpoints are kept in memory only
    await partition_context.update_checkpoint(event)


async def main():
    # eventhub_name is a keyword-only argument of from_connection_string
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group=consumer_group,
        eventhub_name=event_hub_name,
    )
    async with client:
        # receive() calls on_event once per event; "-1" starts at the beginning of each
        # partition, so events sent before the consumer started are read as well
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    print("Starting Event Hub consumer... (press Ctrl+C to stop)")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Consumer stopped.")

Important: Replace placeholders with your actual namespace, event hub name, and a connection string with "Listen" permissions. The $Default consumer group is created automatically.
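The producer and the consumer use the same connection-string format:

- semicolon-separated `Key=Value` pairs
- the namespace endpoint under `Endpoint`
- the policy name under `SharedAccessKeyName`
- the key under `SharedAccessKey`

A quick client-side check catches the most common mistake, an unreplaced placeholder, before the SDK fails with a less obvious authentication error.

The helpers below are hypothetical and are not part of the Azure SDK. They only parse the string; they do not verify that the policy actually grants Send or Listen rights.

```python
def parse_connection_string(conn_str: str) -> dict[str, str]:
    """Hypothetical helper: split 'Key=Value;Key=Value' pairs into a dict."""
    parts: dict[str, str] = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the FIRST '=' only: base64-encoded keys can end with '=' padding.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key] = value
    return parts


def check_connection_string(conn_str: str) -> str:
    """Hypothetical helper: return the namespace host, or raise if something is obviously wrong."""
    parts = parse_connection_string(conn_str)
    for required in ("Endpoint", "SharedAccessKeyName", "SharedAccessKey"):
        if required not in parts:
            raise ValueError(f"Missing {required}")
    if parts["SharedAccessKey"] == "YOUR_SHARED_ACCESS_KEY":
        raise ValueError("The SharedAccessKey placeholder has not been replaced")
    endpoint = parts["Endpoint"]
    if not endpoint.startswith("sb://"):
        raise ValueError("Endpoint should start with sb://")
    return endpoint[len("sb://"):].rstrip("/")
```

Calling `check_connection_string(connection_str)` at the top of `main()` raises `ValueError` while the placeholder is still in place. Once the real key is pasted in, it returns the namespace host.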

Step 6: Run and Monitor

Run the Producer

Navigate to your producer application directory and run it:

dotnet run

Run the Consumer

Open a new terminal, activate your virtual environment, and run the consumer script:

source venv/bin/activate
python consumer.py

You should see the events being printed in the consumer's terminal.
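The consumer calls `update_checkpoint` after each event. A checkpoint records how far a reader has progressed, per consumer group and partition. When a consumer restarts, it resumes after its checkpoint if one exists; otherwise it falls back to its configured starting position.

Without a durable checkpoint store, checkpoints are kept only in memory and are lost when the process exits. In that case, rerunning the consumer starts from the starting position again. The Azure Blob Storage checkpoint store, which ships as a separate package, is one durable option.

The in-memory model below uses sequence numbers, with hypothetical class and function names. It illustrates the resume logic only and is not how the SDK stores checkpoints.

```python
from typing import Optional


class InMemoryCheckpointStore:
    """Hypothetical model: last processed sequence number per (consumer group, partition)."""

    def __init__(self) -> None:
        self._checkpoints: dict[tuple[str, str], int] = {}

    def update_checkpoint(self, consumer_group: str, partition_id: str, sequence_number: int) -> None:
        self._checkpoints[(consumer_group, partition_id)] = sequence_number

    def resume_from(self, consumer_group: str, partition_id: str, default_start: int) -> int:
        """Next sequence number to read: one past the checkpoint, else the default start."""
        last: Optional[int] = self._checkpoints.get((consumer_group, partition_id))
        return default_start if last is None else last + 1


def consume(store: InMemoryCheckpointStore, group: str, partition_id: str,
            log: list[str], default_start: int = 0) -> list[str]:
    """Read a partition's events (list index = sequence number), checkpointing after each one."""
    received = []
    start = store.resume_from(group, partition_id, default_start)
    for sequence_number in range(start, len(log)):
        received.append(log[sequence_number])
        store.update_checkpoint(group, partition_id, sequence_number)
    return received
```

Calling `consume` twice on the same store returns only the events added in between. A different consumer group, or a fresh store (as after a process restart), reads from the default start again.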


Monitoring

Event Hubs integrates with Azure Monitor, which provides the following monitoring capabilities:

  • Metrics: Track incoming and outgoing messages and bytes, throttled requests, server and user errors, active connections, and more.
  • Logs: Collect diagnostic logs for detailed troubleshooting.
  • Alerts: Set up alerts based on metric thresholds to proactively identify issues.

Navigate to your Event Hubs namespace in the Azure Portal and select "Metrics" or "Diagnostic settings" to explore these features.
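A metric alert aggregates a metric over a time window and compares the result with a threshold. The sketch below models one such rule: it fires whenever the total of a per-minute metric over a sliding window exceeds a limit.

All of the following are illustrative assumptions:

- the metric name `ThrottledRequests`
- the sample values
- the five-minute window
- the threshold

Azure Monitor evaluates real alert rules on the service side. You configure them in the portal or with `az monitor metrics alert create`.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class ThresholdRule:
    """Hypothetical alert rule: fire when the windowed total exceeds the threshold."""
    metric: str
    window_minutes: int
    threshold: float


def evaluate(rule: ThresholdRule, per_minute_values: list[float]) -> list[int]:
    """Return the minute indexes at which the rule would fire (sliding-window total)."""
    window: deque = deque(maxlen=rule.window_minutes)
    fired = []
    for minute, value in enumerate(per_minute_values):
        window.append(value)  # deque(maxlen=...) drops the oldest value automatically
        if len(window) == rule.window_minutes and sum(window) > rule.threshold:
            fired.append(minute)
    return fired


if __name__ == "__main__":
    # Made-up per-minute counts of throttled requests, for illustration only.
    samples = [0, 0, 1, 0, 3, 6, 8, 2, 0, 0]
    rule = ThresholdRule(metric="ThrottledRequests", window_minutes=5, threshold=10)
    print(f"{rule.metric} alert fires at minutes: {evaluate(rule, samples)}")
```

Summing over a window, rather than alerting on single samples, keeps one brief spike from triggering an alert while still catching sustained throttling.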

Conclusion

Congratulations! You have successfully set up an end-to-end stream processing pipeline using Azure Event Hubs. This tutorial covered the fundamental steps of creating resources, developing basic producer and consumer applications, and highlighted the importance of monitoring.

From here, you can integrate Event Hubs with other Azure services like Azure Functions for serverless processing, Azure Stream Analytics for complex real-time analytics, or Azure Data Lake Storage for long-term storage.

Next Steps: Explore Azure Functions integration for event-driven processing and Azure Stream Analytics for powerful real-time dashboards.