This tutorial guides you through setting up a basic publish-subscribe pattern using Azure Event Hubs. This pattern is fundamental for building decoupled, scalable, and real-time applications.
The publish-subscribe (pub/sub) messaging pattern is a communication paradigm where senders of messages, called publishers, do not broadcast messages to specific receivers, called subscribers. Instead, publishers categorize messages into classes without knowledge of which subscribers, if any, exist. Similarly, subscribers express interest in one or more classes of messages and receive only the messages that are of interest, without knowledge of any publishers.
If you haven't already, create an Event Hubs namespace and an Event Hub within it using the Azure portal, Azure CLI, or Azure PowerShell. Note down your connection string.
For detailed instructions, refer to: Create an Event Hubs namespace.
This application will send messages to the Event Hub.
Project Setup:
dotnet new console -n EventHubsPublisher
cd EventHubsPublisher
dotnet add package Azure.Messaging.EventHubs
Program.cs:
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
namespace EventHubsPublisher
{
class Program
{
// Replace with your actual connection string and event hub name
private const string EventHubsConnectionString = "Endpoint=sb://YOUR_NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY";
private const string EventHubName = "YOUR_EVENT_HUB_NAME";
static async Task Main(string[] args)
{
Console.WriteLine("Starting Event Hubs Publisher...");
await using var producerClient = new EventHubProducerClient(EventHubsConnectionString, EventHubName);
try
{
var eventDataBatch = await producerClient.CreateBatchAsync();
for (int i = 1; i <= 5; i++)
{
var message = $"{{ \"messageId\": {i}, \"content\": \"Hello from Publisher! - {DateTime.UtcNow}\" }}";
var eventBody = Encoding.UTF8.GetBytes(message);
if (!eventDataBatch.TryAdd(new EventData(eventBody)))
{
throw new Exception($"The batch has reached its size limit. Cannot add message {i}.");
}
Console.WriteLine($"Adding message {i}: {message}");
}
await producerClient.SendAsync(eventDataBatch);
Console.WriteLine("Batch of 5 messages sent successfully!");
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
Console.WriteLine("Publisher finished.");
}
}
}
}
Configuration: Replace YOUR_NAMESPACE, YOUR_KEY, and YOUR_EVENT_HUB_NAME with your actual Event Hubs credentials.
Run the Publisher:
dotnet run
This application will read messages from the Event Hub.
Project Setup:
dotnet new console -n EventHubsSubscriber
cd EventHubsSubscriber
dotnet add package Azure.Messaging.EventHubs
Program.cs:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
namespace EventHubsSubscriber
{
class Program
{
// Replace with your actual connection string and event hub name
private const string EventHubsConnectionString = "Endpoint=sb://YOUR_NAMESPACE.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY";
private const string EventHubName = "YOUR_EVENT_HUB_NAME";
private const string ConsumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or specify a custom consumer group
static async Task Main(string[] args)
{
Console.WriteLine("Starting Event Hubs Subscriber...");
await using var consumerClient = new EventHubConsumerClient(ConsumerGroup, EventHubsConnectionString, EventHubName);
var cancellationSource = new CancellationTokenSource();
Console.CancelKeyPress += (sender, eventArgs) =>
{
eventArgs.Cancel = true;
cancellationSource.Cancel();
Console.WriteLine("Cancellation requested. Shutting down subscriber...");
};
try
{
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync(cancellationSource.Token))
{
var message = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
Console.WriteLine($"Received message from partition {partitionEvent.Partition.PartitionId}: {message}");
}
}
catch (TaskCanceledException)
{
Console.WriteLine("Subscriber stopped gracefully.");
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
Console.WriteLine("Subscriber finished.");
}
}
}
}
Configuration: Replace YOUR_NAMESPACE, YOUR_KEY, and YOUR_EVENT_HUB_NAME with your actual Event Hubs credentials. You can also specify a custom consumer group if needed.
Run the Subscriber:
dotnet run
Important Note: For a production environment, consider using managed identities for authentication instead of connection strings, and implement robust error handling and retry mechanisms.
EventHubProducerClient to create batches of events and send them. TryAdd ensures that messages don't exceed the event size limits for a batch.EventHubConsumerClient to subscribe to a specific consumer group. ReadEventsAsync is an asynchronous iterator that yields events as they become available. The CancellationTokenSource allows for graceful shutdown.