Integrating Azure Event Hubs with Azure Stream Analytics

Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. Azure Stream Analytics (ASA) is a real-time analytics service that helps you analyze and process high volumes of streaming data from sources like Event Hubs.

This document walks through connecting Azure Event Hubs, as an input source, to an Azure Stream Analytics job for real-time data processing and analysis.

Key Benefits:
  • Ingest massive amounts of data in real-time.
  • Perform complex event processing (CEP) on streaming data.
  • Derive insights and trigger actions based on real-time events.
  • Seamless integration with other Azure services.

1. Setting up Azure Event Hubs

Before you can integrate with Stream Analytics, you need an Azure Event Hubs namespace and an Event Hub instance. If you haven't already, follow these steps:

  1. Create an Azure Event Hubs namespace in the Azure portal.
  2. Within the namespace, create an Event Hub instance.
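  3. Note the connection string of a shared access policy that has Listen permission on the Event Hub or its namespace. You'll need it, or the policy name and key it contains, for the Stream Analytics input configuration.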
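The connection string is a semicolon-separated list of Key=Value pairs, and the input fields in section 3 map onto it: Endpoint carries the namespace, SharedAccessKeyName is the policy name, SharedAccessKey is the key, and EntityPath (present only for policies defined on the Event Hub itself, not the namespace) is the Event Hub name. The following Python sketch splits one into its parts. The function names are illustrative, not part of any Azure SDK, and the key in the example is a placeholder.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str: str) -> dict:
    """Split an Event Hubs connection string ("Key=Value;Key=Value;...") into a dict."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # skip the empty segment left by a trailing ';'
        # Split on the first '=' only: Base64 keys often end in '=' padding.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"malformed segment: {segment!r}")
        parts[key] = value
    return parts


def namespace_from_endpoint(endpoint: str) -> str:
    """Return the namespace from an endpoint such as sb://myns.servicebus.windows.net/."""
    # netloc keeps the original casing (urlparse(...).hostname would lowercase it).
    return urlparse(endpoint).netloc.split(".")[0]


# Placeholder connection string; "abc123==" stands in for a real key.
cs = ("Endpoint=sb://myEventHubNamespace.servicebus.windows.net/;"
      "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123==;"
      "EntityPath=myEventHub")
parts = parse_connection_string(cs)
print(namespace_from_endpoint(parts["Endpoint"]), parts["SharedAccessKeyName"], parts.get("EntityPath"))
# prints: myEventHubNamespace RootManageSharedAccessKey myEventHub
```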

Creating an Event Hub:

You can create an Event Hub using the Azure portal, Azure CLI, or PowerShell. Here's a simplified example using Azure CLI, assuming the resource group myResourceGroup already exists:


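# Create a namespace, an Event Hub inside it, and a dedicated consumer group for Stream Analytics
az eventhubs namespace create --name myEventHubNamespace --resource-group myResourceGroup --location eastus
az eventhubs eventhub create --name myEventHub --namespace-name myEventHubNamespace --resource-group myResourceGroup
az eventhubs eventhub consumer-group create --name ASA_ConsumerGroup --eventhub-name myEventHub --namespace-name myEventHubNamespace --resource-group myResourceGroup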

2. Creating an Azure Stream Analytics Job

Next, create a Stream Analytics job that will consume data from your Event Hub.

  1. Navigate to the Azure portal and create a new Azure Stream Analytics job.
  2. Provide a name for the job, select your subscription, resource group, and location.
  3. Choose the appropriate hosting environment (Cloud or Edge).

3. Configuring Event Hubs as an Input Source

Once your Stream Analytics job is created, you need to configure Event Hubs as its input.

  1. In your Stream Analytics job, go to Job topology and select Inputs.
  2. Click Add input and choose Event Hub.
  3. Input alias: Give your input a meaningful name (e.g., eventhub_input).
  4. Event Hub namespace: Select the Event Hub namespace you created earlier.
  5. Event hub name: Select the specific Event Hub instance.
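  6. Event Hub policy name: Choose a shared access policy with Listen permission. RootManageSharedAccessKey works, but a dedicated Listen-only policy gives the job no more access than it needs.
  7. Authentication mode: Select Connection string to authenticate with that policy's key, or Managed Identity so that no key is stored in the job. With Managed Identity, the job's identity needs a role such as Azure Event Hubs Data Receiver on the Event Hub.
  8. Consumer group: Specify a consumer group for the Event Hub. It's best practice to create a dedicated consumer group for your Stream Analytics job: each consumer group tracks its own read position, and Event Hubs limits the number of concurrent readers per partition within a consumer group, so sharing one with other applications can make readers conflict.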
  9. Event serialization format: Select the format of the data in your Event Hub (e.g., JSON, CSV, Avro).
  10. Encoding: Select UTF-8, currently the only supported encoding.
  11. Click Save.
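With JSON serialization and UTF-8 encoding selected, each event body should be a UTF-8 encoded JSON object. A minimal Python sketch of building such a body, assuming events carry the deviceId and eventTimestamp fields used by the queries in section 4 (the field names and the encode_event helper are illustrative, not required by the service):

```python
import json
from datetime import datetime, timezone


def encode_event(device_id: str, ts: datetime, **fields) -> bytes:
    """Serialize one event as a JSON object and encode it as UTF-8 bytes."""
    body = {
        "deviceId": device_id,
        # ISO 8601 string with an explicit UTC offset, which TIMESTAMP BY can parse
        "eventTimestamp": ts.isoformat(),
        **fields,
    }
    # ensure_ascii=False keeps non-ASCII text as real UTF-8 instead of \u escapes
    return json.dumps(body, ensure_ascii=False).encode("utf-8")


payload = encode_event("sensor-01", datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
                       temperature=21.5, site="Zürich")
print(payload.decode("utf-8"))
```

These bytes are what you would hand to an Event Hubs producer as the event body, for example EventData(payload) in the azure-eventhub Python package.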

Input Configuration Example (JSON):

When you save the input, it is stored as a JSON resource definition, the same shape used by ARM templates and the Stream Analytics REST API. Here's a simplified example:


{
    "name": "eventhub_input",
    "properties": {
        "type": "Stream",
        "datasource": {
            "type": "Microsoft.ServiceBus/EventHub",
            "properties": {
                "serviceBusNamespace": "myEventHubNamespace",
                "eventHubName": "myEventHub",
                "sharedAccessPolicyName": "RootManageSharedAccessKey",
                "sharedAccessPolicyKey": "...",
                "consumerGroupName": "ASA_ConsumerGroup",
                "authenticationMode": "ConnectionString"
            }
        },
        "serialization": {
            "type": "Json",
            "properties": {
                "encoding": "UTF8"
            }
        }
    }
}
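If you manage inputs as code rather than in the portal, the same definition can be sent to the Stream Analytics REST API with a PUT to the job's inputs collection. The sketch below only builds the request URL and body; it assumes api-version 2020-03-01 and leaves authentication (an Azure AD bearer token for https://management.azure.com/) and the HTTP call to you. The helper name, job name, and subscription ID are hypothetical placeholders.

```python
import json
from urllib.parse import quote

ARM_ENDPOINT = "https://management.azure.com"


def input_put_request(subscription_id: str, resource_group: str, job_name: str,
                      input_def: dict, api_version: str = "2020-03-01") -> tuple:
    """Return (url, body) for PUT .../streamingjobs/{job}/inputs/{alias}.

    The alias is taken from input_def["name"] and goes in the URL;
    the request body carries only the "properties" object.
    """
    alias = input_def["name"]
    path = (
        f"/subscriptions/{quote(subscription_id)}"
        f"/resourceGroups/{quote(resource_group)}"
        "/providers/Microsoft.StreamAnalytics/streamingjobs"
        f"/{quote(job_name)}/inputs/{quote(alias)}"
    )
    url = f"{ARM_ENDPOINT}{path}?api-version={api_version}"
    body = json.dumps({"properties": input_def["properties"]}).encode("utf-8")
    return url, body


definition = {
    "name": "eventhub_input",
    "properties": {
        "type": "Stream",
        "datasource": {
            "type": "Microsoft.ServiceBus/EventHub",
            "properties": {
                "serviceBusNamespace": "myEventHubNamespace",
                "eventHubName": "myEventHub",
                "sharedAccessPolicyName": "RootManageSharedAccessKey",
                "sharedAccessPolicyKey": "...",  # placeholder: never commit real keys
                "consumerGroupName": "ASA_ConsumerGroup",
                "authenticationMode": "ConnectionString",
            },
        },
        "serialization": {"type": "Json", "properties": {"encoding": "UTF8"}},
    },
}
# Placeholder subscription ID and hypothetical job name
url, body = input_put_request("00000000-0000-0000-0000-000000000000",
                              "myResourceGroup", "myStreamAnalyticsJob", definition)
print(url)
```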

4. Writing a Stream Analytics Query

Now, define the logic that processes data from Event Hubs using the Stream Analytics Query Language (SAQL), a subset of T-SQL extended with streaming constructs such as time windows and TIMESTAMP BY.

For example, to select all data and forward it to an output (e.g., another Event Hub, Blob Storage, SQL Database):


SELECT *
INTO outputAlias
FROM eventhub_input

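To filter and aggregate, for example counting events per device in 5-minute tumbling windows and keeping only devices with more than 10 events in a window: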


SELECT
    System.Timestamp() AS WindowEnd,
    deviceId,
    COUNT(*) AS eventCount
INTO outputAlias
FROM eventhub_input TIMESTAMP BY eventTimestamp  -- Assumes each event carries an 'eventTimestamp' field
GROUP BY TumblingWindow(minute, 5), deviceId
HAVING COUNT(*) > 10
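To make the window semantics concrete, here is a small Python simulation of the query above over an in-memory list of events. It is an illustration, not how Stream Analytics executes queries. It assumes 5-minute windows aligned to clock boundaries (12:00, 12:05, ...) that exclude their start time and include their end time, so an event at exactly 12:05:00 falls in the window ending at 12:05; the WindowEnd value plays the role of System.Timestamp. The function names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_end(ts: datetime, size: timedelta) -> datetime:
    """End of the (start, end] tumbling window containing ts (ts must be timezone-aware)."""
    elapsed = ts - EPOCH
    # Ceiling division on timedeltas: a timestamp exactly on a boundary closes that window.
    n = -(-elapsed // size)
    return EPOCH + n * size


def count_per_device(events, size=timedelta(minutes=5), min_count=10):
    """Mimic GROUP BY TumblingWindow(minute, 5), deviceId HAVING COUNT(*) > min_count."""
    counts = Counter(
        (window_end(datetime.fromisoformat(e["eventTimestamp"].replace("Z", "+00:00")), size),
         e["deviceId"])
        for e in events
    )
    rows = [
        {"WindowEnd": end, "deviceId": device, "eventCount": count}
        for (end, device), count in counts.items()
        if count > min_count
    ]
    return sorted(rows, key=lambda r: (r["WindowEnd"], r["deviceId"]))


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
# d1 sends 11 events in the 12:00-12:05 window; d2 sends only 3
events = [{"deviceId": "d1", "eventTimestamp": (base + timedelta(seconds=s)).isoformat()}
          for s in range(1, 12)]
events += [{"deviceId": "d2", "eventTimestamp": (base + timedelta(seconds=30)).isoformat()}] * 3
for row in count_per_device(events):
    print(row)
```

Only d1 passes the HAVING filter; d2's three events are counted but dropped.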

SAQL Features:

5. Configuring Outputs

Define where the processed data from your Stream Analytics job will be sent.

  1. In your Stream Analytics job, go to Job topology and select Outputs.
  2. Click Add output and choose your desired destination (e.g., Azure Blob Storage, Azure SQL Database, another Azure Event Hub, Power BI).
  3. Configure the output details, including connection information and format.
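Stream Analytics resolves the names after FROM (and JOIN) against the job's input aliases or WITH steps, and the names after INTO against its output aliases, so the alias you give an output must match the one in the query (outputAlias in the examples above). A rough Python pre-flight check can catch typos before you start the job. It is a hypothetical helper, not an Azure tool: it uses regular expressions rather than a real parser, strips -- comments, treats names defined as WITH name AS (...) as steps, and compares names exactly.

```python
import re

_COMMENT = re.compile(r"--[^\n]*")
# A WITH step: "name AS (" ... ")"
_STEP = re.compile(r"\b([A-Za-z_]\w*)\s+AS\s*\(", re.IGNORECASE)
# FROM/JOIN/INTO followed by a plain or [bracketed] name
_REF = re.compile(r"\b(FROM|JOIN|INTO)\s+\[?([A-Za-z_]\w*)\]?", re.IGNORECASE)


def unknown_aliases(query: str, inputs: set, outputs: set) -> set:
    """Return names referenced by FROM/JOIN/INTO that are not configured on the job."""
    text = _COMMENT.sub("", query)
    steps = set(_STEP.findall(text))
    missing = set()
    for keyword, name in _REF.findall(text):
        if keyword.upper() == "INTO":
            known = outputs
        else:
            known = inputs | steps  # FROM/JOIN may also name a WITH step
        if name not in known:
            missing.add(name)
    return missing


query = """SELECT System.Timestamp() AS WindowEnd, deviceId, COUNT(*) AS eventCount
INTO outputAlias
FROM eventhub_input TIMESTAMP BY eventTimestamp
GROUP BY TumblingWindow(minute, 5), deviceId"""
print(unknown_aliases(query, inputs={"eventhub_input"}, outputs={"blob_output"}))
# {'outputAlias'}
```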

6. Starting the Stream Analytics Job

Once the input, query, and output are configured, start your Stream Analytics job.

You can start the job from the Azure portal. For the job output start time, choose Now, Custom (a specific point in time), or When last stopped (resume from where a previously stopped job left off).
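Through the REST API's start operation, the start point is set with outputStartMode: JobStartTime (start now), CustomTime (a specific point in time, given in outputStartTime), or LastOutputEventTime (resume after the last output event of a previously stopped job). The Python sketch below builds that request body and rejects inconsistent combinations; the helper is hypothetical, and the HTTP call itself (a POST to .../streamingjobs/{jobName}/start) and its authentication are left out.

```python
import json
from datetime import datetime, timezone
from typing import Optional

START_MODES = {"JobStartTime", "CustomTime", "LastOutputEventTime"}


def start_job_body(mode: str, custom_time: Optional[datetime] = None) -> bytes:
    """Build the JSON body for starting a Stream Analytics job."""
    if mode not in START_MODES:
        raise ValueError(f"unknown outputStartMode: {mode}")
    body = {"outputStartMode": mode}
    if mode == "CustomTime":
        if custom_time is None or custom_time.tzinfo is None:
            raise ValueError("CustomTime needs a timezone-aware custom_time")
        # Send the start time as an ISO 8601 string in UTC
        body["outputStartTime"] = custom_time.astimezone(timezone.utc).isoformat()
    elif custom_time is not None:
        raise ValueError("custom_time is only valid with CustomTime")
    return json.dumps(body).encode("utf-8")


print(start_job_body("CustomTime", datetime(2024, 1, 1, tzinfo=timezone.utc)).decode())
# {"outputStartMode": "CustomTime", "outputStartTime": "2024-01-01T00:00:00+00:00"}
```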

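Monitoring: Regularly check your Stream Analytics job's metrics in the Azure portal, such as Input Events, Output Events, Runtime Errors, Watermark Delay, Backlogged Input Events, and streaming unit (SU) utilization, to confirm the job keeps up with incoming data and to catch errors early.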

Troubleshooting Common Issues

API References

Azure Event Hubs REST API

Reference for managing Event Hubs namespaces and entities.

Azure Event Hubs REST API Documentation

Azure Stream Analytics REST API

Reference for creating and managing Stream Analytics jobs, inputs, outputs, and queries.

Azure Stream Analytics REST API Documentation