This tutorial guides you through sending and receiving events from an Azure Event Hub using a simple application. We'll cover the essential steps to get started with real-time data streaming.
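Before you begin, ensure you have the following: an Azure subscription; the .NET SDK (for the C# examples) or Node.js with npm (for the Node.js examples); and, for the receiver examples, an Azure Storage account to hold checkpoints.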
If you haven't already, create an Event Hubs namespace and an Event Hub. Follow the official Azure documentation for detailed instructions.
Once created, navigate to your Event Hub in the Azure portal and locate the Shared access policies. You'll need the Connection string—primary key.
We'll demonstrate sending events using C# and Node.js. Choose the language that best suits your development environment.
For C#, create a new .NET console application:
bash
dotnet new console -n EventHubsSender
cd EventHubsSender
Install the Azure Event Hubs SDK for .NET:
bash
dotnet add package Azure.Messaging.EventHubs
Replace the content of Program.cs with the following code:
csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
namespace EventHubsSender
{
    /// <summary>
    /// Console application that publishes a single batch of five sample events
    /// to an Azure Event Hub.
    /// </summary>
    class Program
    {
        // Replace with your actual connection string and event hub name
        private const string EventHubConnectionString = "";
        private const string EventHubName = "";

        /// <summary>
        /// Builds one batch of five UTF-8 text events, sends it, and reports the outcome
        /// on the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            Console.WriteLine("Sending events to Event Hub...");

            // The producer client uses a connection string.
            // "await using" closes the client when Main exits.
            await using var producer = new EventHubProducerClient(EventHubConnectionString, EventHubName);

            try
            {
                // Create a batch of events
                using EventDataBatch eventDataBatch = await producer.CreateBatchAsync();

                for (int i = 1; i <= 5; i++)
                {
                    // Round-trip ("O") format keeps the timestamp independent of the machine's culture.
                    var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i} - {DateTime.UtcNow:O}"));

                    // TryAdd returns false when the event does not fit in the batch.
                    if (!eventDataBatch.TryAdd(eventData))
                    {
                        // This sample sends only one batch, so an event that does not fit is
                        // reported as an error instead of being moved to a second batch.
                        throw new InvalidOperationException($"The batch is full and cannot add more messages. Message {i} was not added.");
                    }
                }

                // Send the batch of events
                await producer.SendAsync(eventDataBatch);
                Console.WriteLine("Batch sent successfully.");
            }
            catch (Exception ex)
            {
                // Top-level handler for the sample: report the failure and fall through to the exit prompt.
                Console.WriteLine($"Error sending events: {ex.Message}");
            }

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
    }
}
Set the EventHubConnectionString and EventHubName constants to your Event Hubs connection string and event hub name. Then, run the application:
bash
dotnet run
For Node.js, create a new directory for your project and navigate into it:
bash
mkdir event-hubs-sender-node
cd event-hubs-sender-node
Initialize a Node.js project and install the Azure Event Hubs SDK for JavaScript:
bash
npm init -y
npm install @azure/event-hubs
Create a file named sendEvents.js and add the following code:
javascript
const { EventHubProducerClient } = require("@azure/event-hubs");

// Replace with your actual connection string and event hub name
const connectionString = "";
const eventHubName = "";

/**
 * Sends one batch of five sample events to the event hub, then closes the
 * producer client whether or not the send succeeded.
 */
async function main() {
  console.log("Sending events to Event Hub...");

  const producer = new EventHubProducerClient(connectionString, eventHubName);

  try {
    const batch = await producer.createBatch();

    for (let i = 1; i <= 5; i++) {
      const eventBody = `Event ${i} - ${new Date().toISOString()}`;

      // Events are plain objects with a `body` property; the package does not
      // export an EventData constructor at runtime.
      const eventData = { body: eventBody };

      // tryAdd returns false when the event does not fit in the batch.
      if (!batch.tryAdd(eventData)) {
        throw new Error(`Batch is full. Event ${i} was not added.`);
      }
    }

    await producer.sendBatch(batch);
    console.log("Batch sent successfully.");
  } catch (err) {
    console.error("Error sending events:", err);
  } finally {
    // Always release the underlying connection.
    await producer.close();
    console.log("Producer client closed.");
  }
}

main().catch((err) => {
  console.error("The send operation encountered an error.", err);
});
Set connectionString and eventHubName to your Event Hubs connection string and event hub name. Then, run the script:
bash
node sendEvents.js
Now, let's set up a consumer to receive the events we just sent. Again, we'll provide examples for C# and Node.js.
For C#, create another .NET console application:
bash
dotnet new console -n EventHubsReceiver
cd EventHubsReceiver
Install the Event Hubs processor package for .NET (it also brings in the core Event Hubs and Azure Blob Storage client libraries):
bash
dotnet add package Azure.Messaging.EventHubs.Processor
Replace the content of Program.cs with the following code:
csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
namespace EventHubsReceiver
{
    /// <summary>
    /// Console application that reads events from an Azure Event Hub with
    /// <see cref="EventProcessorClient"/> and stores checkpoints in Azure Blob Storage.
    /// </summary>
    class Program
    {
        // Replace with your actual connection string and event hub name
        private const string EventHubConnectionString = "";
        private const string EventHubName = "";

        // Replace with your Azure Storage connection string and container name
        private const string StorageConnectionString = "";
        private const string BlobContainerName = "eventhub-checkpoints";

        /// <summary>
        /// Starts the event processor, keeps it running until Ctrl+C is pressed,
        /// then stops it.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static async Task Main(string[] args)
        {
            Console.WriteLine("Starting Event Hubs processor...");

            // Checkpoints are stored in this blob container
            var storageClient = new BlobServiceClient(StorageConnectionString);
            var blobContainerClient = storageClient.GetBlobContainerClient(BlobContainerName);

            var processor = new EventProcessorClient(
                blobContainerClient,
                EventHubConsumerClient.DefaultConsumerGroupName, // or specify your consumer group name
                EventHubConnectionString,
                EventHubName);

            // Register handlers for processing events and errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            // Ctrl+C cancels this token instead of killing the process, so the
            // finally block below gets a chance to stop the processor.
            using var shutdown = new CancellationTokenSource();
            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true;
                shutdown.Cancel();
            };

            try
            {
                // Make sure the checkpoint container exists before the processor uses it.
                await blobContainerClient.CreateIfNotExistsAsync();

                await processor.StartProcessingAsync();
                Console.WriteLine("Processor started. Press Ctrl+C to stop.");

                // Keep the console app running until Ctrl+C cancels the token
                await Task.Delay(Timeout.Infinite, shutdown.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected when Ctrl+C is pressed; continue to the shutdown below.
                Console.WriteLine("Stopping processor...");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error starting processor: {ex.Message}");
            }
            finally
            {
                // If the application is shutting down, stop the processor
                await processor.StopProcessingAsync();

                processor.ProcessEventAsync -= ProcessEventHandler;
                processor.ProcessErrorAsync -= ProcessErrorHandler;
            }
        }

        /// <summary>
        /// Prints the body, enqueued time and sequence number of a received event,
        /// then updates the checkpoint for its partition.
        /// </summary>
        /// <param name="args">The event and partition context supplied by the processor.</param>
        static async Task ProcessEventHandler(ProcessEventArgs args)
        {
            // Skip invocations that carry no event, so args.Data is never read when absent.
            if (!args.HasEvent)
            {
                return;
            }

            Console.WriteLine($"\tReceived event: {Encoding.UTF8.GetString(args.Data.Body.ToArray())}");
            Console.WriteLine($"\tEnqueued Time: {args.Data.EnqueuedTime}");
            Console.WriteLine($"\tSequence Number: {args.Data.SequenceNumber}");

            // Update checkpoint after successful processing
            await args.UpdateCheckpointAsync(args.CancellationToken);
        }

        /// <summary>
        /// Prints the partition, the operation that failed, and the exception message
        /// for an error raised by the processor.
        /// </summary>
        /// <param name="args">The error details supplied by the processor.</param>
        static Task ProcessErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine($"Error in processor: {args.PartitionId} | {args.Operation} | {args.Exception.Message}");
            return Task.CompletedTask;
        }
    }
}
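Set the EventHubConnectionString, EventHubName, and StorageConnectionString constants to your own values, and change BlobContainerName if you want a different container. You'll need an Azure Storage account for checkpointing; checkpoints are stored in the blob container named by BlobContainerName.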
Run the application:
bash
dotnet run
For Node.js, create a new directory for your receiver project:
bash
mkdir event-hubs-receiver-node
cd event-hubs-receiver-node
Initialize a Node.js project and install the Azure Event Hubs and Azure Blob Storage SDKs for JavaScript:
bash
npm init -y
npm install @azure/event-hubs @azure/storage-blob
Create a file named receiveEvents.js and add the following code:
javascript
const { EventHubConsumerClient, latestEventPosition, earliestEventPosition } = require("@azure/event-hubs");
const { BlobServiceClient } = require("@azure/storage-blob");

// Replace with your actual connection string and event hub name
const connectionString = "";
const eventHubName = "";
const consumerGroup = EventHubConsumerClient.defaultConsumerGroupName; // or specify your consumer group

// Replace with your Azure Storage connection string and container name
const storageConnectionString = "";
const blobContainerName = "eventhub-checkpoints";

/**
 * Subscribes to the event hub, prints every received event, and updates the
 * checkpoint after each non-empty batch. Ctrl+C closes the subscription and
 * the client before exiting.
 */
async function main() {
  console.log("Starting Event Hubs consumer...");

  const storageClient = BlobServiceClient.fromConnectionString(storageConnectionString);
  const blobContainerClient = storageClient.getContainerClient(blobContainerName);

  // NOTE(review): confirm that the client honors a `blobCheckpointStore` option.
  // The documented way to persist checkpoints in Blob Storage appears to be passing
  // a BlobCheckpointStore (from the separate @azure/eventhubs-checkpointstore-blob
  // package) as the fourth constructor argument; if the option below is ignored,
  // checkpoints will not survive a restart.
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    connectionString,
    eventHubName,
    {
      blobCheckpointStore: blobContainerClient
    }
  );

  const subscription = consumerClient.subscribe(
    {
      async processEvents(events, context) {
        // An empty batch has no last event to checkpoint, so there is nothing to do.
        if (events.length === 0) {
          return;
        }

        console.log(`Received ${events.length} events.`);

        for (const event of events) {
          console.log(`\tMessage: ${event.body}`);
          console.log(`\tEnqueued Time: ${event.enqueuedTime}`);
          console.log(`\tSequence Number: ${event.sequenceNumber}`);
        }

        // Update checkpoint after processing events
        await context.updateCheckpoint(events[events.length - 1]);
      },
      async processError(err, context) {
        console.error("Error occurred:", err);
      }
    },
    {
      startPosition: earliestEventPosition // You can also use latestEventPosition or specify a time
    }
  );

  console.log("Consumer subscribed. Press Ctrl+C to stop.");

  // Keep the script running until interrupted
  process.on('SIGINT', async () => {
    console.log("Stopping consumer...");
    await subscription.close();
    await consumerClient.close();
    console.log("Consumer and client closed.");
    process.exit();
  });
}

main().catch((err) => {
  console.error("The receive loop encountered an error.", err);
  process.exit(1);
});
Set connectionString, eventHubName, and storageConnectionString to your own values, and change blobContainerName if you want a different container. You'll need an Azure Storage account for checkpointing.
Run the script:
bash
node receiveEvents.js
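To verify that events are flowing correctly, start one of the receivers, run one of the senders in a second terminal, and confirm that the receiver prints each of the five events along with its enqueued time and sequence number.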
Congratulations! You've successfully set up an application to send and receive events from Azure Event Hubs. This fundamental pattern is the basis for many real-time data processing scenarios.
Explore the Azure Event Hubs documentation for more advanced features like scaling, partitioning, and integration with other Azure services like Azure Functions and Azure Stream Analytics.