Azure Event Hubs Producer Guide

This guide provides comprehensive instructions on how to send (produce) events to Azure Event Hubs. Event Hubs is a highly scalable data streaming platform and event ingestion service. It can be used to stream millions of events per second from numerous sources to process and analyze in real time or store for later batch analysis.

Prerequisites

Connecting to Event Hubs

To send events, your producer application needs to establish a connection to the Event Hub. This is typically done using a connection string that contains endpoint information and authentication credentials.

Using Connection Strings

You can find your Event Hub connection string in the Azure portal under your Event Hub namespace's "Shared access policies". It's recommended to use a policy with Send permissions.

Important: Never hardcode connection strings directly into your application code. Use environment variables, Azure Key Vault, or configuration files for secure management.

Sending Events

The process of sending events varies slightly depending on the SDK you use. Here's a general outline and examples for common SDKs.

Core Concepts

Sending with .NET SDK

Install the latest Azure.Messaging.EventHubs NuGet package.

// Example using Azure.Messaging.EventHubs
            using Azure.Messaging.EventHubs;
            using System;
            using System.Text;
            using System.Threading.Tasks;

            public class EventHubProducer
            {
                private const string connectionString = "";
                private const string eventHubName = "";

                public static async Task SendEventsAsync(int numberOfEvents)
                {
                    await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);

                    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

                    for (int i = 1; i <= numberOfEvents; i++)
                    {
                        var eventBody = $"Event #{i}";
                        if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventBody))))
                        {
                            // Batch is full, send it and create a new one
                            Console.WriteLine($"Batch full. Sending {eventBatch.Count} events.");
                            await producerClient.SendAsync(eventBatch);
                            // Create a new batch
                            eventBatch = await producerClient.CreateBatchAsync();
                            Console.WriteLine($"Created a new batch. Adding event #{i}.");
                        }

                        // Add the event to the current batch
                        eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventBody)));
                    }

                    // Send the last batch if it's not empty
                    if (eventBatch.Count > 0)
                    {
                        Console.WriteLine($"Sending the last batch with {eventBatch.Count} events.");
                        await producerClient.SendAsync(eventBatch);
                    }

                    Console.WriteLine($"Successfully sent {numberOfEvents} events.");
                }
            }
            

Sending with Python SDK

Install the azure-eventhub library: pip install azure-eventhub

# Example using azure-eventhub
            import os
            import asyncio
            from azure.eventhub.aio import EventHubProducerClient

            CONNECTION_STR = os.environ["EVENTHUB_CONNECTION_STR"]
            EVENTHUB_NAME = ""

            async def run():
                # On azure, the `EVENTHUB_CONNECTION_STR` format is
                # "Endpoint=;SharedAccessKeyName=;SharedAccessKey=;EntityPath="
                # You can also pass the entity path explicitly when creating the producer
                producer = EventHubProducerClient.from_connection_string(CONNECTION_STR, eventhub_name=EVENTHUB_NAME)

                async with producer:
                    event_data_batch = await producer.create_batch()
                    for i in range(10):
                        event_body = f"Event number {i}"
                        try:
                            # Try to add the event to the batch
                            event_data_batch.add(event_body)
                        except ValueError:
                            # Batch is full, send it and create a new one
                            await producer.send_batch(event_data_batch)
                            event_data_batch = await producer.create_batch()
                            # Add the current event to the new batch
                            event_data_batch.add(event_body)

                    # Send the last batch if it's not empty
                    if len(event_data_batch) > 0:
                        await producer.send_batch(event_data_batch)

            async def main():
                await run()

            if __name__ == "__main__":
                # Ensure EVENTHUB_CONNECTION_STR is set in your environment
                # Example: export EVENTHUB_CONNECTION_STR="Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=YOUR_KEY;EntityPath=your-eventhub"
                asyncio.run(main())
            

Best Practices for Producers

Note: Event Hubs offers different tiers (Basic, Standard) and capacity units (Throughput Units - TUs, or Processing Units - PUs for Kafka protocol). Ensure your Event Hub is configured with sufficient capacity for your expected load.

Advanced Scenarios

Sending with Custom Properties

You can attach custom metadata to your events. This is useful for adding contextual information that consumers can leverage.

.NET SDK - Custom Properties

var eventData = new EventData(Encoding.UTF8.GetBytes("Event with properties"));
eventData.Properties.Add("sourceApp", "MyProducerApp");
eventData.Properties.Add("timestampUTC", DateTime.UtcNow.ToString("o"));
// Then add eventData to a batch and send

Python SDK - Custom Properties

event_data_batch.add(event_body, properties={"sourceApp": "MyProducerApp"})
# Or directly to EventData object if not using batch.add(event_body) directly
# event = EventData(event_body, properties={"sourceApp": "MyProducerApp"})

Using Partition Keys

When creating an EventData object or adding to a batch, you can specify a partition key.

.NET SDK - Partition Key

var eventData = new EventData(Encoding.UTF8.GetBytes("Event for user X"));
eventData.PartitionKey = "userX"; // All events with "userX" key go to the same partition
// Add eventData to batch and send

Python SDK - Partition Key

event_data_batch.add(event_body, partition_key="userX")