Integrating Azure Event Hubs with Azure Stream Analytics
Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. Azure Stream Analytics (ASA) is a real-time analytics service that helps you analyze and process high volumes of streaming data from sources like Event Hubs.
This document guides you through setting up Azure Event Hubs as an input source for Azure Stream Analytics for real-time data processing and analysis. This integration enables you to:
- Ingest massive amounts of data in real-time.
- Perform complex event processing (CEP) on streaming data.
- Derive insights and trigger actions based on real-time events.
- Integrate seamlessly with other Azure services.
1. Setting up Azure Event Hubs
Before you can integrate with Stream Analytics, you need an Azure Event Hubs namespace and an Event Hub instance. If you haven't already, follow these steps:
- Create an Azure Event Hubs namespace in the Azure portal.
- Within the namespace, create an Event Hub instance.
- Note down the connection string for the Event Hub. You'll need this for the Stream Analytics input configuration.
Creating an Event Hub:
You can create an Event Hub using the Azure portal, Azure CLI, or PowerShell. Here's a simplified example using Azure CLI:
az eventhubs namespace create --name myEventHubNamespace --resource-group myResourceGroup --location eastus
az eventhubs create --name myEventHub --namespace-name myEventHubNamespace --resource-group myResourceGroup
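After creating the Event Hub, copy its connection string from a shared access policy; you'll paste it into the Stream Analytics input configuration. As a sanity check, a connection string is a semicolon-separated list of key/value pairs. The helper below is a hypothetical local sketch (not part of any Azure SDK) for splitting one apart so you can verify the endpoint and entity before configuring ASA:

```python
# Hypothetical helper: split an Event Hubs connection string into its
# key/value parts so you can sanity-check the values you paste into ASA.
def parse_connection_string(conn_str: str) -> dict:
    parts = {}
    for segment in conn_str.strip().rstrip(";").split(";"):
        # partition splits only at the first "=", so base64 key padding survives
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

example = ("Endpoint=sb://myEventHubNamespace.servicebus.windows.net/;"
           "SharedAccessKeyName=RootManageSharedAccessKey;"
           "SharedAccessKey=<your-key>;"
           "EntityPath=myEventHub")
parsed = parse_connection_string(example)
print(parsed["Endpoint"])      # the namespace endpoint
print(parsed["EntityPath"])    # the Event Hub name, when present
```

The `EntityPath` segment appears only in entity-level connection strings; namespace-level strings omit it, and you then select the Event Hub by name in the ASA input form.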
2. Creating an Azure Stream Analytics Job
Next, create a Stream Analytics job that will consume data from your Event Hub.
- Navigate to the Azure portal and create a new Azure Stream Analytics job.
- Provide a name for the job, select your subscription, resource group, and location.
- Choose the appropriate hosting environment (Cloud or Edge).
3. Configuring Event Hubs as an Input Source
Once your Stream Analytics job is created, you need to configure Event Hubs as its input.
- In your Stream Analytics job, go to Job topology and select Inputs.
- Click Add input and choose Event Hub.
- Input alias: Give your input a meaningful name (e.g., eventhub_input).
- Event Hub namespace: Select the Event Hub namespace you created earlier.
- Event hub name: Select the specific Event Hub instance.
- Event Hub policy name: Choose the shared access policy (e.g., RootManageSharedAccessKey or a custom one with Listen permissions).
- Authentication mode: Select Connection string and paste the connection string for your Event Hub. Alternatively, use Managed Identity for enhanced security.
- Consumer group: Specify a consumer group for the Event Hub. It's best practice to create a dedicated consumer group for your Stream Analytics job.
- Event serialization format: Select the format of the data in your Event Hub (e.g., JSON, CSV, Avro).
- Encoding: Specify the encoding of your data (e.g., UTF-8).
- Click Save.
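With the input configured for JSON serialization and UTF-8 encoding, any producer sending to the Event Hub must emit UTF-8-encoded JSON bodies. The sketch below shows only the local serialization step (the actual send via the azure-eventhub SDK is omitted); the field names are illustrative, not required by Event Hubs:

```python
import json

# An event as a Python dict; field names (deviceId, temperature,
# eventTimestamp) are illustrative examples, not required by Event Hubs.
event = {
    "deviceId": "sensor-01",
    "temperature": 21.7,
    "eventTimestamp": "2024-01-01T12:00:00Z",
}

# Serialize to the format the ASA input expects: JSON, UTF-8 encoded.
body = json.dumps(event).encode("utf-8")

# ASA decodes and parses each event body back into columns.
decoded = json.loads(body.decode("utf-8"))
print(decoded["deviceId"])
```

If the producer sends a different format (for example CSV) while the input is configured for JSON, the job surfaces serialization errors, which is one of the common issues covered below.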
Input Configuration Example (JSON):
When you save the input, Stream Analytics stores a JSON representation similar to the one used by its REST API. Here's a conceptual snippet (the query and settings such as streaming units belong to the job's separate transformation resource, not to the input):
{
  "name": "eventhub_input",
  "properties": {
    "type": "Stream",
    "datasource": {
      "type": "Microsoft.ServiceBus/EventHub",
      "properties": {
        "serviceBusNamespace": "myEventHubNamespace",
        "eventHubName": "myEventHub",
        "sharedAccessPolicyName": "RootManageSharedAccessKey",
        "sharedAccessPolicyKey": "<access-key>",
        "consumerGroupName": "ASA_ConsumerGroup",
        "authenticationMode": "ConnectionString"
      }
    },
    "serialization": {
      "type": "Json",
      "properties": {
        "encoding": "UTF8"
      }
    }
  }
}
4. Writing a Stream Analytics Query
Now, define the logic to process the data from Event Hubs using Stream Analytics Query Language (SAQL), which is similar to SQL.
For example, to select all data and forward it to an output (e.g., another Event Hub, Blob Storage, SQL Database):
SELECT *
INTO outputAlias
FROM eventhub_input
To perform filtering and aggregation:
SELECT
System.Timestamp() AS WindowEnd,
deviceId,
COUNT(*) AS eventCount
INTO outputAlias
FROM eventhub_input TIMESTAMP BY eventTimestamp -- Assuming 'eventTimestamp' is a column in your data
GROUP BY TumblingWindow(minute, 5), deviceId
HAVING COUNT(*) > 10
SAQL Features:
- Windowing Functions: Tumbling, Hopping, Sliding, Session windows.
- Data Manipulation: SELECT, WHERE, GROUP BY, ORDER BY.
- Built-in Functions: Aggregations, String, Date, Math, and more.
- User-Defined Functions (UDFs): JavaScript and C# for custom logic.
5. Configuring Outputs
Define where the processed data from your Stream Analytics job will be sent.
- In your Stream Analytics job, go to Job topology and select Outputs.
- Click Add output and choose your desired destination (e.g., Azure Blob Storage, Azure SQL Database, another Azure Event Hub, Power BI).
- Configure the output details, including connection information and format.
6. Starting the Stream Analytics Job
Once the input, query, and output are configured, start your Stream Analytics job.
You can start the job from the Azure portal. You can choose to start processing from a specific point in time or from the current time.
Troubleshooting Common Issues
- Connection Errors: Verify Event Hub connection strings, policy names, and network connectivity. Ensure your ASA job's network allows access to Event Hubs.
- Serialization Errors: Ensure the serialization format configured on the ASA input matches the actual format of data being sent to Event Hubs.
- Query Errors: Use the Test query feature in ASA to validate your SAQL syntax and logic. Check timestamp configurations.
- Consumer Group Issues: Use a dedicated consumer group for your ASA job. If multiple readers share the same consumer group on a partition, they can disconnect one another and events may be missed.
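To reproduce a serialization mismatch locally before blaming the job, you can check whether the bytes a producer sends actually parse as UTF-8 JSON. This is a hypothetical diagnostic helper, not part of ASA or any Azure SDK:

```python
import json

def check_json_utf8(body: bytes):
    """Return (ok, detail): whether an event body is valid UTF-8 JSON."""
    try:
        json.loads(body.decode("utf-8"))
        return True, "ok"
    except UnicodeDecodeError as exc:
        return False, f"not UTF-8: {exc}"
    except json.JSONDecodeError as exc:
        return False, f"not JSON: {exc}"

print(check_json_utf8(b'{"deviceId": "sensor-01"}'))  # valid JSON body
print(check_json_utf8(b"deviceId,sensor-01"))         # CSV sent to a JSON input
```

Running producers' payloads through a check like this quickly distinguishes an encoding problem from a format problem, which determines whether you fix the producer or the input's serialization settings.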
API References
Azure Event Hubs REST API
Reference for managing Event Hubs namespaces and entities.
Azure Event Hubs REST API Documentation
Azure Stream Analytics REST API
Reference for creating and managing Stream Analytics jobs, inputs, outputs, and queries.
Azure Stream Analytics REST API Documentation