Azure Event Hubs

Documentation | Sending and Receiving Events

Sending and Receiving Events

Azure Event Hubs is a highly scalable data streaming platform and event ingestion service. This section covers the fundamental operations of sending events to an Event Hub and receiving them using consumer groups.

Sending Events

Events are sent to an Event Hub as a batch. You can use various SDKs available for different programming languages. The following example demonstrates sending events using the Azure SDK for .NET.

Using the .NET SDK

First, ensure you have the necessary NuGet package installed:

dotnet add package Azure.Messaging.EventHubs

Then, you can use the following C# code snippet:


using Azure.Messaging.EventHubs;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

// Replace with your actual connection string and event hub name
string connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";

// Create a producer client that you can use to send events to an event hub
await using (var producer = new EventHubProducerClient(connectionString, eventHubName))
{
    try
    {
        // Create a batch of events
        using EventDataBatch eventDataBatch = await producer.CreateBatchAsync();

        var events = new List<EventData>
        {
            new EventData(Encoding.UTF8.GetBytes("{\"message\": \"Hello Event Hubs 1\"}")),
            new EventData(Encoding.UTF8.GetBytes("{\"message\": \"Hello Event Hubs 2\"}")),
            new EventData(Encoding.UTF8.GetBytes("{\"message\": \"Hello Event Hubs 3\"}")),
        };

        foreach (var eventData in events)
        {
            if (!eventDataBatch.TryAdd(eventData))
            {
                // If the batch is full, send it and create a new one
                throw new Exception($"The event is too large for the batch. 
                                     Event size: {eventData.EventBody.Length} bytes.");
            }
        }

        // Send the batch of events to the Event Hub
        await producer.SendAsync(eventDataBatch);
        Console.WriteLine($"Sent {events.Count} events.");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error sending events: {ex.Message}");
    }
}
            

Receiving Events

Events are consumed from an Event Hub by clients that register with a specific consumer group. Each consumer group maintains its own read-only view of the event stream.

Using the .NET SDK for Receiving

You'll need the same `Azure.Messaging.EventHubs` package.


using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

// Replace with your actual connection string, event hub name, and consumer group name
string connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
string consumerGroupName = "$Default"; // Or your custom consumer group name

// Create a processor client that reads events from an event hub
await using (EventProcessorClient processor = new EventProcessorClient(
    new EventHubConsumerClient(EventHubClient.DefaultConsumerGroupName, connectionString, eventHubName),
    new MyEventHandler()))
{
    try
    {
        // Start processing events in the background
        await processor.StartProcessingAsync();

        Console.WriteLine("Starting event processing. Press Ctrl+C to stop.");
        // Keep the application running until interrupted
        await Task.Delay(Timeout.Infinite);
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Error starting processor: {ex.Message}");
    }
}

// Define a custom event handler to process incoming events
public class MyEventHandler : IEventProcessor
{
    public Task ProcessEventAsync(ProcessEventArgs args)
    {
        // Access the event data
        string eventBody = Encoding.UTF8.GetString(args.Data.EventBody.ToArray());
        Console.WriteLine($"Received event: {eventBody} | Partition: {args.PartitionId}");

        // Indicate that the event has been successfully processed
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Error processing event: {args.Exception.Message}");
        return Task.CompletedTask;
    }

    public Task CloseAsync(CloseEventArgs args)
    {
        Console.WriteLine($"Processor closed. Reason: {args.Reason}");
        return Task.CompletedTask;
    }

    public Task Initialize(InitializationContext context)
    {
        Console.WriteLine($"Processor initialized for partition: {context.PartitionId}");
        return Task.CompletedTask;
    }
}
            

Key Concepts

  • Producers: Applications that send events to an Event Hub.
  • Consumers: Applications that read events from an Event Hub.
  • EventDataBatch: A container for sending multiple events as a single unit, optimizing throughput.
  • Consumer Group: A named view of an event stream. Each consumer group maintains its own offset, allowing multiple applications to consume events independently. The $Default consumer group is created automatically.
  • Partitioning: Events are partitioned within an Event Hub to enable parallel processing and scalability.

Further Considerations

When working with Event Hubs, consider the following: