Install the appropriate Azure Event Hubs client library for your chosen language.
dotnet add package Azure.Messaging.EventHubs
pip install azure-eventhubs
npm install @azure/event-hubs
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.10.0</version><!-- Use the latest version -->
</dependency>
Step 3: Sending Events
This section demonstrates how to send a batch of events to your Event Hub.
C# Example
// Replace with your connection string and event hub name
string connectionString = "";
string eventHubName = "";
await using var producerClient = new EventHubProducerClient(connectionString, eventHubName);
using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
if (!eventBatch.TryAdd(new EventData(BinaryData.FromString("First event"))))
{
throw new Exception("Failed to add event to batch.");
}
if (!eventBatch.TryAdd(new EventData(BinaryData.FromString("Second event"))))
{
throw new Exception("Failed to add event to batch.");
}
await producerClient.SendAsync(eventBatch);
Console.WriteLine("Events sent successfully.");
Python Example
import os
import asyncio
from azure.eventhub import EventHubProducerClient, EventData
# Replace with your connection string and event hub name
connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
eventhub_name = ""
async def run():
producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name)
async with producer:
batch = await producer.create_batch()
batch.add(EventData("First event"))
batch.add(EventData("Second event"))
await producer.send_batch(batch)
print("Events sent successfully.")
if __name__ == "__main__":
asyncio.run(run())
JavaScript Example
const { EventHubProducerClient } = require("@azure/event-hubs");
// Replace with your connection string and event hub name
const connectionString = "";
const eventHubName = "";
async function sendEvents() {
const producer = new EventHubProducerClient(connectionString, eventHubName);
const batch = await producer.createBatch();
batch.tryAdd({ body: "First event" });
batch.tryAdd({ body: "Second event" });
await producer.send(batch);
console.log("Events sent successfully.");
await producer.close();
}
sendEvents().catch(err => {
console.error("Error sending events:", err);
});
Java Example
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventBatch;
import com.azure.messaging.eventhubs.models.SendOptions;
import com.azure.core.util.BinaryData;
public class EventHubSender {
public static void main(String[] args) {
// Replace with your connection string and event hub name
String connectionString = "";
String eventHubName = "";
EventHubProducerClient producer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
EventBatch eventBatch = producer.createBatch();
eventBatch.tryAdd(BinaryData.fromString("First event"));
eventBatch.tryAdd(BinaryData.fromString("Second event"));
producer.send(eventBatch);
System.out.println("Events sent successfully.");
producer.close();
}
}
Step 4: Receiving Events
This section demonstrates how to receive events from your Event Hub using a consumer group.
C# Example
// Replace with your connection string and event hub name
string connectionString = "";
string eventHubName = "";
string consumerGroup = "$Default"; // Or your custom consumer group name
await using var consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
Console.WriteLine("Listening for events...");
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
{
Console.WriteLine($"Received event: {partitionEvent.Data.EventBody.ToString()}");
}
Python Example
import os
import asyncio
from azure.eventhub import EventHubConsumerClient
# Replace with your connection string and event hub name
connection_str = os.environ["EVENTHUB_CONNECTION_STR"]
eventhub_name = ""
consumer_group = "$Default" # Or your custom consumer group name
async def run():
consumer = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name)
async with consumer:
async for event in consumer.receive():
print(f"Received event: {event.body}")
if __name__ == "__main__":
asyncio.run(run())
JavaScript Example
const { EventHubConsumerClient } = require("@azure/event-hubs");
// Replace with your connection string and event hub name
const connectionString = "";
const eventHubName = "";
const consumerGroup = "$Default"; // Or your custom consumer group name
async function receiveEvents() {
const consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
console.log("Listening for events...");
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Received event: ${event.body}`);
}
},
processError: async (err, context) => {
console.error("Error occurred:", err);
}
});
// Keep the listener running for a while or until an external stop signal
await new Promise(resolve => setTimeout(resolve, 60000)); // Listen for 60 seconds
await subscription.close();
await consumer.close();
}
receiveEvents().catch(err => {
console.error("Error receiving events:", err);
});
Java Example
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.core.credential.AzureKeyCredential;
import java.util.function.Consumer;
public class EventHubReceiver {
public static void main(String[] args) {
// Replace with your connection string and event hub name
String connectionString = "";
String eventHubName = "";
String consumerGroup = "$Default"; // Or your custom consumer group name
EventHubConsumerClient consumer = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.consumerGroup(consumerGroup)
.buildConsumerClient();
System.out.println("Listening for events...");
Consumer<com.azure.messaging.eventhubs.models.PartitionEvent> processEvent = event -> {
System.out.println("Received event: " + new String(event.getData().getBody().toBytes()));
};
Consumer<Throwable> processError = error -> {
System.err.println("Error occurred: " + error.getMessage());
};
// To listen indefinitely, you might need a more robust mechanism
// This example shows listening for a limited time or until interrupted.
// For continuous listening, consider background threads or dedicated services.
try {
consumer.read(processEvent, processError, EventPosition.earliest()); // Start from the beginning
Thread.sleep(60000); // Listen for 60 seconds
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
consumer.close();
}
}
}
Step 5: Best Practices and Further Learning
Batching: Send events in batches to improve throughput and reduce costs.
Error Handling: Implement robust error handling for transient network issues and other exceptions.
Consumer Groups: Use distinct consumer groups for different applications or services reading from the same Event Hub.
Partitioning: Understand how partitions affect event ordering and parallel processing.
Monitoring: Utilize Azure Monitor to track Event Hub metrics and set up alerts.
For more advanced scenarios, explore topics like schema enforcement, dead-letter queues, and integrating with Azure Stream Analytics.