Real-time Analytics with Azure Event Hubs: A Step-by-Step Tutorial

This tutorial will guide you through setting up a basic real-time analytics pipeline using Azure Event Hubs. We'll cover ingesting data, processing it with Azure Functions, and visualizing the results.

[Figure: High-level architecture for real-time analytics with Azure Event Hubs.]

Prerequisites

To follow along you'll need an Azure subscription, the Azure CLI installed and signed in, Python 3.9 or later with the azure-functions, azure-eventhub, and azure-cosmos packages, and the Azure Functions Core Tools for local development.

Step 1: Create an Azure Event Hubs Namespace and Event Hub

First, we need a place to send our events. We'll create an Event Hubs namespace and then an Event Hub within it.

Using the Azure CLI:


az eventhubs namespace create --name my-eventhubs-namespace-unique --resource-group myResourceGroup --location eastus --enable-auto-inflate true --maximum-throughput-units 10
az eventhubs eventhub create --name my-eventhub --resource-group myResourceGroup --namespace-name my-eventhubs-namespace-unique --partition-count 2
            

Note: Replace my-eventhubs-namespace-unique with a globally unique name and myResourceGroup with your desired resource group name. Auto-inflate is a namespace-level setting, which is why those flags go on the first command rather than on the event hub itself.

Step 2: Create an Azure Function for Data Ingestion

We'll create an HTTP-triggered Azure Function that accepts incoming data and sends it to our Event Hub.

Function Code (__init__.py):


import json
import logging
import os

import azure.functions as func
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

async def send_events(connection_str, event_hub_name, events):
    # Async producer client scoped to the target Event Hub (azure-eventhub v5 API).
    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_str, eventhub_name=event_hub_name
    )
    async with producer:  # closes the client automatically
        batch = await producer.create_batch()
        for event_data in events:
            try:
                batch.add(EventData(event_data))
            except ValueError as e:
                # Raised when the event would push the batch past its size limit.
                logging.error(f"Failed to add event to batch: {e}")

        if len(batch) > 0:
            await producer.send_batch(batch)
            logging.info(f"Sent {len(batch)} events.")

async def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse(
            "Please pass a JSON object in the request body",
            status_code=400
        )

    if req_body:
        # Read the connection string from an application setting rather than
        # hard-coding a secret in source; see the note below.
        connection_str = os.environ["EVENT_HUB_CONNECTION_STR"]
        event_hub_name = "my-eventhub"

        # Accept either a single JSON object or a list of events.
        bodies = req_body if isinstance(req_body, list) else [req_body]
        events_to_send = [json.dumps(item) for item in bodies]

        await send_events(connection_str, event_hub_name, events_to_send)

        return func.HttpResponse(
            "Event(s) sent successfully to Event Hub.",
            status_code=200
        )
    else:
        return func.HttpResponse(
            "No data received.",
            status_code=400
        )
            

Function Configuration (function.json):


{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}
            

Important: Retrieve your Event Hubs connection string from the Azure portal (under the namespace's Shared access policies) or via az eventhubs namespace authorization-rule keys list, and store it in an EVENT_HUB_CONNECTION_STR application setting on your Function app. It's best practice to keep the secret itself in Azure Key Vault and reference it from app settings.
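
To verify the ingestion path end to end, you can POST a sample event to the deployed function. Here's a minimal sketch using the requests library; the Function app name, route, and key below are placeholders you'll need to substitute for your own deployment.

import requests

# Placeholder URL and key; substitute the values for your own Function app.
FUNCTION_URL = "https://my-function-app.azurewebsites.net/api/my-ingest-function"
FUNCTION_KEY = "YOUR_FUNCTION_KEY"  # required because authLevel is "function"

# Any JSON object works; the pipeline only counts events, it doesn't inspect fields.
event = {"deviceId": "sensor-01", "temperature": 21.5}

response = requests.post(
    FUNCTION_URL,
    params={"code": FUNCTION_KEY},  # function keys are accepted via the 'code' query parameter
    json=event,
)
print(response.status_code, response.text)

A 200 response with "Event(s) sent successfully to Event Hub." confirms the event reached the hub.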

Step 3: Create an Azure Function for Real-time Processing

This function will be triggered by new events in the Event Hub, perform some simple analytics (e.g., counting events per minute), and store the results.

Function Code (__init__.py):


import logging
import datetime
import json
import os

import azure.functions as func
from azure.cosmos import CosmosClient

# Cosmos DB configuration, read from the Function app's application settings
COSMOS_ENDPOINT = os.environ["COSMOS_ENDPOINT"]
COSMOS_KEY = os.environ["COSMOS_KEY"]
DATABASE_NAME = "AnalyticsDB"
CONTAINER_NAME = "RealTimeMetrics"

def get_cosmos_container():
    client = CosmosClient(COSMOS_ENDPOINT, credential=COSMOS_KEY)
    database = client.get_database_client(DATABASE_NAME)
    return database.get_container_client(CONTAINER_NAME)

async def main(events: list[func.EventHubEvent]):
    container = get_cosmos_container()

    # Simple aggregation: count events per minute
    minute_counts = {}

    for event in events:
        try:
            event_data = event.get_body().decode('utf-8')
            logging.info(f"Processing event: {event_data}")
            
            # Parse event data if it's JSON
            try:
                data = json.loads(event_data)
            except json.JSONDecodeError:
                logging.warning("Event is not valid JSON, skipping parsing.")
                data = {"raw": event_data} # Treat as raw string if not JSON

            # Bucket by processing time; use event.enqueued_time for event-time analytics.
            timestamp = datetime.datetime.utcnow()
            minute_key = timestamp.strftime("%Y-%m-%dT%H:%M")  # e.g. 2023-10-27T10:30

            minute_counts[minute_key] = minute_counts.get(minute_key, 0) + 1

        except Exception as e:
            logging.error(f"Error processing event: {e}")

    # Upsert aggregated data to Cosmos DB
    for minute_key, count in minute_counts.items():
        document = {
            'id': f"metric-{minute_key}",  # Deterministic ID, one document per minute
            'timestamp': minute_key,
            'event_count': count,
            'partitionKey': minute_key  # Container is partitioned on /partitionKey
        }
        try:
            # Note: upsert replaces the whole document, so if a minute spans multiple
            # trigger batches the last batch's count wins; a production pipeline would
            # read-modify-write or use a partial-document patch increment instead.
            container.upsert_item(document)
            logging.info(f"Upserted metrics for {minute_key}: {count} events.")
        except Exception as e:
            logging.error(f"Error upserting to Cosmos DB: {e}")
            

Function Configuration (function.json):


{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "my-eventhub",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "connection": "EventHubConnectionString"
    }
  ]
}
            

Important: You'll need an Azure Cosmos DB account with an AnalyticsDB database and a RealTimeMetrics container partitioned on /partitionKey. Set the EventHubConnectionString, COSMOS_ENDPOINT, and COSMOS_KEY application settings in your Function app configuration; since the function writes to Cosmos DB through the SDK directly, no output binding is needed.
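
If you'd rather provision the database and container from code than through the portal, here's a minimal sketch using the azure-cosmos SDK. It assumes the same COSMOS_ENDPOINT and COSMOS_KEY settings as the function, and the partition key path must match the partitionKey field the function writes.

import os
from azure.cosmos import CosmosClient, PartitionKey

# Assumes COSMOS_ENDPOINT and COSMOS_KEY are set in your environment.
client = CosmosClient(os.environ["COSMOS_ENDPOINT"], credential=os.environ["COSMOS_KEY"])

# Create the database and container only if they don't exist yet.
database = client.create_database_if_not_exists(id="AnalyticsDB")
container = database.create_container_if_not_exists(
    id="RealTimeMetrics",
    partition_key=PartitionKey(path="/partitionKey"),  # must match the documents' partitionKey field
)
print(f"Container '{container.id}' is ready.")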

Step 4: Visualize the Data with Azure Cosmos DB

Azure Cosmos DB can be queried directly using the Data Explorer built into the Azure portal, or the results can feed a custom dashboard.

For a quick look at the aggregated metrics, open Data Explorer for your Cosmos DB container in the Azure portal.

[Screenshot: visualizing the per-minute metrics in Azure Cosmos DB Data Explorer.]

You can run SQL queries like:


SELECT TOP 100 * FROM c ORDER BY c.timestamp DESC
            

For more advanced real-time dashboards, consider integrating with Power BI, Azure Dashboards, or a custom web application.
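
If you go the custom-application route, the same query can be issued from Python with the azure-cosmos SDK. A minimal sketch, reusing the COSMOS_ENDPOINT and COSMOS_KEY settings from Step 3:

import os
from azure.cosmos import CosmosClient

client = CosmosClient(os.environ["COSMOS_ENDPOINT"], credential=os.environ["COSMOS_KEY"])
container = client.get_database_client("AnalyticsDB").get_container_client("RealTimeMetrics")

# Pull the 100 most recent per-minute counts across all partitions.
query = "SELECT TOP 100 * FROM c ORDER BY c.timestamp DESC"
for item in container.query_items(query=query, enable_cross_partition_query=True):
    print(f"{item['timestamp']}: {item['event_count']} events")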

Conclusion

Congratulations! You've successfully set up a basic real-time analytics pipeline using Azure Event Hubs. This pipeline can be extended to handle more complex data transformations, integrate with various data sinks, and provide richer insights.

Next Steps