Processing Azure Event Hubs with Azure Functions

This tutorial demonstrates how to create an Azure Function that triggers on new messages arriving in an Azure Event Hub. You will learn how to connect Azure Functions to Event Hubs, process incoming events, and handle potential errors.

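Prerequisites

- An Azure subscription.
- An Event Hubs namespace containing an event hub (the examples use one named my-event-hub).
- The Azure Functions Core Tools, which provide the func command.
- The .NET SDK, to build the C# project.
- The Azurite storage emulator (or an Azure Storage account) for running the function locally.
- Optionally, Python with the azure-eventhub package, for the test script.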

Step 1: Create an Azure Function Project

First, let's set up a local Azure Functions project. We'll use the Azure Functions Core Tools for this.


func init EventHubProcessor --worker-runtime dotnet
cd EventHubProcessor
            

These commands create a new C# Azure Functions project named EventHubProcessor and change into its directory.

Step 2: Add the Event Hubs Trigger

Next, we'll add a new function that uses the Event Hubs trigger. This function will be executed whenever new events are published to your Event Hub.


func new --name ProcessEventHubMessages --template "Azure Event Hubs trigger"
            

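Because the function name is already passed with --name, you should not be prompted for it. Depending on your Core Tools version, you may be prompted for the Event Hub name and connection setting; if not, you will set both in the generated code in Step 4.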

Step 3: Configure the Event Hubs Connection

Next, configure the connection string for your Event Hubs namespace. Open the local.settings.json file in your project root and add an EventHubConnection entry to the Values section. Replace YOUR_NAMESPACE and YOUR_KEY with your namespace name and the key from one of its shared access policies.


{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "EventHubConnection": "Endpoint=sb://YOUR_NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY"
  }
}
            

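local.settings.json is used only when running locally and is not published to Azure; keep it out of source control, since it contains the secret. In production, store the connection string in the Function App's application settings, ideally as an Azure Key Vault reference, rather than in any file.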
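
Whichever store ends up holding the secret, it can help to confirm that the value has the expected shape before starting the host. The sketch below is a hypothetical standard-library helper, not part of the Functions tooling: it splits the connection string on semicolons, checks for the Endpoint, SharedAccessKeyName, and SharedAccessKey parts that appear in the example above, and rejects leftover YOUR_ placeholders. The contoso namespace and abc123= key are made-up values.

```python
# Hypothetical helper: sanity-checks the shape of an Event Hubs connection string.
# It only inspects the text; it does not contact Azure or verify the key.

REQUIRED_KEYS = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey")


def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict, keeping '=' padding in values."""
    parts = {}
    for segment in conn_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue  # tolerate a trailing semicolon
        key, sep, value = segment.partition("=")  # split on the first '=' only
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key] = value
    return parts


def check_connection_string(conn_str):
    """Return the parsed parts, or raise ValueError if something looks wrong."""
    parts = parse_connection_string(conn_str)
    missing = [key for key in REQUIRED_KEYS if key not in parts]
    if missing:
        raise ValueError(f"Missing keys: {', '.join(missing)}")
    if not parts["Endpoint"].startswith("sb://"):
        raise ValueError("Endpoint should start with sb://")
    if "YOUR_" in conn_str:
        raise ValueError("Placeholder values such as YOUR_KEY have not been replaced")
    return parts


example = (
    "Endpoint=sb://contoso.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123="
)
print(check_connection_string(example)["SharedAccessKey"])  # prints abc123=
```

Splitting on the first "=" only matters because shared access keys are base64 strings that often end in "=" padding.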

Step 4: Implement the Function Logic

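Open the C# file for your function (ProcessEventHubMessages.cs). The generated trigger attribute may contain placeholder values, so update it to use your event hub name (my-event-hub in this tutorial) and the EventHubConnection setting. After that change, the code looks something like this: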

ProcessEventHubMessages.cs


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

namespace EventHubProcessor
{
    public static class ProcessEventHubMessages
    {
        [FunctionName("ProcessEventHubMessages")]
        public static async Task Run(
            [EventHubTrigger("my-event-hub", Connection = "EventHubConnection")] EventData[] events,
            ILogger log)
        {
            log.LogInformation($"C# Event Hub trigger function processed a batch of {events.Length} events.");

            var exceptions = new List<Exception>();

            foreach (EventData eventData in events)
            {
                try
                {
                    string messageBody = Encoding.UTF8.GetString(eventData.EventBody.ToArray());
                    log.LogInformation($"Event Body: {messageBody}");
                    log.LogInformation($"Enqueued Time UTC: {eventData.EnqueuedTime.ToString()}");
                    log.LogInformation($"Sequence Number: {eventData.SequenceNumber}");
                    log.LogInformation($"Offset: {eventData.Offset}");

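                    // TODO: Add your custom processing logic here.
                    // For example, you could save the data to a database,
                    // send it to another service, or perform analysis.
                    // Task.Yield() is a placeholder await so this async method compiles
                    // without warning CS1998; replace it with your own awaited calls.
                    await Task.Yield();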
                }
                catch (Exception e)
                {
                    // We need to keep processing the rest of the batch
                    // so we capture this exception and continue.
                    exceptions.Add(e);
                }
            }

            // Once all events are processed, check for exceptions.
            if (exceptions.Count > 1)
            {
                throw new AggregateException(exceptions);
            }

            if (exceptions.Count == 1)
            {
                throw exceptions.Single();
            }
        }
    }
}
                    

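In this code:

- The EventHubTrigger attribute binds the function to the event hub named my-event-hub, using the connection string stored in the EventHubConnection setting.
- Because the parameter is an EventData[] array, each invocation receives a batch of events rather than a single event.
- Each event body is decoded as UTF-8 text and logged together with its enqueued time, sequence number, and offset.
- Exceptions are collected per event so that one failure does not stop the rest of the batch; after the loop, a single failure is rethrown as-is and multiple failures are wrapped in an AggregateException.
- Throwing marks the invocation as failed so the error is recorded, but by default the trigger still checkpoints past the batch, so failed events are not redelivered. Anything you need to reprocess has to be handled by your own code, for example with a dead-letter strategy.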
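
To see the collect-then-throw control flow in isolation, here it is modeled in Python (the language of the test script later in this tutorial), so it can be run without Azure. BatchProcessingError is a hypothetical stand-in for AggregateException, and handle is a placeholder for your per-event logic; this illustrates the pattern only and is not code the Functions runtime uses.

```python
class BatchProcessingError(Exception):
    """Hypothetical stand-in for .NET's AggregateException."""

    def __init__(self, errors):
        super().__init__(f"{len(errors)} event(s) failed")
        self.errors = errors


def process_batch(events, handle):
    """Call handle(event) for every event, raising only after the whole batch ran."""
    errors = []
    for event in events:
        try:
            handle(event)
        except Exception as exc:  # keep going so one bad event doesn't skip the rest
            errors.append(exc)
    if len(errors) == 1:
        raise errors[0]  # like: throw exceptions.Single();
    if errors:
        raise BatchProcessingError(errors)  # like: throw new AggregateException(exceptions);


processed = []


def handle(body):
    # Hypothetical per-event logic: reject "bad" events, upper-case the rest.
    if body == "bad":
        raise ValueError(f"cannot process {body!r}")
    processed.append(body.upper())


try:
    process_batch(["a", "bad", "b", "bad"], handle)
except BatchProcessingError as err:
    print(processed, len(err.errors))  # prints ['A', 'B'] 2
```

Both good events are processed even though two events in the same batch fail, and the caller still learns about every failure.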

Step 5: Run the Function Locally

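You can now run your Azure Functions project locally to test the Event Hub integration. Because local.settings.json sets AzureWebJobsStorage to UseDevelopmentStorage=true, start the Azurite storage emulator first; the Event Hubs trigger keeps its checkpoints in that storage. Then, from your project directory, run: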


func start
            

Your function host will start. Once it's running, send some messages to your configured Event Hub. You should see the log output in your terminal, indicating that your function has been triggered and processed the events.

Testing with a Helper Script

You can use a simple script (e.g., Python) to send test messages to your Event Hub. Ensure your script uses the same connection string and Event Hub name.


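# Example Python script using azure-eventhub (v5+); install with: pip install azure-eventhub
import os

from azure.eventhub import EventData, EventHubProducerClient

CONNECTION_STR = os.environ["EventHubConnection"]  # Ensure this env var is set
EVENT_HUB_NAME = "my-event-hub"  # Replace with your Event Hub name


def send_events(producer, num_events):
    # Group the events into one batch; add() raises ValueError if the batch grows too large.
    batch = producer.create_batch()
    for i in range(num_events):
        event_data = f"Test message {i}"
        batch.add(EventData(event_data))
        print(f"Queued: {event_data}")
    producer.send_batch(batch)
    print(f"Sent {num_events} events.")


producer = EventHubProducerClient.from_connection_string(
    CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
)
with producer:  # closes the connection when the block exits
    send_events(producer, 5)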
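
The script reads the connection string from the EventHubConnection environment variable. If you would rather not copy it by hand, a small helper can read it from the same local.settings.json that the Functions host uses. This is a minimal standard-library sketch; the function names are hypothetical, and it assumes the file is unencrypted ("IsEncrypted": false) as in Step 3.

```python
import json
import os
from pathlib import Path


def load_local_settings(path="local.settings.json"):
    """Return the "Values" section of a Functions local.settings.json file.

    Assumes the file is plain JSON with "IsEncrypted": false.
    """
    with Path(path).open(encoding="utf-8") as f:
        return json.load(f).get("Values", {})


def export_setting(name, path="local.settings.json"):
    """Copy one setting into os.environ unless the variable is already set."""
    values = load_local_settings(path)
    if name not in values:
        raise KeyError(f"{name} not found in {path}")
    os.environ.setdefault(name, values[name])
    return os.environ[name]
```

To use it, call export_setting("EventHubConnection") near the top of the send script, before the line that reads os.environ, and run the script from the project root. An existing environment variable takes precedence over the file.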
                

Step 6: Deploy to Azure

Once you're satisfied with your local testing, you can deploy your Azure Function to the cloud. Make sure you have created an Azure Function App in your Azure subscription.

In your project directory, run the following command:


func azure functionapp publish YOUR_FUNCTION_APP_NAME
            

Replace YOUR_FUNCTION_APP_NAME with the name of the Function App you created.

Step 7: Configure Application Settings in Azure

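After deployment, configure the Function App's application settings in the Azure portal. The local.settings.json file is not published, so settings it defines locally, such as EventHubConnection, must be added here. Navigate to your Function App, then open Configuration under Settings (in newer versions of the portal this page is labeled Environment variables).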

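Add a new application setting named EventHubConnection and set its value to your Event Hubs connection string (the same value you used in local.settings.json), then save the changes.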

Also, make sure your function listens to the correct event hub. The name is set in the EventHubTrigger attribute, so changing a hardcoded value means editing the code and redeploying. To change it through configuration instead, have the attribute read an application setting with the %SettingName% syntax (EventHubName below is an example name), then define that setting alongside EventHubConnection wherever your settings live:


public static async Task Run(
    [EventHubTrigger("%EventHubName%", Connection = "EventHubConnection", ConsumerGroup = "$Default")] EventData[] events,
    ILogger log)


Note that the Connection property takes the name of a setting without percent signs, whereas other attribute values, such as the event hub name, use %SettingName% to read a setting. ConsumerGroup is optional and defaults to $Default. Locally, setting values come from the Values section of local.settings.json; in Azure, they come from the Function App's application settings.
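
As a rough illustration of the %SettingName% syntax, the sketch below replaces each %Name% in a string with the application setting of that name and fails if the setting is missing. It is a simplified model written for this tutorial, not the Functions host's implementation, and EventHubName is a hypothetical setting name.

```python
import re

# Matches %Name% where Name contains no percent signs.
_SETTING_REF = re.compile(r"%([^%]+)%")


def resolve_settings(value, settings):
    """Replace each %Name% in value with settings[Name]; fail loudly on unknown names.

    A simplified model for illustration, not the Functions host's own resolver.
    """

    def lookup(match):
        name = match.group(1)
        if name not in settings:
            raise KeyError(f"App setting {name!r} is not defined")
        return settings[name]

    return _SETTING_REF.sub(lookup, value)


app_settings = {"EventHubName": "my-event-hub"}  # hypothetical setting
print(resolve_settings("%EventHubName%", app_settings))  # prints my-event-hub
print(resolve_settings("my-event-hub", app_settings))  # no references: prints my-event-hub
```

Failing on a missing name, rather than leaving the literal %EventHubName% in place, makes a forgotten setting obvious instead of silently pointing at a nonexistent event hub.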

Advanced Scenarios

Batch Processing and Throughput

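Azure Functions delivers Event Hub events to the function in batches when the trigger parameter is an array, such as EventData[] events in this tutorial. Batch size and prefetching are configured for the whole Function App in the extensions.eventHubs section of host.json. With version 5.x of the Event Hubs extension, which works with the Azure.Messaging.EventHubs.EventData type used here, the relevant settings are maxEventBatchSize (the maximum number of events passed to a single invocation) and prefetchCount; older extension versions set the batch limit as maxBatchSize under eventProcessorOptions. Larger batches mean fewer invocations for the same number of events.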
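
To reason about how a batch-size limit affects the number of invocations, the sketch below splits a backlog of events into lists of at most max_batch_size items. It is a simplified model, not the trigger's actual scheduling: the real trigger can deliver smaller batches depending on what has been fetched, so treat the result as a lower bound on invocations for a given backlog.

```python
from itertools import islice


def batches(events, max_batch_size):
    """Yield lists of at most max_batch_size events, in arrival order.

    A simplified model: the real trigger can hand over smaller batches
    depending on how many events are available when it invokes the function.
    """
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    it = iter(events)
    while batch := list(islice(it, max_batch_size)):
        yield batch


backlog = [f"event-{i}" for i in range(250)]
print([len(b) for b in batches(backlog, 100)])  # prints [100, 100, 50]
```

With a cap of 100, a backlog of 250 events needs at least three invocations.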

Error Handling and Dead-Lettering

For robust processing, consider implementing a dead-lettering strategy. Event Hubs has no built-in dead-letter queue, so your function has to provide one: if an event still cannot be processed after several retries, send it to a separate "dead-letter" queue or topic for later inspection. This prevents problematic events from blocking the processing of valid ones.
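
A minimal sketch of the retry-then-dead-letter idea, assuming a fixed number of attempts and a Python list standing in for the dead-letter destination. The names and the three-attempt default are hypothetical choices for illustration; in a real function you would call something like this for each event inside the processing loop and write dead letters to a queue or topic.

```python
import time


def process_with_dead_letter(event, handle, dead_letters, max_attempts=3, delay_seconds=0.0):
    """Try handle(event) up to max_attempts times; park the event if every attempt fails.

    dead_letters is any list-like sink; in production it would be a queue,
    another event hub, or a storage container (hypothetical here).
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            handle(event)
            return True
        except Exception as exc:
            last_error = exc
            if attempt < max_attempts:
                time.sleep(delay_seconds)  # wait before the next attempt
    # All attempts failed: record the event and why, instead of stalling the batch.
    dead_letters.append({"event": event, "error": repr(last_error), "attempts": max_attempts})
    return False


dead = []
calls = {"count": 0}


def flaky(event):
    # Hypothetical handler that always fails on a "poison" event.
    calls["count"] += 1
    if event == "poison":
        raise ValueError("cannot parse")


print(process_with_dead_letter("ok", flaky, dead))  # prints True
print(process_with_dead_letter("poison", flaky, dead))  # prints False
print(len(dead), calls["count"])  # prints 1 4
```

Because in-function retries run before the invocation finishes, they delay every later event in the batch, so keep the attempt count and delay small.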

Monitoring and Logging

Leverage Azure Application Insights for comprehensive monitoring of your Azure Functions. You can track function executions, view logs, analyze performance metrics, and set up alerts for errors.