Introduction
Leveraging machine learning models directly within Azure Stream Analytics (ASA) unlocks powerful real-time data processing capabilities. This integration lets you enrich streaming data with predictions, anomaly scores, classifications, and more as events flow through your ASA job. This document outlines the ways to integrate ASA with Azure Machine Learning (AML), the setup required, and practical query and code examples.
By combining the low-latency, scalable nature of Stream Analytics with the predictive power of Machine Learning, you can build intelligent applications that react instantly to evolving data patterns.
Key Benefits
- Real-time Insights: Make predictions and decisions on data as it arrives, without batch processing delays.
- Scalability: Inherit the scalability of both Azure Stream Analytics and Azure Machine Learning services.
- Enriched Data: Augment raw streaming data with ML-driven features like sentiment, fraud scores, or object detection.
- Reduced Latency: Perform complex ML inference directly in the streaming pipeline, minimizing end-to-end latency.
- Simplified Architecture: Integrate ML inference directly into your existing ASA pipelines, simplifying deployment and management.
Integration Patterns
There are two primary patterns for integrating Azure Stream Analytics with Azure Machine Learning:
Pattern 1: ASA Calls AML Endpoint
In this pattern, your Azure Stream Analytics job invokes an Azure Machine Learning web service endpoint. This is typically used when you have a pre-trained AML model deployed as a real-time endpoint.
Workflow:
- ASA job receives incoming data stream.
- For each event (or a batch of events), ASA makes an HTTP request to the AML web service endpoint, sending the relevant data as input.
- The AML endpoint processes the request, runs inference using the deployed model, and returns the prediction or result (a scoring-script sketch follows this workflow).
- ASA receives the response and can then use this ML output to enrich the data, route it, or trigger further actions.
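To make the endpoint side of this workflow concrete, here is a minimal sketch of an Azure ML scoring script using the conventional init()/run() entry points. The model file name, the scikit-learn-style model, and the payload shape are assumptions for illustration; the actual contract is defined by your own scoring script.

# score.py - minimal scoring script sketch for an AML real-time endpoint (assumed layout)
import json
import os
import joblib

model = None

def init():
    # AZUREML_MODEL_DIR points at the registered model files inside the deployed container.
    global model
    model_path = os.path.join(os.getenv("AZUREML_MODEL_DIR", "."), "model.pkl")
    model = joblib.load(model_path)

def run(raw_data):
    # Assumed request body: {"data": [{"temperature": 71.2, "humidity": 0.42}, ...]}
    records = json.loads(raw_data)["data"]
    features = [[r["temperature"], r["humidity"]] for r in records]
    predictions = model.predict(features)        # label per record (semantics depend on the model)
    scores = model.decision_function(features)   # confidence/score (method depends on the model class)
    # One result per input record; the caller reads these fields from the JSON response.
    return [{"prediction": int(p), "score": float(s)} for p, s in zip(predictions, scores)]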
Pattern 2: ASA Routes Data to Custom Scoring Code
This pattern routes events from your ASA job to custom compute, typically an Azure Function, that executes your ML code. While less common than calling a real-time endpoint directly, it offers greater flexibility for complex scenarios, custom model loading, or pre/post-processing logic that can't be encapsulated in a simple AML endpoint.
Workflow:
- ASA job receives incoming data stream.
- ASA routes the events to an Azure Function output (or other custom compute).
- The Azure Function executes custom Python or other code that loads an ML model (e.g., from Azure Blob Storage or the AML model registry) and performs inference; a sketch follows this list.
- The Azure Function writes the scored results to a downstream sink (e.g., an event hub, queue, or database); results do not flow back into the ASA query itself.
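As a rough illustration of this pattern, the sketch below shows an HTTP-triggered Azure Function (Python v2 programming model) that scores the batch of events ASA posts to its Azure Functions output. The route name, model location, and event fields are assumptions, and a production version would forward the results downstream rather than only returning them.

# function_app.py - hedged sketch of an HTTP-triggered Azure Function scoring events from ASA
import json
import azure.functions as func
import joblib

app = func.FunctionApp()

# Load the model once per worker; in practice it might come from Blob Storage or the AML model registry.
model = joblib.load("model.pkl")  # assumed to be bundled with the function app

@app.route(route="score", auth_level=func.AuthLevel.FUNCTION)
def score(req: func.HttpRequest) -> func.HttpResponse:
    # ASA's Azure Functions output posts a JSON array of events.
    events = req.get_json()
    results = []
    for e in events:
        prediction = int(model.predict([[e["temperature"], e["humidity"]]])[0])
        results.append({"deviceId": e["deviceId"], "prediction": prediction})
    # A real implementation would typically write results to an event hub, queue, or database here.
    return func.HttpResponse(json.dumps(results), mimetype="application/json")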
Note: Pattern 1 (ASA calling AML Endpoint) is the most straightforward and commonly used method for integrating pre-trained models. We will focus on this pattern for the setup and query examples.
Setup and Configuration
Setting up the integration involves configuring both your Azure Machine Learning service and your Azure Stream Analytics job.
Azure ML Setup
- Train and Register Model: Train your machine learning model using Azure Machine Learning or any other framework. Register the trained model in your AML workspace.
- Deploy Model: Deploy your registered model as a real-time inference endpoint. This can be deployed to Azure Kubernetes Service (AKS) or Azure Container Instances (ACI). For development and testing, ACI is often quicker to set up.
- Obtain Endpoint Details: Once deployed, note down the REST Endpoint URL and the Primary Key (or Secondary Key) for authentication. You will need these for your ASA job.
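If you deployed with the Azure ML v1 SDK, retrieving these details and smoke-testing the endpoint might look like the following sketch; the workspace configuration, deployment name, and payload schema are placeholders.

# Hedged sketch: fetch endpoint details with the Azure ML v1 SDK and send a test request.
import json
import requests
from azureml.core import Workspace
from azureml.core.webservice import Webservice

ws = Workspace.from_config()                        # assumes a local config.json for your workspace
service = Webservice(ws, name="anomaly-endpoint")   # placeholder deployment name

scoring_uri = service.scoring_uri                   # the REST Endpoint URL to use in ASA
primary_key, secondary_key = service.get_keys()     # API keys for key-based authentication

# Smoke-test with a payload shaped like the features the model expects (assumed schema).
sample = {"data": [{"temperature": 71.2, "humidity": 0.42}]}
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {primary_key}"}
response = requests.post(scoring_uri, data=json.dumps(sample), headers=headers)
print(response.status_code, response.json())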
Azure Stream Analytics Setup
- Create ASA Job: Create an Azure Stream Analytics job in the Azure portal.
- Configure Input: Set up your data input source (e.g., Event Hubs, IoT Hub, Blob Storage).
- Add Azure ML Function:
- In your ASA job, navigate to 'Job topology' -> 'Functions'.
- Click 'Add' and choose 'Azure ML Service' (or 'Azure ML Studio' for classic web services).
- Function alias: Give it a descriptive name (e.g., aml_enrichment). This is the name you will call from your query.
- Endpoint: Select your AML workspace and deployment from your subscription, or provide the REST Endpoint URL manually.
- Key: Paste the Primary Key (or Secondary Key) for your AML endpoint.
- Function signature: Confirm that the input and output fields ASA shows for the function match your model's scoring script.
- Batch size: The number of events ASA sends in a single request to the ML endpoint. Experiment to find an optimal balance between latency and throughput.
- Configure Output: Set up your data output destination (e.g., Power BI, Azure SQL Database, Blob Storage, Event Hubs).
- Write Query: Develop your ASA query, calling the Azure ML function to score events as they flow through the job.
Query Examples
Here's a conceptual example of an Azure Stream Analytics query that integrates with an AML endpoint.
Assume your incoming stream is named iot_data and has fields like deviceId, temperature, and humidity.
Assume your Azure ML function alias is aml_enrichment and your model scores the temperature and humidity features.
WITH scored AS (
    SELECT
        deviceId,
        temperature,
        humidity,
        aml_enrichment(temperature, humidity) AS aml_result
    FROM
        iot_data TIMESTAMP BY EventEnqueuedUtcTime
)
SELECT
    deviceId,
    temperature,
    humidity,
    aml_result.prediction AS predicted_anomalous,
    aml_result.score AS anomaly_score
INTO
    output_stream
FROM
    scored
Explanation:
- The WITH step selects the relevant fields from the incoming iot_data stream and calls the Azure ML function once per event.
- aml_enrichment(temperature, humidity) refers to the function alias configured on the job; behind the scenes, ASA batches events and sends them to the AML endpoint.
- The function returns a record, aliased here as aml_result, and the outer SELECT extracts individual prediction fields from it (e.g., aml_result.prediction, aml_result.score).
Note: The field names returned by the function depend on your AML model's scoring script and the endpoint's response format. You may need to adjust both the inputs you pass to the function and the fields you read from aml_result accordingly.
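One way to sanity-check that contract is to exercise a local copy of the scoring script with a batch shaped like the one ASA will send, as in this hedged sketch (the module name, field names, and batch format are assumptions):

# Hedged sketch: verify the scoring contract the ASA query relies on before deploying.
import json
import score  # hypothetical local copy of the deployed scoring script (init()/run())

score.init()

# A batch shaped like the feature columns passed to aml_enrichment in the query (assumed format).
batch = {"data": [
    {"temperature": 71.2, "humidity": 0.42},
    {"temperature": 98.6, "humidity": 0.10},
]}

results = score.run(json.dumps(batch))

# Each result must expose the fields the query reads as aml_result.prediction and aml_result.score.
for r in results:
    assert "prediction" in r and "score" in r, "scoring script and ASA query are out of sync"
print(results)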
Best Practices
- Model Optimization: Ensure your AML model is optimized for inference speed and efficiency.
- Batching: Configure an appropriate batch size on the Azure ML function to balance latency and throughput.
- Error Handling: Implement robust error handling in your ASA query to manage potential failures during ML endpoint calls; for example, use TRY_CAST and null checks when consuming the function's output.
- Monitoring: Monitor your ASA job's metrics (e.g., input/output events, SU utilization, data latency) and AML endpoint metrics for performance and potential issues.
- Security: Use API keys for authentication and consider Azure Key Vault for securely storing sensitive credentials.
- Schema Consistency: Ensure the schema of data sent to the AML endpoint matches what the model expects.
- Caching: If the job also uses reference data inputs (e.g., lookups that feed the model's features), remember that ASA caches them; configure their refresh rate to balance data freshness and performance.
Key Considerations
- Cost: Running both ASA and AML services incurs costs. Optimize resource usage.
- Latency: Although scoring happens in near real time, network calls between ASA and the AML endpoint and the model's inference time add end-to-end latency.
- Model Updates: Plan a strategy for updating your deployed AML model without interrupting the ASA job.
- Complexity: For very complex ML pipelines or those requiring custom environments, consider alternative architectures like Azure Databricks or Azure Functions triggered by ASA.
- State Management: If your ML model requires state (e.g., user history), managing this state across streaming events needs careful design, often involving external stores.