Introduction

Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. It can capture millions of events per second so you can develop a variety of real-time analytics and data streaming applications. Azure Stream Analytics is a fully managed, real-time analytics service that helps you analyze and process high volumes of streaming data from sources like Event Hubs.

In this tutorial, we'll walk through setting up a simple pipeline to ingest data into an Event Hub and then use Stream Analytics to perform a basic aggregation on that data.

[Event Hubs architecture diagram]

Prerequisites

  • An Azure subscription. If you don't have one, you can create a free account.
  • Basic understanding of Azure services.
  • Azure CLI or Azure Portal access.
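
If you plan to follow the CLI steps, sign in and select the subscription you want to use first. The subscription ID below is a placeholder.

az login

az account set --subscription "<your-subscription-id>"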

Step 1: Create an Azure Event Hubs Namespace and Event Hub

First, we need to create an Event Hubs namespace, which is a container for Event Hubs. Then, we'll create an actual Event Hub within that namespace.

Using Azure Portal:

  1. Navigate to the Azure Portal and search for "Event Hubs".
  2. Click "Create".
  3. Select your Subscription and Resource Group. If you don't have one, create a new one.
  4. Enter a unique namespace name (e.g., my-event-hubs-ns-unique).
  5. Select a Location.
  6. Choose a Pricing tier (Basic or Standard are suitable for this tutorial).
  7. Click "Review + create" and then "Create".
  8. Once the namespace is deployed, navigate to it and click "+ Event Hub".
  9. Enter an Event Hub name (e.g., realtime-data).
  10. Keep the default settings for the number of partitions and message retention period, or adjust as needed.
  11. Click "Create".

Using Azure CLI:

Replace placeholders like myResourceGroup, my-event-hubs-ns-unique, and realtime-data with your desired names.
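
If the resource group doesn't exist yet, create it first (the group name and region below are examples):

az group create --name myResourceGroup --location eastus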


az eventhubs namespace create --name my-event-hubs-ns-unique --resource-group myResourceGroup --location eastus --sku Standard

az eventhubs eventhub create --name realtime-data --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique
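
If you prefer to stay in the CLI, you can also create a dedicated consumer group for Stream Analytics and a send policy for your producer, then list its connection string. The names asaConsumerGroup and SendPolicy are example values, and note that additional consumer groups (beyond $Default) require the Standard tier or higher.

az eventhubs eventhub consumer-group create --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique --eventhub-name realtime-data --name asaConsumerGroup

az eventhubs eventhub authorization-rule create --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique --eventhub-name realtime-data --name SendPolicy --rights Send

az eventhubs eventhub authorization-rule keys list --resource-group myResourceGroup --namespace-name my-event-hubs-ns-unique --eventhub-name realtime-data --name SendPolicy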

Step 2: Create an Azure Stream Analytics Job

Now, let's set up the Stream Analytics job that will read from Event Hubs.

Using Azure Portal:

  1. Search for "Stream Analytics jobs" in the Azure Portal.
  2. Click "Create Stream Analytics job".
  3. Enter a Job name (e.g., realtime-analytics-job).
  4. Select your Subscription and Resource Group.
  5. Choose a Location.
  6. For "Hosting environment", select "Cloud".
  7. Click "Review + create" and then "Create".
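
Using Azure CLI:

The Stream Analytics commands live in a CLI extension, and flag names have varied between extension versions, so treat the following as a sketch and check az stream-analytics job create --help if a parameter is rejected.

az extension add --name stream-analytics

az stream-analytics job create --job-name realtime-analytics-job --resource-group myResourceGroup --location eastus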

Step 3: Configure the Stream Analytics Job Input

We need to tell the Stream Analytics job where to get its data from.

  1. Navigate to your newly created Stream Analytics job.
  2. Under "Job topology", click "Inputs".
  3. Click "Add stream input" and select "Event Hubs".
  4. Enter an Input alias (e.g., eventHubInput). This is how you'll refer to this input in your query.
  5. Select "Select Event Hubs from your subscriptions".
  6. Choose your Subscription, Event Hubs namespace, and the Event Hub name you created earlier.
  7. For "Consumer group", you can use the default '$Default' or create a new one. It's good practice to use dedicated consumer groups for different applications.
  8. For "Authentication mode", choose "Connection string" and provide a connection string from a shared access policy that has the "Listen" claim (found under your Event Hub's "Shared access policies").
  9. Click "Save".

Step 4: Configure the Stream Analytics Job Output

We need a place to send the processed data. For this example, we'll use a simple output, but you could use Blob Storage, another Event Hub, or Power BI.

  1. In your Stream Analytics job, click "Outputs".
  2. Click "Add" and select "Blob storage" as a simple destination.
  3. Enter an Output alias (e.g., outputBlob).
  4. Select "Select Blob storage from your subscriptions".
  5. Choose your Subscription, Storage account, and Container. If you don't have a storage account or container yet, create one (see the CLI sketch after this list).
  6. Set "Serialization format" to "JSON".
  7. For "Authentication mode", use "Access key" and select your Storage Account.
  8. Set "Path pattern" to something like realtime/{date}/{time} to organize your output files.
  9. Click "Save".
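
If you still need a storage account and container for the output, you can create them with the CLI. The account name mystorageacct and container name asa-output are example values; storage account names must be globally unique.

az storage account create --name mystorageacct --resource-group myResourceGroup --location eastus --sku Standard_LRS

az storage container create --account-name mystorageacct --name asa-output --auth-mode login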

Step 5: Write the Stream Analytics Query

This is where the magic happens. We'll write a SQL-like query to process the incoming data.

  1. In your Stream Analytics job, click "Query".
  2. Replace the default query with the following example. This query simply selects all data from the input and passes it to the output.

SELECT
    *
INTO
    outputBlob
FROM
    eventHubInput

For a more practical example, suppose your events are JSON objects with a temperature field. You could calculate the average temperature over one-minute tumbling windows:


SELECT
    System.Timestamp() AS WindowEnd,
    AVG(CAST(temperature AS FLOAT)) AS AverageTemperature
INTO
    outputBlob
FROM
    eventHubInput TIMESTAMP BY EventEnqueuedUtcTime
GROUP BY
    TumblingWindow(minute, 1)

Click "Save query".

Step 6: Start the Stream Analytics Job

Now that everything is configured, we can start the job.

  1. In your Stream Analytics job, click "Start" on the overview page.
  2. Choose "Now" for "Job start time".
  3. Click "Start".
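
With the CLI extension mentioned in Step 2, the equivalent is roughly the following; check az stream-analytics job start --help if the flags differ in your extension version.

az stream-analytics job start --job-name realtime-analytics-job --resource-group myResourceGroup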

Step 7: Send Data to Event Hubs

To see your pipeline in action, you need to send data to your Event Hub.

Using a simple Python script:

Install the necessary library: pip install azure-eventhub


import json
import time
from azure.eventhub import EventHubProducerClient, EventData

# Replace with your Event Hub connection string
# You can get this from your Event Hub's "Shared access policies" in the Azure Portal
# Make sure to use a policy that has the "Send" claim
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=your-key-name;SharedAccessKey=your-key-value"
EVENT_HUB_NAME = "realtime-data"  # Your Event Hub name

def send_event(producer, data):
    # Serialize the payload as JSON and send it as a single-event batch
    event_data = EventData(json.dumps(data))
    producer.send_batch([event_data])
    print(f"Sent: {data}")

if __name__ == "__main__":
    producer = EventHubProducerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )

    try:
        for i in range(20):
            sample_data = {
                "deviceId": "sensor-001",
                "timestamp": int(time.time()),
                "temperature": 20.0 + (i % 5),
                "humidity": 50.0 + (i % 10)
            }
            send_event(producer, sample_data)
            time.sleep(1)  # Send an event every second

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        producer.close()
        print("Producer closed.")

Run this script from your terminal. Remember to replace the placeholder connection string with the connection string for a shared access policy that has the "Send" claim on your Event Hub.

Monitoring and Verification

After sending some data, you can monitor your Stream Analytics job's metrics in the Azure Portal. Check your Blob Storage container for the output files generated by the job.
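
For a quick check from the CLI, you can list the blobs the job has written. Substitute your own storage account and container names (the example names from Step 4 are used below); --auth-mode login requires a data-plane role such as Storage Blob Data Reader on the account.

az storage blob list --account-name mystorageacct --container-name asa-output --auth-mode login --output table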

Conclusion

You have successfully set up a real-time data processing pipeline using Azure Event Hubs and Azure Stream Analytics. This basic example can be extended to handle more complex scenarios, such as anomaly detection, real-time dashboards, and data warehousing.
