This tutorial demonstrates how to create an Azure Function that triggers on new messages arriving in an Azure Event Hub. You will learn how to connect Azure Functions to Event Hubs, process incoming events, and handle potential errors.
First, let's set up a local Azure Functions project. We'll use the Azure Functions Core Tools for this.
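If you don't have the Core Tools installed yet, one common option is to install version 4 via npm (platform-specific installers are also available from Microsoft):
npm install -g azure-functions-core-tools@4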
func init EventHubProcessor --worker-runtime dotnet
cd EventHubProcessor
These commands create a new C# Azure Functions project named EventHubProcessor and switch into the project directory.
Next, we'll add a new function that uses the Event Hubs trigger. This function will be executed whenever new events are published to your Event Hub.
func new --name ProcessEventHubMessages --template "Azure Event Hubs trigger"
Because the function name is supplied with --name, you won't be prompted for it. The generated function contains placeholder values for the Event Hub name and the connection setting name, which you'll update in the next steps.
You need to configure the connection string to your Event Hubs namespace. Open the local.settings.json file in your project root and add or update the connection string. Replace YOUR_EVENT_HUB_CONNECTION_STRING with your actual connection string.
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "EventHubConnection": "Endpoint=sb://YOUR_NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY"
  }
}
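If you need to look up the connection string, you can copy it from the portal under Shared access policies, or fetch it with the Azure CLI; the resource group and namespace names below are placeholders:
az eventhubs namespace authorization-rule keys list --resource-group YOUR_RESOURCE_GROUP --namespace-name YOUR_NAMESPACE --name RootManageSharedAccessKey --query primaryConnectionString --output tsv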
For production environments, it's highly recommended to store secrets in Azure Key Vault (or to use a managed identity) rather than in plain-text application settings. Note that local.settings.json is only used for local development and should never be committed to source control.
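For example, in the Function App's application settings in Azure you can replace the raw connection string with a Key Vault reference; the vault and secret names below are placeholders:
EventHubConnection = @Microsoft.KeyVault(SecretUri=https://YOUR_VAULT.vault.azure.net/secrets/EventHubConnection/)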
Open the C# file for your function (e.g., ProcessEventHubMessages.cs). The generated code will look something like this:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace EventHubProcessor
{
    public static class ProcessEventHubMessages
    {
        [FunctionName("ProcessEventHubMessages")]
        public static async Task Run(
            [EventHubTrigger("my-event-hub", Connection = "EventHubConnection")] EventData[] events,
            ILogger log)
        {
            log.LogInformation($"C# Event Hub trigger function processed a batch of {events.Length} events.");
            var exceptions = new List<Exception>();
            foreach (EventData eventData in events)
            {
                try
                {
                    string messageBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
                    log.LogInformation($"Event Body: {messageBody}");
                    log.LogInformation($"Enqueued Time UTC: {eventData.EnqueuedTime}");
                    log.LogInformation($"Sequence Number: {eventData.SequenceNumber}");
                    log.LogInformation($"Offset: {eventData.Offset}");
                    // TODO: Add your custom processing logic here.
                    // For example, you could save the data to a database,
                    // send it to another service, or perform analysis.
                }
                catch (Exception e)
                {
                    // We need to keep processing the rest of the batch,
                    // so we capture this exception and continue.
                    exceptions.Add(e);
                }
            }
            // Once all events are processed, check for exceptions.
            if (exceptions.Count > 1)
            {
                throw new AggregateException(exceptions);
            }
            if (exceptions.Count == 1)
            {
                throw exceptions.Single();
            }
        }
    }
}
In this code:
- The [EventHubTrigger(...)] attribute configures the function to be triggered by an Event Hub.
- "my-event-hub" should be replaced with the actual name of your Event Hub.
- Connection = "EventHubConnection" refers to the name of the connection string setting in local.settings.json.
- The function receives an array of EventData objects, allowing you to process events in batches.
- The // TODO: Add your custom processing logic here. comment marks where you should insert your specific business logic (see the sketch after this list).
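As an illustration of what that custom logic might look like, here is a minimal sketch that deserializes each event body as JSON into a hypothetical TelemetryReading type; the type, its fields, and the log message are assumptions, so substitute your own model and persistence logic. It would go inside the try block of the function above, after messageBody has been decoded:
using System.Text.Json;

public record TelemetryReading(string DeviceId, double Temperature);

// Inside the try block, after messageBody has been decoded:
TelemetryReading? reading = JsonSerializer.Deserialize<TelemetryReading>(messageBody);
if (reading is not null)
{
    log.LogInformation($"Device {reading.DeviceId} reported {reading.Temperature} degrees");
    // e.g., persist 'reading' to a database or forward it to another service here.
}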
You can now run your Azure Function project locally to test the Event Hub integration. Navigate to your project directory in the terminal and run:
func start
Your function host will start. Once it's running, send some messages to your configured Event Hub. You should see the log output in your terminal, indicating that your function has been triggered and processed the events.
You can use a simple script (e.g., Python) to send test messages to your Event Hub. Ensure your script uses the same connection string and Event Hub name.
# Example Python script using the azure-eventhub package (v5)
from azure.eventhub import EventHubProducerClient, EventData
import os

CONNECTION_STR = os.environ["EventHubConnection"]  # Ensure this env var is set
EVENT_HUB_NAME = "my-event-hub"  # Replace with your Event Hub name

producer = EventHubProducerClient.from_connection_string(CONNECTION_STR, eventhub_name=EVENT_HUB_NAME)

def send_events(num_events):
    # Collect the test messages into a batch and send them in a single call.
    batch = producer.create_batch()
    for i in range(num_events):
        batch.add(EventData(f"Test message {i}"))
        print(f"Queued: Test message {i}")
    producer.send_batch(batch)

send_events(5)
producer.close()
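If the azure-eventhub package isn't installed yet, you can install it and run the script like this (the file name send_test_events.py is just a placeholder for wherever you saved the snippet):
pip install azure-eventhub
python send_test_events.py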
Once you're satisfied with your local testing, you can deploy your Azure Function to the cloud. Make sure you have created an Azure Function App in your Azure subscription.
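If you don't have a Function App yet, one way to create one is with the Azure CLI; the resource group, region, storage account, and app name below are placeholders you'd replace with your own:
az functionapp create --resource-group YOUR_RESOURCE_GROUP --consumption-plan-location westeurope --runtime dotnet --functions-version 4 --name YOUR_FUNCTION_APP_NAME --storage-account YOUR_STORAGE_ACCOUNT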
In your project directory, run the following command:
func azure functionapp publish YOUR_FUNCTION_APP_NAME
Replace YOUR_FUNCTION_APP_NAME with the name of your deployed Azure Function App.
After deployment, you need to configure the application settings for your Function App in the Azure portal. Navigate to your Function App, then go to Configuration under Settings.
Add a new application setting named EventHubConnection and set its value to your Event Hubs connection string (the same one you used locally).
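If you prefer the command line, the same setting can be added with the Azure CLI; the app and resource group names are placeholders:
az functionapp config appsettings set --name YOUR_FUNCTION_APP_NAME --resource-group YOUR_RESOURCE_GROUP --settings "EventHubConnection=YOUR_EVENT_HUB_CONNECTION_STRING"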
Also, ensure your function is configured to use the correct Event Hub name. For a compiled C# function, the Event Hub name and consumer group (which defaults to $Default) are set on the EventHubTrigger attribute rather than in a separate configuration file. To keep the Event Hub name configurable per environment, you can reference an application setting using %SettingName% syntax, for example [EventHubTrigger("%EventHubName%", Connection = "EventHubConnection")]. With that approach, your local.settings.json would look something like this:
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "YOUR_AZURE_STORAGE_CONNECTION_STRING",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "EventHubConnection": "YOUR_EVENT_HUB_CONNECTION_STRING",
    "EventHubName": "my-event-hub"
  },
  "Host": {
    "LocalHttpPort": 7071
  }
}
Note that the %SettingName% syntax resolves the value from application settings at runtime, both locally (from the Values section of local.settings.json) and in Azure (from the Function App's application settings), so if you use %EventHubName% remember to add an EventHubName setting in Azure as well. The Connection property does not need the percent signs because it always refers to the name of an app setting.
Azure Functions processes Event Hub events in batches, and the batch size and prefetch behavior can be tuned in host.json to optimize throughput and resource usage. In recent versions of the Event Hubs extension the relevant settings are maxEventBatchSize and prefetchCount; older versions nest maxBatchSize and prefetchCount under eventProcessorOptions.
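A sketch of what that host.json section could look like, assuming version 5.x or later of the Event Hubs extension; the numbers are illustrative rather than recommendations:
{
  "version": "2.0",
  "extensions": {
    "eventHubs": {
      "maxEventBatchSize": 100,
      "prefetchCount": 300
    }
  }
}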
For robust processing, consider implementing a dead-lettering strategy. Event Hubs has no built-in dead-letter queue, so if an event cannot be processed after several retries you can forward it yourself to a separate "dead-letter" event hub, queue, or topic for later inspection. This prevents problematic events from blocking the processing of valid ones.
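A minimal sketch of such a strategy, assuming a second Event Hub named my-event-hub-deadletter and an app setting DeadLetterEventHubConnection (both are assumptions, not part of the setup above). You could call a helper like this from the catch block in the function, in addition to recording the exception:
using System;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

public static class DeadLetterForwarder
{
    // Assumed dead-letter Event Hub and connection setting; adjust to your environment.
    private static readonly EventHubProducerClient DeadLetterProducer =
        new EventHubProducerClient(
            Environment.GetEnvironmentVariable("DeadLetterEventHubConnection"),
            "my-event-hub-deadletter");

    public static async Task ForwardAsync(EventData failedEvent, Exception error)
    {
        // Copy the original payload and record why processing failed.
        var deadLetterEvent = new EventData(failedEvent.EventBody);
        deadLetterEvent.Properties["DeadLetterReason"] = error.Message;
        deadLetterEvent.Properties["OriginalSequenceNumber"] = failedEvent.SequenceNumber;

        using EventDataBatch batch = await DeadLetterProducer.CreateBatchAsync();
        if (batch.TryAdd(deadLetterEvent))
        {
            await DeadLetterProducer.SendAsync(batch);
        }
    }
}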
Leverage Azure Application Insights for comprehensive monitoring of your Azure Functions. You can track function executions, view logs, analyze performance metrics, and set up alerts for errors.