Send and Receive Events with Azure Event Hubs

This tutorial guides you through sending events to, and receiving events from, an Azure Event Hub using small console applications in C# and Node.js. It covers the essential steps to get started with real-time data streaming.

Prerequisites

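Before you begin, ensure you have the following:

  - An Azure subscription
  - The .NET SDK (for the C# examples) or Node.js (for the JavaScript examples)
  - An Azure Storage account with a blob container, which the receiver uses for checkpointing
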
Step 1: Create an Event Hubs Namespace and Event Hub

If you haven't already, create an Event Hubs namespace and an Event Hub. Follow the official Azure documentation for detailed instructions.

Once created, navigate to your Event Hub in the Azure portal and locate the Shared access policies. You'll need the Connection string—primary key.

Note: Keep your connection string secure. Do not share it publicly or commit it to source control.
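
One way to follow this advice is to read the connection string from an environment variable instead of hard-coding it. The sketch below assumes two hypothetical variable names, EVENTHUB_CONNECTION_STRING and EVENTHUB_NAME, and a helper called loadSettings that is not part of any SDK. It only checks that the value looks like an Event Hubs connection string before it is used.

```javascript
// Hypothetical helper: reads Event Hubs settings from environment variables.
// The variable names are assumptions made for this sketch, not SDK conventions.
function loadSettings(env = process.env) {
    const connectionString = env.EVENTHUB_CONNECTION_STRING;
    const eventHubName = env.EVENTHUB_NAME;

    if (!connectionString || !eventHubName) {
        throw new Error("Set EVENTHUB_CONNECTION_STRING and EVENTHUB_NAME before running.");
    }

    // A connection string is a list of Key=Value pairs separated by semicolons.
    const keys = connectionString
        .split(";")
        .filter((part) => part.includes("="))
        .map((part) => part.slice(0, part.indexOf("=")));

    for (const required of ["Endpoint", "SharedAccessKeyName", "SharedAccessKey"]) {
        if (!keys.includes(required)) {
            throw new Error(`Connection string is missing ${required}.`);
        }
    }

    return { connectionString, eventHubName };
}
```

The returned values can then be passed to the client constructors in place of the string constants used in the examples in this tutorial.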

Step 2: Send Events to the Event Hub

We'll demonstrate sending events using C# and Node.js. Choose the language that best suits your development environment.

2.1 Using C# (.NET Core)

Create a new .NET Core console application:

bash
dotnet new console -n EventHubsSender
cd EventHubsSender

Install the Azure Event Hubs SDK for .NET:

bash
dotnet add package Azure.Messaging.EventHubs

Replace the content of Program.cs with the following code:

csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

namespace EventHubsSender
{
    class Program
    {
        // Replace with your actual connection string and event hub name
        private const string EventHubConnectionString = "<YOUR_EVENT_HUB_CONNECTION_STRING>";
        private const string EventHubName = "<YOUR_EVENT_HUB_NAME>";

        static async Task Main(string[] args)
        {
            Console.WriteLine("Sending events to Event Hub...");

            // The producer client uses a connection string.
            await using var producer = new EventHubProducerClient(EventHubConnectionString, EventHubName);

            try
            {
                // Create a batch of events
                using EventDataBatch eventDataBatch = await producer.CreateBatchAsync();

                for (int i = 1; i <= 5; i++)
                {
                    var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i} - {DateTime.UtcNow}"));
                    if (!eventDataBatch.TryAdd(eventData))
                    {
                        // The batch is full; this sample stops here instead of sending it and starting a new one
                        throw new Exception($"The batch is full and cannot add more events. Event {i} was not added.");
                    }
                }

                // Send the batch of events
                await producer.SendAsync(eventDataBatch);
                Console.WriteLine("Batch sent successfully.");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error sending events: {ex.Message}");
            }

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
    }
}

Replace <YOUR_EVENT_HUB_CONNECTION_STRING> and <YOUR_EVENT_HUB_NAME> with your actual values. Then, run the application:

bash
dotnet run

2.2 Using Node.js

Create a new directory for your project and navigate into it:

bash
mkdir event-hubs-sender-node
cd event-hubs-sender-node

Initialize a Node.js project and install the Azure Event Hubs SDK for JavaScript:

bash
npm init -y
npm install @azure/event-hubs

Create a file named sendEvents.js and add the following code:

javascript
const { EventHubProducerClient } = require("@azure/event-hubs");

// Replace with your actual connection string and event hub name
const connectionString = "<YOUR_EVENT_HUB_CONNECTION_STRING>";
const eventHubName = "<YOUR_EVENT_HUB_NAME>";

async function main() {
    console.log("Sending events to Event Hub...");
    const producer = new EventHubProducerClient(connectionString, eventHubName);

    try {
        const batch = await producer.createBatch();
        for (let i = 1; i <= 5; i++) {
            const eventBody = `Event ${i} - ${new Date().toISOString()}`;
            // Events are plain objects with a body; the SDK does not export an EventData class
            if (!batch.tryAdd({ body: eventBody })) {
                throw new Error(`Batch is full. Event ${i} was not added.`);
            }
        }

        await producer.sendBatch(batch);
        console.log("Batch sent successfully.");
    } catch (err) {
        console.error("Error sending events:", err);
    } finally {
        await producer.close();
        console.log("Producer client closed.");
    }
}

main().catch((err) => {
    console.error("The sender encountered an error.", err);
});

Replace <YOUR_EVENT_HUB_CONNECTION_STRING> and <YOUR_EVENT_HUB_NAME> with your actual values. Then, run the script:

bash
node sendEvents.js
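
The sender above stops with an error if an event does not fit in the batch. For larger workloads, a common alternative is to send the full batch and continue with a new one. The following is a minimal sketch of that pattern, not a helper provided by the SDK: sendInBatches is a hypothetical function name, and it relies only on createBatch, tryAdd, count, and sendBatch.

```javascript
// Hypothetical helper: sends any number of event bodies, starting a new batch
// whenever the current one is full. `producer` is an EventHubProducerClient.
async function sendInBatches(producer, bodies) {
    let batch = await producer.createBatch();
    let sentCount = 0;

    for (const body of bodies) {
        if (batch.tryAdd({ body })) {
            continue;
        }
        // An event that does not fit in an empty batch can never be sent.
        if (batch.count === 0) {
            throw new Error("Event is too large for an empty batch.");
        }
        // The batch is full: send it, then retry the event in a new batch.
        await producer.sendBatch(batch);
        sentCount += batch.count;
        batch = await producer.createBatch();
        if (!batch.tryAdd({ body })) {
            throw new Error("Event is too large for an empty batch.");
        }
    }

    // Send whatever remains in the last, partially filled batch.
    if (batch.count > 0) {
        await producer.sendBatch(batch);
        sentCount += batch.count;
    }
    return sentCount;
}
```

Inside main, a call such as await sendInBatches(producer, bodies) could replace the single createBatch and sendBatch pair, where bodies is an array of event payloads.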

Step 3: Receive Events from the Event Hub

Now, let's set up a consumer to receive the events we just sent. Again, we'll provide examples for C# and Node.js.

3.1 Using C# (.NET Core)

Create another .NET Core console application:

bash
dotnet new console -n EventHubsReceiver
cd EventHubsReceiver

Install the Event Hubs processor package, which also brings in the Azure Storage Blobs client used for checkpointing:

bash
dotnet add package Azure.Messaging.EventHubs.Processor

Replace the content of Program.cs with the following code:

csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

namespace EventHubsReceiver
{
    class Program
    {
        // Replace with your actual connection string and event hub name
        private const string EventHubConnectionString = "<YOUR_EVENT_HUB_CONNECTION_STRING>";
        private const string EventHubName = "<YOUR_EVENT_HUB_NAME>";

        // Replace with your Azure Storage connection string and container name
        private const string StorageConnectionString = "<YOUR_STORAGE_CONNECTION_STRING>";
        private const string BlobContainerName = "eventhub-checkpoints";

        static async Task Main(string[] args)
        {
            Console.WriteLine("Starting Event Hubs processor...");

            // Use a blob container client so checkpoints are stored durably in Azure Storage
            var storageClient = new BlobServiceClient(StorageConnectionString);
            var blobContainerClient = storageClient.GetBlobContainerClient(BlobContainerName);

            var processor = new EventProcessorClient(
                blobContainerClient,
                EventHubConsumerClient.DefaultConsumerGroupName, // or specify your consumer group name
                EventHubConnectionString,
                EventHubName);

            // Register handlers for processing events and errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            try
            {
                await processor.StartProcessingAsync();
                Console.WriteLine("Processor started. Press Ctrl+C to stop.");
                await Task.Delay(Timeout.Infinite); // Keep the console app running
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error starting processor: {ex.Message}");
            }
            finally
            {
                // If the application is shutting down, stop the processor
                await processor.StopProcessingAsync();
            }
        }

        static async Task ProcessEventHandler(ProcessEventArgs args)
        {
            Console.WriteLine($"\tReceived event: {Encoding.UTF8.GetString(args.Data.Body.ToArray())}");
            Console.WriteLine($"\tEnqueued Time: {args.Data.EnqueuedTime}");
            Console.WriteLine($"\tSequence Number: {args.Data.SequenceNumber}");

            // Update checkpoint after successful processing
            await args.UpdateCheckpointAsync();
        }

        static Task ProcessErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine($"Error in processor: {args.PartitionId} | {args.Operation} | {args.Exception.Message}");
            return Task.CompletedTask;
        }
    }
}

Replace <YOUR_EVENT_HUB_CONNECTION_STRING>, <YOUR_EVENT_HUB_NAME>, and <YOUR_STORAGE_CONNECTION_STRING> with your actual values. Checkpointing requires an Azure Storage account, and the blob container named in BlobContainerName (eventhub-checkpoints in this example) must already exist.

Run the application:

bash
dotnet run

3.2 Using Node.js

Create a new directory for your receiver project:

bash
mkdir event-hubs-receiver-node
cd event-hubs-receiver-node

Initialize a Node.js project and install the Azure Event Hubs SDK for JavaScript:

bash
npm init -y
npm install @azure/event-hubs @azure/storage-blob @azure/eventhubs-checkpointstore-blob

Create a file named receiveEvents.js and add the following code:

javascript
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
const { BlobServiceClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

// Replace with your actual connection string and event hub name
const connectionString = "<YOUR_EVENT_HUB_CONNECTION_STRING>";
const eventHubName = "<YOUR_EVENT_HUB_NAME>";
const consumerGroup = EventHubConsumerClient.defaultConsumerGroupName; // or specify your consumer group

// Replace with your Azure Storage connection string and container name
const storageConnectionString = "<YOUR_STORAGE_CONNECTION_STRING>";
const blobContainerName = "eventhub-checkpoints";

async function main() {
    console.log("Starting Event Hubs consumer...");

    const storageClient = BlobServiceClient.fromConnectionString(storageConnectionString);
    const blobContainerClient = storageClient.getContainerClient(blobContainerName);

    // The checkpoint store is passed as the fourth constructor argument, not as an option
    const checkpointStore = new BlobCheckpointStore(blobContainerClient);

    const consumerClient = new EventHubConsumerClient(
        consumerGroup,
        connectionString,
        eventHubName,
        checkpointStore
    );

    const subscription = consumerClient.subscribe(
        {
            async processEvents(events, context) {
                // The handler can be invoked with an empty array when no events arrive in time
                if (events.length === 0) {
                    return;
                }
                console.log(`Received ${events.length} events.`);
                for (const event of events) {
                    console.log(`\tMessage: ${event.body}`);
                    console.log(`\tEnqueued Time: ${event.enqueuedTimeUtc}`);
                    console.log(`\tSequence Number: ${event.sequenceNumber}`);
                }
                // Update checkpoint after processing events
                await context.updateCheckpoint(events[events.length - 1]);
            },
            async processError(err, context) {
                console.error("Error occurred:", err);
            }
        },
        {
            startPosition: earliestEventPosition // You can also use latestEventPosition or specify a time
        }
    );

    console.log("Consumer subscribed. Press Ctrl+C to stop.");

    // Keep the script running until interrupted
    process.on('SIGINT', async () => {
        console.log("Stopping consumer...");
        await subscription.close();
        await consumerClient.close();
        console.log("Consumer and client closed.");
        process.exit();
    });
}

main().catch((err) => {
    console.error("The receive loop encountered an error.", err);
    process.exit(1);
});

Replace <YOUR_EVENT_HUB_CONNECTION_STRING>, <YOUR_EVENT_HUB_NAME>, and <YOUR_STORAGE_CONNECTION_STRING> with your actual values. Checkpointing requires an Azure Storage account, and the blob container named in blobContainerName (eventhub-checkpoints in this example) must already exist.

Run the script:

bash
node receiveEvents.js
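
The receiver above writes a checkpoint after every batch it processes. Each checkpoint is a write to Blob Storage, so some applications checkpoint less often and accept that a few events may be processed again after a restart. The sketch below illustrates that trade-off under stated assumptions: createCheckpointer is a hypothetical helper, and the default threshold of 100 events is an arbitrary illustrative value. It keeps a separate count per partition, using the partitionId of the context passed to the handler.

```javascript
// Hypothetical helper: returns a processEvents handler that checkpoints only
// after at least `every` events have been handled on a partition.
function createCheckpointer(handleEvent, every = 100) {
    // partitionId -> number of events handled since the last checkpoint
    const pending = new Map();

    return async function processEvents(events, context) {
        if (events.length === 0) {
            return;
        }
        for (const event of events) {
            await handleEvent(event);
        }

        const count = (pending.get(context.partitionId) || 0) + events.length;
        if (count >= every) {
            // Checkpoint the last event so a restart resumes after it.
            await context.updateCheckpoint(events[events.length - 1]);
            pending.set(context.partitionId, 0);
        } else {
            pending.set(context.partitionId, count);
        }
    };
}
```

The returned function can be supplied as processEvents in the handlers object passed to subscribe.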

Step 4: Verify Event Flow

To verify that events are flowing correctly:

  1. Start the receiver application (C# or Node.js).
  2. Run the sender application (C# or Node.js) in a separate terminal or process.
  3. You should see the events being received and logged by the receiver application.

Tip: Make sure the sender and receiver use the same Event Hub name. The consumer group applies only to the receiver. When no checkpoint exists yet, the receiver starts from the earliest available event.
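
The starting point only matters when the consumer group has no checkpoint for a partition. Besides the earliest and latest positions, the JavaScript SDK accepts a position based on enqueue time. As a small sketch, the hypothetical helper startPositionMinutesAgo below builds such a position; enqueuedOn is the only SDK-defined name it uses.

```javascript
// Hypothetical helper: builds a start position that begins reading at events
// enqueued within the last `minutes` minutes.
function startPositionMinutesAgo(minutes, now = new Date()) {
    return { enqueuedOn: new Date(now.getTime() - minutes * 60 * 1000) };
}

// Example (assumes an existing consumerClient and handlers object):
// consumerClient.subscribe(handlers, { startPosition: startPositionMinutesAgo(10) });
```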

Conclusion

Congratulations! You've successfully set up an application to send and receive events from Azure Event Hubs. This fundamental pattern is the basis for many real-time data processing scenarios.

Explore the Azure Event Hubs documentation for more advanced features like scaling, partitioning, and integration with other Azure services like Azure Functions and Azure Stream Analytics.