Guides & Tutorials
This guide will walk you through the fundamental process of sending events to and receiving events from Azure Event Hubs using the Azure SDKs.
Choose your preferred language and follow the setup instructions:
Install the necessary NuGet packages:
dotnet add package Azure.Messaging.EventHubs
Ensure you have your Event Hub connection string. It typically looks like:
Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=
Install the Event Hubs client library:
npm install @azure/event-hubs
Obtain your Event Hub connection string from the Azure portal.
Install the Event Hubs SDK:
pip install azure-eventhubs
You'll need your Event Hub connection string.
Add the Event Hubs dependency to your pom.xml (Maven) or build.gradle (Gradle):
<!-- Maven pom.xml -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.12.0</version> <!-- Check for the latest version -->
</dependency>
Get your Event Hub connection string.
Sending events involves creating an EventHubProducerClient (or equivalent) and sending batches of events.
Here's a basic example of sending a single event:
Use your Event Hub connection string and the Event Hub name.
using Azure.Messaging.EventHubs;
using System;
using System.Text;
using System.Threading.Tasks;
// Replace with your actual connection string and hub name
string connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);
// Create an event
var eventBody = Encoding.UTF8.GetBytes("Hello, Event Hubs!");
var eventData = new EventData(eventBody);
// Send the event
try
{
await producerClient.SendAsync(new[] { eventData });
Console.WriteLine("Event sent successfully.");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending event: {ex.Message}");
}
For efficiency, send events in batches.
var batchOptions = new CreateBatchOptions
{
PartitionKey = "myPartitionKey" // Optional: Ensures events go to the same partition
};
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(batchOptions);
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Event 1")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Event 2")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Event 3")));
if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("This event is too large and will fail"))))
{
Console.WriteLine("The event is too large to fit in the batch.");
}
try
{
await producerClient.SendAsync(eventBatch);
Console.WriteLine("Event batch sent.");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending event batch: {ex.Message}");
}
Example using JavaScript:
const { EventHubProducerClient } = require("@azure/event-hubs");
// Replace with your actual connection string and hub name
const connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const producerClient = new EventHubProducerClient(connectionString, eventHubName);
async function sendEvents() {
const eventData = [{ body: "Hello from Node.js!" }];
try {
await producerClient.sendBatch(eventData);
console.log("Event sent successfully.");
} catch (err) {
console.error("Error sending event:", err);
} finally {
await producerClient.close();
}
}
sendEvents();
createBatch() to build and send batches programmatically, similar to the .NET example.
Python example:
from azure.eventhub import EventHubProducer, EventPosition, EventHubClient
import os
# Replace with your actual connection string and hub name
connection_str = "YOUR_EVENTHUB_CONNECTION_STRING"
event_hub_name = "YOUR_EVENTHUB_NAME"
producer = EventHubProducer(connection_str, event_hub_name)
event_data = "Hello, Event Hubs from Python!"
try:
producer.send(event_data)
print("Event sent successfully.")
except Exception as e:
print(f"Error sending event: {e}")
finally:
producer.close()
Java example:
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventData;
import java.util.Arrays;
import java.util.List;
// Replace with your actual connection string and hub name
String connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
String eventHubName = "YOUR_EVENTHUB_NAME";
EventHubProducerClient producerClient = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
// Create an event
String message = "Hello, Event Hubs from Java!";
EventData eventData = new EventData(message.getBytes());
// Send the event
try {
producerClient.send(Arrays.asList(eventData));
System.out.println("Event sent successfully.");
} catch (Exception ex) {
System.err.println("Error sending event: " + ex.getMessage());
} finally {
producerClient.close();
}
Receiving events typically involves creating an EventHubConsumerClient (or equivalent) and registering an event handler.
Here's how to start receiving events:
You'll need the connection string, hub name, and a consumer group name. The default consumer group is $Default.
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Text;
using System.Threading.Tasks;
// Replace with your actual connection string, hub name, and consumer group
string connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
string eventHubName = "YOUR_EVENTHUB_NAME";
string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Or "$Default"
var consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
Console.WriteLine("Starting to listen for events...");
// Register an event handler
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
{
if (partitionEvent.Data != null)
{
string messageBody = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
Console.WriteLine($"Received event: {messageBody} from partition {partitionEvent.Partition.Id}");
}
}
ReadEventsAsync() method runs indefinitely, processing events as they arrive. You'll typically run this in a background service or long-running process. Handle exceptions and consider cancellation tokens for graceful shutdown.
Example using JavaScript:
const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");
// Replace with your actual connection string, hub name, and consumer group
const connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default"; // Or specify your own consumer group
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
async function receiveEvents() {
console.log("Starting to listen for events...");
// Use earliestEventPosition to start from the beginning of the stream, or fromDate, etc.
const subscription = consumerClient.subscribe({
async processEvents(events, context) {
for (const event of events) {
console.log(`Received event: ${event.body} from partition ${context.partitionId}`);
}
},
async processError(err, context) {
console.error("Error processing event:", err, "Partition:", context.partitionId);
}
}, { startPosition: earliestEventPosition }); // Start from the earliest available event
// Keep the program running to receive events
// In a real application, you'd manage this lifecycle more robustly
console.log("Listening...");
// To stop: await subscription.close();
}
receiveEvents().catch((err) => {
console.error("Error occurred:", err);
});
Python example:
from azure.eventhub import EventHubConsumer, EventPosition, EventHubClient
import os
import time
# Replace with your actual connection string, hub name, and consumer group
connection_str = "YOUR_EVENTHUB_CONNECTION_STRING"
event_hub_name = "YOUR_EVENTHUB_NAME"
consumer_group = "$Default" # Or specify your own consumer group
client = EventHubClient(connection_str, consumer_group_name=event_hub_name)
consumer = client.create_consumer(partition_id="0", event_position=EventPosition("-1")) # Start from beginning
print("Starting to listen for events on partition 0...")
try:
while True:
events = consumer.receive(100) # Receive up to 100 events
if not events:
time.sleep(1) # Wait if no events
continue
for event in events:
print(f"Received event: {event.message.value} from partition {event.partition_id}")
except KeyboardInterrupt:
print("Stopping receiver.")
finally:
consumer.close()
client.close()
Java example:
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.core.util.IterableStream;
// Replace with your actual connection string, hub name, and consumer group
String connectionString = "YOUR_EVENTHUB_CONNECTION_STRING";
String eventHubName = "YOUR_EVENTHUB_NAME";
String consumerGroup = "$Default"; // Or specify your own consumer group
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.consumerGroup(consumerGroup)
.buildConsumerClient();
System.out.println("Starting to listen for events...");
// You can specify a starting position, e.g., EventPosition.earliest() or EventPosition.latest()
IterableStream<PartitionEvent> events = consumerClient.read(EventPosition.earliest());
for (PartitionEvent partitionEvent : events) {
if (partitionEvent.getData() != null) {
String messageBody = new String(partitionEvent.getData().getBody().toBytes());
System.out.println("Received event: " + messageBody + " from partition " + partitionEvent.getPartitionId());
}
}
// Note: In a real application, you'd manage the lifecycle of 'events' and the consumer client.
// The above is a simplified loop; often, you'd use a listener pattern.
// For a continuous listener, consider using EventProcessorClient.
// For graceful shutdown, ensure you close the consumerClient.
// consumerClient.close(); // Typically called when your application is shutting down.
EventProcessorClient, manage checkpoints to track progress.Explore the official Azure Event Hubs documentation for more in-depth information.