Introduction
Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. It can capture millions of events per second so you can develop a variety of real-time analytics and data streaming applications. Azure Stream Analytics is a fully managed, real-time analytics service that helps you analyze and process high volumes of streaming data from sources like Event Hubs.
In this tutorial, we'll walk through setting up a simple pipeline to ingest data into an Event Hub and then use Stream Analytics to perform a basic aggregation on that data.
Prerequisites
- An Azure subscription. If you don't have one, you can create a free account.
- Basic understanding of Azure services.
- Azure CLI or Azure Portal access.
Step 1: Create an Azure Event Hubs Namespace and Event Hub
First, we need to create an Event Hubs namespace, which is a container for Event Hubs. Then, we'll create an actual Event Hub within that namespace.
Using Azure Portal:
- Navigate to the Azure Portal and search for "Event Hubs".
- Click "Create".
- Select your Subscription and Resource Group. If you don't have one, create a new one.
- Enter a unique namespace name (e.g., my-event-hubs-ns-unique).
- Select a Location.
- Choose a Pricing tier (Basic or Standard are suitable for this tutorial).
- Click "Review + create" and then "Create".
- Once the namespace is deployed, navigate to it and click "+ Event Hub".
- Enter an Event Hub name (e.g., realtime-data).
- Keep the default settings for the number of partitions and message retention period, or adjust as needed.
- Click "Create".
Using Azure CLI:
Replace placeholders like myResourceGroup, my-event-hubs-ns-unique, and realtime-data with your desired names.
az eventhubs namespace create --name my-event-hubs-ns-unique --resource-group myResourceGroup --location eastus --sku Standard
az eventhubs eventhub create --name realtime-data --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique
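Optionally, you can also create a dedicated consumer group for the Stream Analytics job and a shared access policy with Send and Listen rights now, so they're ready for Steps 3 and 7. This is just a sketch with hypothetical names (asa-consumer-group, SendListenPolicy); adjust them to your own.
# Dedicated consumer group for the Stream Analytics input
az eventhubs eventhub consumer-group create --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique --eventhub-name realtime-data --name asa-consumer-group
# Shared access policy with Send and Listen rights
az eventhubs eventhub authorization-rule create --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique --eventhub-name realtime-data --name SendListenPolicy --rights Send Listen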
Step 2: Create an Azure Stream Analytics Job
Now, let's set up the Stream Analytics job that will read from Event Hubs.
Using Azure Portal:
- Search for "Stream Analytics jobs" in the Azure Portal.
- Click "Create Stream Analytics job".
- Enter a Job name (e.g., realtime-analytics-job).
- Select your Subscription and Resource Group.
- Choose a Location.
- For "Hosting environment", select "Cloud".
- Click "Review + create" and then "Create".
Step 3: Configure the Stream Analytics Job Input
We need to tell the Stream Analytics job where to get its data from.
- Navigate to your newly created Stream Analytics job.
- Under "Job topology", click "Inputs".
- Click "Add stream input" and select "Event Hubs".
- Enter an Input alias (e.g., eventHubInput). This is how you'll refer to this input in your query.
- Select "Select Event Hubs from your subscriptions".
- Choose your Subscription, Event Hubs namespace, and the Event Hub name you created earlier.
- For "Consumer group", you can use the default '$Default' or create a new one. It's good practice to use dedicated consumer groups for different applications.
- For "Authentication mode", choose "Connection string" and click the "Listen" key from your Event Hub's "Shared access policies".
- Click "Save".
Step 4: Configure the Stream Analytics Job Output
We need a place to send the processed data. For this example, we'll use a simple output, but you could use Blob Storage, another Event Hub, or Power BI.
- In your Stream Analytics job, click "Outputs".
- Click "Add" and select "Blob storage" as a simple destination.
- Enter an Output alias (e.g., outputBlob).
- Select "Select Blob storage from your subscriptions".
- Choose your Subscription, Storage account, and Container. If you don't have them yet, create them first (a CLI snippet follows this list).
- Set "Serialization format" to "JSON".
- For "Authentication mode", use "Access key" and select your Storage Account.
- Set "Path pattern" to something like
realtime/{date}/{time}to organize your output files. - Click "Save".
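If you don't already have a storage account and container for the output, you can create them with the CLI first. The names below (mystreamoutput123, streamoutput) are placeholders; storage account names must be globally unique.
# Create a storage account for the job output
az storage account create --name mystreamoutput123 --resource-group myResourceGroup --location eastus --sku Standard_LRS
# Create the container the job will write to
az storage container create --name streamoutput --account-name mystreamoutput123 --auth-mode login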
Step 5: Write the Stream Analytics Query
This is where the magic happens. We'll write a SQL-like query to process the incoming data.
- In your Stream Analytics job, click "Query".
- Replace the default query with the following example. This query simply selects all data from the input and passes it to the output.
SELECT
*
INTO
outputBlob
FROM
eventHubInput
For a more practical example, let's say your events are JSON objects with a temperature field. You could calculate a per-minute average temperature over a tumbling window:
SELECT
System.Timestamp() AS WindowEnd,
AVG(CAST(temperature AS FLOAT)) AS AverageTemperature
INTO
outputBlob
FROM
eventHubInput TIMESTAMP BY EventEnqueuedUtcTime
GROUP BY
TumblingWindow(minute, 1)
Click "Save query".
Step 6: Start the Stream Analytics Job
Now that everything is configured, we can start the job.
- In your Stream Analytics job, click "Start" on the overview page.
- Choose "Now" for "Job start time".
- Click "Start".
Step 7: Send Data to Event Hubs
To see your pipeline in action, you need to send data to your Event Hub.
Using a simple Python script:
Install the necessary library (the script below uses the v5 azure-eventhub SDK): pip install azure-eventhub
import json
import time

from azure.eventhub import EventHubProducerClient, EventData

# Replace with your Event Hub connection string
# You can get this from your Event Hub's "Shared access policies" in the Azure Portal
# Make sure to use a policy with "Send" rights
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=your-key-name;SharedAccessKey=your-key-value"
EVENT_HUB_NAME = "realtime-data"  # Your Event Hub name

def send_event(producer, data):
    # Serialize the payload as JSON and send it as a single-event batch
    event_data = EventData(json.dumps(data))
    producer.send_batch([event_data])
    print(f"Sent: {data}")

if __name__ == "__main__":
    producer = EventHubProducerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    try:
        for i in range(20):
            sample_data = {
                "deviceId": "sensor-001",
                "timestamp": int(time.time()),
                "temperature": 20.0 + (i % 5),
                "humidity": 50.0 + (i % 10)
            }
            send_event(producer, sample_data)
            time.sleep(1)  # Send an event every second
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        producer.close()
        print("Producer closed.")
Run this script from your terminal. Remember to replace the placeholder connection string with your actual Event Hubs send key connection string.
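If you created the SendListenPolicy in the optional CLI step earlier, you can print its connection string directly and paste it into the script:
az eventhubs eventhub authorization-rule keys list --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique --eventhub-name realtime-data --name SendListenPolicy --query primaryConnectionString -o tsv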
Monitoring and Verification
After sending some data, you can monitor your Stream Analytics job's metrics (such as Input Events and Output Events) in the Azure Portal. Then check your Blob Storage container for the output files generated by the job.
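For example, you can list the blobs the job has written using the placeholder names from Step 4:
# List output blobs produced by the Stream Analytics job
az storage blob list --account-name mystreamoutput123 --container-name streamoutput --auth-mode login --output table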
Conclusion
You have successfully set up a real-time data processing pipeline using Azure Event Hubs and Azure Stream Analytics. This basic example can be extended to handle more complex scenarios, such as anomaly detection, real-time dashboards, and data warehousing.