Azure Event Hubs Producer Guide
This guide explains how to send (produce) events to Azure Event Hubs. Event Hubs is a highly scalable data streaming platform and event ingestion service that can receive millions of events per second from many sources; those events can then be processed and analyzed in real time or stored for later batch analysis.
Prerequisites
- An Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created within it.
- Appropriate permissions to send events to the Event Hub.
- An Event Hubs client SDK for your language (e.g., .NET, Java, Python, JavaScript/Node.js), or a client that speaks the AMQP 1.0 protocol directly.
Connecting to Event Hubs
To send events, your producer application needs to establish a connection to the Event Hub. This is typically done using a connection string that contains endpoint information and authentication credentials.
Using Connection Strings
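You can find the connection string in the Azure portal under "Shared access policies", on either the Event Hubs namespace or the individual Event Hub. Use a policy that grants only the Send permission; avoid the namespace-wide RootManageSharedAccessKey policy, which also grants Manage and Listen.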
Important: Never hardcode connection strings directly into your application code. Use environment variables, Azure Key Vault, or configuration files for secure management.
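A minimal sketch of loading the connection string from an environment variable and splitting it into its Key=Value segments, for example to check that EntityPath is present before creating a client. The variable name EVENTHUB_CONNECTION_STR matches the Python example later in this guide; both helpers are hypothetical and not part of any Azure SDK (the SDKs parse the string themselves).

```python
import os


def parse_connection_string(conn_str: str) -> dict:
    """Split 'Key=Value;Key=Value;...' into a dict (hypothetical helper)."""
    settings = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing ';'
        # Split on the first '=' only: base64 keys may end with '=' padding
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        settings[key] = value
    return settings


def load_connection_settings(var_name: str = "EVENTHUB_CONNECTION_STR") -> dict:
    """Read the connection string from the environment, never from source code."""
    conn_str = os.environ.get(var_name)
    if not conn_str:
        raise RuntimeError(f"Set the {var_name} environment variable")
    return parse_connection_string(conn_str)
```

Splitting on the first "=" only matters because SharedAccessKey values are base64 and often end in "=" padding.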
Sending Events
The process of sending events varies slightly depending on the SDK you use. Here's a general outline and examples for common SDKs.
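The .NET and Python examples in this guide both follow a fill-and-flush pattern: add each event to a size-limited batch; when the batch rejects an event, send the batch, start a new one, and add the event there; finally send the last non-empty batch. A minimal, SDK-free sketch of that loop, assuming a hypothetical SizeLimitedBatch class and a send callback standing in for the real client (the byte limit is arbitrary, not an Event Hubs limit, and real batches also count per-event overhead):

```python
from typing import Callable, List


class SizeLimitedBatch:
    """Hypothetical stand-in for an SDK batch with a maximum payload size in bytes."""

    def __init__(self, max_bytes: int):
        self.max_bytes = max_bytes
        self.events: List[bytes] = []
        self.size = 0

    def try_add(self, event: bytes) -> bool:
        # Reject the event if it would push the batch over its limit
        if self.size + len(event) > self.max_bytes:
            return False
        self.events.append(event)
        self.size += len(event)
        return True


def send_all(events: List[bytes], send: Callable[[List[bytes]], None], max_bytes: int) -> None:
    batch = SizeLimitedBatch(max_bytes)
    for event in events:
        if len(event) > max_bytes:
            raise ValueError("Event is larger than the maximum batch size")
        if not batch.try_add(event):
            # Batch is full: send it, then start a new batch with this event
            send(batch.events)
            batch = SizeLimitedBatch(max_bytes)
            batch.try_add(event)  # always fits: the event is within max_bytes
    # Send the final, partially filled batch
    if batch.events:
        send(batch.events)
```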
Core Concepts
- Event: A record of something that has happened. It typically contains a body (data) and metadata.
- Partition: An Event Hub is split into one or more partitions, which together provide its throughput. Events are ordered within a partition, but not across the Event Hub as a whole.
- Partition Key: A key used to determine which partition an event is sent to. If a partition key is provided, events with the same partition key will always be sent to the same partition, ensuring order for that key. If no partition key is provided, Event Hubs distributes events across partitions.
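To make the partition-key guarantee concrete, here is a sketch of the general idea behind key-based routing: a stable hash of the key, reduced modulo the partition count, always selects the same partition for the same key. This is an illustration only. Event Hubs uses its own internal hashing, so the hypothetical partition_for_key will not predict the partition the service actually chooses.

```python
import hashlib


def partition_for_key(partition_key: str, partition_count: int) -> int:
    """Map a key to a partition index with a stable hash (illustrative, not the Event Hubs algorithm)."""
    # hashlib is stable across processes, unlike Python's built-in hash() for strings
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an unsigned integer, then reduce modulo the partition count
    return int.from_bytes(digest[:8], "big") % partition_count
```

Note that in this modulo scheme, changing partition_count moves most keys to different partitions.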
Sending with .NET SDK
Install the latest Azure.Messaging.EventHubs NuGet package.
// Example using Azure.Messaging.EventHubs
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;

public class EventHubProducer
{
    // Read settings from environment variables instead of hardcoding them
    private static readonly string connectionString =
        Environment.GetEnvironmentVariable("EVENTHUB_CONNECTION_STR");
    private static readonly string eventHubName =
        Environment.GetEnvironmentVariable("EVENTHUB_NAME");

    public static async Task SendEventsAsync(int numberOfEvents)
    {
        await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);
        // Not a 'using' declaration, because the variable is reassigned when a batch fills up
        EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
        try
        {
            for (int i = 1; i <= numberOfEvents; i++)
            {
                var eventData = new EventData(Encoding.UTF8.GetBytes($"Event #{i}"));
                if (!eventBatch.TryAdd(eventData))
                {
                    if (eventBatch.Count > 0)
                    {
                        // Batch is full: send it and start a new one
                        Console.WriteLine($"Batch full. Sending {eventBatch.Count} events.");
                        await producerClient.SendAsync(eventBatch);
                        eventBatch.Dispose();
                        eventBatch = await producerClient.CreateBatchAsync();
                    }
                    // If the event does not fit in an empty batch, it exceeds the size limit
                    if (!eventBatch.TryAdd(eventData))
                    {
                        throw new InvalidOperationException($"Event #{i} is too large for a single batch.");
                    }
                }
            }
            // Send the last batch if it's not empty
            if (eventBatch.Count > 0)
            {
                Console.WriteLine($"Sending the last batch with {eventBatch.Count} events.");
                await producerClient.SendAsync(eventBatch);
            }
        }
        finally
        {
            eventBatch.Dispose();
        }
        Console.WriteLine($"Successfully sent {numberOfEvents} events.");
    }
}
Sending with Python SDK
Install the azure-eventhub library: pip install azure-eventhub
# Example using azure-eventhub (asyncio client)
import os
import asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

# Read settings from the environment rather than hardcoding them
CONNECTION_STR = os.environ["EVENTHUB_CONNECTION_STR"]
EVENTHUB_NAME = os.environ["EVENTHUB_NAME"]

async def run():
    # In Azure, the EVENTHUB_CONNECTION_STR format is
    # "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=<policy-name>;SharedAccessKey=<key>;EntityPath=<event-hub-name>"
    # EntityPath can be omitted when you pass eventhub_name explicitly, as below
    producer = EventHubProducerClient.from_connection_string(CONNECTION_STR, eventhub_name=EVENTHUB_NAME)
    async with producer:
        event_data_batch = await producer.create_batch()
        for i in range(10):
            # Batches accept EventData objects, not raw strings
            event = EventData(f"Event number {i}")
            try:
                # Try to add the event to the batch
                event_data_batch.add(event)
            except ValueError:
                # Batch is full: send it, then add the event to a new batch
                await producer.send_batch(event_data_batch)
                event_data_batch = await producer.create_batch()
                event_data_batch.add(event)
        # Send the last batch if it's not empty
        if len(event_data_batch) > 0:
            await producer.send_batch(event_data_batch)

async def main():
    await run()

if __name__ == "__main__":
    # Ensure EVENTHUB_CONNECTION_STR and EVENTHUB_NAME are set in your environment, e.g.:
    # export EVENTHUB_CONNECTION_STR="Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=your-send-policy;SharedAccessKey=YOUR_KEY;EntityPath=your-eventhub"
    # export EVENTHUB_NAME="your-eventhub"
    asyncio.run(main())
Best Practices for Producers
- Batching: Send events in batches to reduce network overhead and improve throughput. Most SDKs provide methods to create and manage event batches.
- Partitioning: Use partition keys strategically to ensure related events are processed in order. If ordering is not critical, allow Event Hubs to distribute events across partitions for better load balancing.
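- Error Handling: The SDKs automatically retry transient failures, such as network interruptions or throttling, according to a configurable retry policy. Handle the errors that remain after those retries are exhausted, for example by logging them and backing off before trying again.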
- Throughput Management: Monitor your producer's throughput and scale your Event Hubs capacity (number of throughput units or processing units) as needed.
- Connection Management: Keep producer clients alive for extended periods rather than creating new ones for each event, as establishing connections has overhead.
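Because the SDK clients already retry transient failures, application-level retries are mainly a backstop for errors that outlast the built-in policy. A minimal sketch of exponential backoff with full jitter around an async send call, assuming a hypothetical TransientSendError exception and a send coroutine function; with the Python SDK you would catch the relevant types from azure.eventhub.exceptions instead.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientSendError(Exception):
    """Hypothetical marker for errors worth retrying (e.g. throttling, dropped connections)."""


async def send_with_retry(
    send: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
) -> T:
    attempt = 0
    while True:
        attempt += 1
        try:
            return await send()
        except TransientSendError:
            if attempt >= max_attempts:
                raise  # give up and surface the error to the caller
            # Exponential backoff: base, 2*base, 4*base, ... capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Full jitter spreads out retries from many producers hitting the same limit
            await asyncio.sleep(random.uniform(0, delay))
```

The jitter matters when many producers are throttled at once: without it, they would all retry at the same moments.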
Note: Event Hubs offers several tiers (Basic, Standard, Premium, and Dedicated), and capacity is provisioned differently in each: throughput units (TUs) in Basic and Standard, processing units (PUs) in Premium, and capacity units (CUs) in Dedicated. Ensure your Event Hub is configured with sufficient capacity for your expected load.
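In the Basic and Standard tiers, one throughput unit allows ingress of up to 1 MB per second or 1,000 events per second, whichever limit is reached first. The TUs needed for ingress are therefore the larger of the two ratios, rounded up. A sketch of that arithmetic; the helper is hypothetical, treats 1 MB as 1,000,000 bytes as a simplifying assumption, and ignores egress limits and headroom for bursts.

```python
import math


def required_throughput_units(events_per_sec: float, avg_event_bytes: float,
                              mb_per_tu: float = 1.0, events_per_tu: float = 1000.0,
                              bytes_per_mb: float = 1_000_000) -> int:
    """Rough ingress-only TU estimate for the Basic/Standard tiers (sizing sketch)."""
    mb_per_sec = events_per_sec * avg_event_bytes / bytes_per_mb
    # Whichever limit (bandwidth or event count) is hit first decides the TU count
    needed = max(mb_per_sec / mb_per_tu, events_per_sec / events_per_tu)
    return max(1, math.ceil(needed))
```

For example, 2,500 events per second at 1,000 bytes each is 2.5 MB/s and 2.5 times the event-count limit, so the estimate is 3 TUs; 500 events per second at 4,000 bytes each is 2 MB/s, so bandwidth rather than event count sets the estimate at 2 TUs.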
Advanced Scenarios
Sending with Custom Properties
You can attach custom metadata to your events. This is useful for adding contextual information that consumers can leverage.
.NET SDK - Custom Properties
var eventData = new EventData(Encoding.UTF8.GetBytes("Event with properties"));
eventData.Properties.Add("sourceApp", "MyProducerApp");
eventData.Properties.Add("timestampUTC", DateTime.UtcNow.ToString("o"));
// Then add eventData to a batch and send
Python SDK - Custom Properties
from azure.eventhub import EventData
event = EventData(event_body)
event.properties = {"sourceApp": "MyProducerApp"}  # custom application properties
event_data_batch.add(event)  # batch.add() takes a single EventData object
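The .NET fragment above records a source name and an ISO 8601 UTC timestamp. A sketch of building the same property dictionary in Python before assigning it to an event's properties. AMQP application properties are restricted to simple (non-list, non-map) types; this sketch accepts a conservative subset of those, and build_event_properties is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timezone
from typing import Dict, Union

# Conservative subset of simple types accepted by this sketch (assumption, stricter than AMQP)
PropertyValue = Union[str, int, float, bool, bytes]


def build_event_properties(source_app: str, **extra: PropertyValue) -> Dict[str, PropertyValue]:
    """Hypothetical helper mirroring the .NET example's sourceApp/timestampUTC properties."""
    properties: Dict[str, PropertyValue] = {
        "sourceApp": source_app,
        # ISO 8601 with an explicit UTC offset, comparable to .NET's ToString("o")
        "timestampUTC": datetime.now(timezone.utc).isoformat(),
    }
    for key, value in extra.items():
        if not isinstance(value, (str, int, float, bool, bytes)):
            raise TypeError(f"Property {key!r} must be a simple scalar, got {type(value).__name__}")
        properties[key] = value
    return properties
```

With the SDK, the result would be assigned as event.properties = build_event_properties("MyProducerApp").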
Using Partition Keys
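The partition key is not set on individual events: you specify it when you create a batch (or, in SDKs that accept a list of events, when you send them), and every event in that batch is routed to the same partition.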
.NET SDK - Partition Key
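// EventData.PartitionKey is read-only when sending; set the key on the batch instead
var batchOptions = new CreateBatchOptions { PartitionKey = "userX" };
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(batchOptions);
// All events in this batch go to the partition chosen for "userX"
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Event for user X")));
await producerClient.SendAsync(eventBatch);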
Python SDK - Partition Key
event_data_batch = await producer.create_batch(partition_key="userX")  # key applies to the whole batch
event_data_batch.add(EventData(event_body))
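Because the partition key belongs to the batch rather than to each event, a producer sending events for many keys typically groups them first and creates one batch per key. A sketch of that grouping step, assuming events arrive as (partition_key, body) pairs; group_by_partition_key is a hypothetical helper.

```python
from typing import Dict, Iterable, List, Tuple


def group_by_partition_key(events: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group (partition_key, body) pairs by key, preserving send order within each key."""
    groups: Dict[str, List[str]] = {}
    for key, body in events:
        # dicts keep insertion order, so keys appear in first-seen order
        groups.setdefault(key, []).append(body)
    return groups
```

Each group's bodies would then be wrapped in EventData and added to a batch from create_batch(partition_key=key), sending and starting a new batch for the same key whenever add() raises ValueError because the batch is full.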