Real-time Analytics with Azure Event Hubs: A Step-by-Step Tutorial
This tutorial will guide you through setting up a basic real-time analytics pipeline using Azure Event Hubs. We'll cover ingesting data, processing it with Azure Functions, and visualizing the results.
Figure: High-level architecture for real-time analytics with Azure Event Hubs.
Prerequisites
- An Azure subscription.
- Azure CLI installed and configured.
- Basic understanding of Azure services like Event Hubs, Azure Functions, and Azure Cosmos DB.
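If the CLI isn't configured yet, sign in and select the subscription to use (the subscription name below is a placeholder):

az login
az account set --subscription "My Subscription"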
Step 1: Create an Azure Event Hubs Namespace and Event Hub
First, we need a place to send our events. We'll create an Event Hubs namespace and then an Event Hub within it.
Using the Azure CLI:
az eventhubs namespace create --name my-eventhubs-namespace-unique --resource-group myResourceGroup --location eastus --enable-auto-inflate true --maximum-throughput-units 10
az eventhubs eventhub create --name my-eventhub --resource-group myResourceGroup --namespace-name my-eventhubs-namespace-unique --partition-count 2
Note: Replace my-eventhubs-namespace-unique with a globally unique name and myResourceGroup with your resource group name. Auto-inflate is a namespace-level setting; it automatically scales throughput units up to the maximum you specify.
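If the resource group doesn't exist yet, create it before the namespace. You can also fetch the namespace connection string now, since Step 2 needs it (the RootManageSharedAccessKey rule is created automatically with the namespace):

az group create --name myResourceGroup --location eastus
az eventhubs namespace authorization-rule keys list --resource-group myResourceGroup --namespace-name my-eventhubs-namespace-unique --name RootManageSharedAccessKey --query primaryConnectionString --output tsv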
Step 2: Create an Azure Function for Data Ingestion
We'll create an HTTP-triggered Azure Function that accepts incoming data and sends it to our Event Hub.
Function Code (__init__.py):
import json
import logging
import os

import azure.functions as func
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

async def send_events(connection_str, event_hub_name, events):
    # Create an async producer scoped to the target Event Hub.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_str, eventhub_name=event_hub_name
    )
    async with producer:
        batch = await producer.create_batch()
        added = 0
        for event_data in events:
            try:
                batch.add(EventData(event_data))
                added += 1
            except ValueError as e:
                # add() raises ValueError when the batch is full; a production
                # app would send the current batch and start a new one here.
                logging.error(f"Failed to add event to batch: {e}")
        if added > 0:
            await producer.send_batch(batch)
            logging.info(f"Sent {added} events.")

async def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse(
            "Please pass a JSON object in the request body",
            status_code=400
        )
    if not req_body:
        return func.HttpResponse(
            "No data received.",
            status_code=400
        )
    # Read the connection string from app settings instead of hard-coding it.
    connection_str = os.environ["EventHubConnectionString"]
    event_hub_name = "my-eventhub"
    # A JSON array becomes multiple events; anything else is sent as one event.
    payloads = req_body if isinstance(req_body, list) else [req_body]
    await send_events(connection_str, event_hub_name, [json.dumps(p) for p in payloads])
    return func.HttpResponse(
        "Event(s) sent successfully to Event Hub.",
        status_code=200
    )
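The packages imported above must be declared in the function app's requirements.txt (a minimal sketch; azure-cosmos is only needed for the processing function in Step 3):

azure-functions
azure-eventhub
azure-cosmos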
Function Configuration (function.json):
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}
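To try the function locally before deploying, you can run it with Azure Functions Core Tools (a sketch; it assumes the function folder is named ingest and that EventHubConnectionString is set in local.settings.json):

func start
curl -X POST http://localhost:7071/api/ingest -H "Content-Type: application/json" -d '{"device": "sensor-1", "temperature": 22.5}'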
Important: The function reads EventHubConnectionString from its application settings rather than embedding the key in code. Retrieve the namespace connection string from the Azure portal (or with the authorization-rule keys list command from Step 1) and add it to your Function App's configuration, as shown below. For production workloads, store secrets in Azure Key Vault and reference them from app settings.
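You can set the app setting from the CLI (a sketch; my-function-app is a placeholder for your Function App's name, and the value is the connection string retrieved in Step 1):

az functionapp config appsettings set --name my-function-app --resource-group myResourceGroup --settings "EventHubConnectionString=Endpoint=sb://..."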
Step 3: Create an Azure Function for Real-time Processing
This function will be triggered by new events in the Event Hub, perform some simple analytics (counting events per minute), and store the results in Azure Cosmos DB.
Function Code (__init__.py):
import datetime
import json
import logging
import os

import azure.functions as func
from azure.cosmos import CosmosClient
from azure.cosmos.exceptions import CosmosResourceNotFoundError

DATABASE_NAME = "AnalyticsDB"
CONTAINER_NAME = "RealTimeMetrics"

# Create the client once at module load so connections are reused across invocations.
cosmos_client = CosmosClient.from_connection_string(os.environ["CosmosDBConnectionString"])
container = cosmos_client.get_database_client(DATABASE_NAME).get_container_client(CONTAINER_NAME)

def main(events: list[func.EventHubEvent]):
    # Simple aggregation: count events per minute.
    minute_counts = {}
    for event in events:
        try:
            event_data = event.get_body().decode('utf-8')
            logging.info(f"Processing event: {event_data}")
            # Parse event data if it's JSON.
            try:
                data = json.loads(event_data)
            except json.JSONDecodeError:
                logging.warning("Event is not valid JSON, skipping parsing.")
                data = {"raw": event_data}  # Treat as a raw string if not JSON
            # Prefer the broker's enqueued time; fall back to processing time.
            timestamp = event.enqueued_time or datetime.datetime.now(datetime.timezone.utc)
            minute_key = timestamp.strftime("%Y-%m-%dT%H:%M")  # e.g., 2023-10-27T10:30
            minute_counts[minute_key] = minute_counts.get(minute_key, 0) + 1
        except Exception as e:
            logging.error(f"Error processing event: {e}")
    # Merge the new counts into Cosmos DB. Reading the existing document first
    # preserves counts from earlier batches in the same minute (note: this
    # read-modify-write is not atomic across concurrent function instances).
    for minute_key, count in minute_counts.items():
        document = {
            'id': f"metric-{minute_key}",  # One document per minute
            'timestamp': minute_key,
            'event_count': count,
            'partitionKey': minute_key  # Minute-based partitioning for simplicity
        }
        try:
            try:
                existing = container.read_item(item=document['id'], partition_key=minute_key)
                document['event_count'] += existing['event_count']
            except CosmosResourceNotFoundError:
                pass  # First batch seen for this minute
            container.upsert_item(document)
            logging.info(f"Upserted metrics for {minute_key}: {document['event_count']} events.")
        except Exception as e:
            logging.error(f"Error upserting to Cosmos DB: {e}")
Function Configuration (function.json):
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "my-eventhub",
      "connection": "EventHubConnectionString",
      "cardinality": "many",
      "consumerGroup": "$Default"
    }
  ]
}
Important: You'll need to create an Azure Cosmos DB account, the AnalyticsDB database, and the RealTimeMetrics container with /partitionKey as its partition key; one way to do this with the CLI is shown below. Then set the EventHubConnectionString and CosmosDBConnectionString application settings in your Function App configuration; the trigger binding and the Cosmos client both read them.
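A sketch of that setup with the CLI (the account name is a placeholder and must be globally unique; throughput options are simplified here):

az cosmosdb create --name my-cosmos-account-unique --resource-group myResourceGroup
az cosmosdb sql database create --account-name my-cosmos-account-unique --resource-group myResourceGroup --name AnalyticsDB
az cosmosdb sql container create --account-name my-cosmos-account-unique --resource-group myResourceGroup --database-name AnalyticsDB --name RealTimeMetrics --partition-key-path /partitionKey --throughput 400
az cosmosdb keys list --type connection-strings --name my-cosmos-account-unique --resource-group myResourceGroup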
Step 4: Visualize the Data with Azure Cosmos DB
Azure Cosmos DB can be queried directly and visualized with external tools or a custom dashboard.
For a quick look at the results, use the built-in Data Explorer for your Cosmos DB container in the Azure portal.
Figure: Visualizing metrics in the Cosmos DB Data Explorer.
You can run SQL queries like:
SELECT TOP 100 * FROM c ORDER BY c.timestamp DESC
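Or total the event counts for a single day (the date literal here matches the example minute keys above):

SELECT VALUE SUM(c.event_count) FROM c WHERE STARTSWITH(c.timestamp, "2023-10-27")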
For more advanced real-time dashboards, consider integrating with Power BI, Azure Dashboards, or a custom web application.
Conclusion
Congratulations! You've successfully set up a basic real-time analytics pipeline using Azure Event Hubs. This pipeline can be extended to handle more complex data transformations, integrate with various data sinks, and provide richer insights.
Next Steps
- Explore more advanced Event Hubs features like Capture and consumer groups (see the sketch after this list).
- Integrate with Azure Stream Analytics for powerful, serverless real-time processing.
- Build a custom web dashboard using technologies like React or Angular to visualize your Cosmos DB data in real-time.
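For example, a dedicated consumer group lets a second application, such as a live dashboard, read the same stream independently of the analytics function (a sketch reusing the names from Step 1; dashboard-readers is a placeholder):

az eventhubs eventhub consumer-group create --resource-group myResourceGroup --namespace-name my-eventhubs-namespace-unique --eventhub-name my-eventhub --name dashboard-readers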