Send and Receive Messages
Learn how to send messages to and receive messages from an Azure Event Hub using various client libraries.
Prerequisites
- An Azure subscription.
- An Azure Event Hubs namespace and an Event Hub created.
- Appropriate connection string or credentials.
Sending Messages
Event Hubs supports sending messages using different client SDKs. Below are examples in Python, C#, Java, and JavaScript (Node.js), in that order.
from azure.eventhub import EventHubProducerClient, EventData
# Placeholders: replace with your own values (do not hardcode real secrets).
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)


def send_event_data_batch(producer):
    """Send a fixed set of sample sensor readings to the Event Hub in batches.

    Args:
        producer: An ``EventHubProducerClient``. It is used as a context
            manager here, so it is closed when this function returns.
    """
    # Imported locally so this snippet stays self-contained.
    import json

    # Prepare data
    data = [
        {"sensorId": "1", "temperature": 23.5, "humidity": 60.1},
        {"sensorId": "2", "temperature": 24.1, "humidity": 62.5},
    ]
    # Serialize with json.dumps, not str(): str() of a dict yields a Python
    # repr with single quotes, which is not valid JSON and cannot be read
    # back with body_as_json() on the consumer side.
    events = [EventData(json.dumps(item)) for item in data]

    with producer:
        # Send batches
        batch = producer.create_batch()
        for event in events:
            try:
                batch.add(event)
            except ValueError:
                # add() raises ValueError when the event does not fit.
                # Batch is full, send the current batch and start a new one.
                # Catching only ValueError lets unrelated failures surface
                # instead of being mistaken for a full batch.
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(event)
        # Send the remaining events in the last batch
        producer.send_batch(batch)
    print("Messages sent successfully.")


send_event_data_batch(producer)
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
public class EventSender
{
    /// <summary>
    /// Sends a fixed set of sample sensor readings to an Event Hub, grouping
    /// them into as few batches as possible.
    /// </summary>
    /// <param name="connectionString">Connection string of the Event Hubs namespace.</param>
    /// <param name="eventHubName">Name of the Event Hub to send to.</param>
    /// <returns>A task that completes once every batch has been sent.</returns>
    public static async Task SendMessagesAsync(string connectionString, string eventHubName)
    {
        await using var producer = new EventHubProducerClient(connectionString, eventHubName);

        var messages = new List<string>
        {
            "{\"sensorId\": \"1\", \"temperature\": 23.5, \"humidity\": 60.1}",
            "{\"sensorId\": \"2\", \"temperature\": 24.1, \"humidity\": 62.5}",
            "{\"sensorId\": \"3\", \"temperature\": 22.9, \"humidity\": 59.8}"
        };

        // Deliberately not a `using` declaration: the variable is reassigned when a
        // batch fills up, and a `using` variable is read-only. Each batch is disposed
        // explicitly instead.
        EventDataBatch eventBatch = await producer.CreateBatchAsync();
        try
        {
            foreach (var message in messages)
            {
                var eventData = new EventData(Encoding.UTF8.GetBytes(message));
                if (eventBatch.TryAdd(eventData))
                {
                    continue;
                }

                // Batch is full, send the current batch and start a new one
                if (eventBatch.Count > 0)
                {
                    await producer.SendAsync(eventBatch);
                }

                eventBatch.Dispose();
                eventBatch = await producer.CreateBatchAsync();

                if (!eventBatch.TryAdd(eventData))
                {
                    // Does not fit even in an empty batch, so it can never be sent as-is.
                    Console.WriteLine($"Message too large to add to the batch: {message}");
                }
            }

            // Send the remaining events in the last batch
            if (eventBatch.Count > 0)
            {
                await producer.SendAsync(eventBatch);
            }
        }
        finally
        {
            eventBatch.Dispose();
        }

        Console.WriteLine("Messages sent successfully.");
    }
}
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import java.util.Arrays;
import java.util.List;
public class EventSender {
    /**
     * Sends a fixed set of sample sensor readings to an Event Hub as a single batch.
     *
     * <p>The send is asynchronous; this method waits a few seconds for it to finish
     * and then closes the producer.
     *
     * @param connectionString connection string of the Event Hubs namespace
     * @param eventHubName name of the Event Hub to send to
     */
    public static void sendMessages(String connectionString, String eventHubName) {
        EventHubProducerAsyncClient producer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .buildAsyncProducerClient();

        List<String> messages = Arrays.asList(
            "{\"sensorId\": \"1\", \"temperature\": 23.5, \"humidity\": 60.1}",
            "{\"sensorId\": \"2\", \"temperature\": 24.1, \"humidity\": 62.5}",
            "{\"sensorId\": \"3\", \"temperature\": 22.9, \"humidity\": 59.8}"
        );

        try {
            producer.createBatch(new CreateBatchOptions())
                .flatMap(batch -> {
                    for (String message : messages) {
                        // tryAdd returns false when the event does not fit in the
                        // remaining space of this batch.
                        if (!batch.tryAdd(new EventData(message))) {
                            System.err.println("Message too large to add: " + message);
                        }
                    }
                    return producer.send(batch);
                })
                .subscribe(
                    // send() yields Mono<Void>, which never emits a value, so success
                    // must be reported from the completion callback, not from onNext.
                    unused -> { },
                    error -> System.err.println("Error sending messages: " + error.getMessage()),
                    () -> System.out.println("Messages sent successfully.")
                );

            // In a real application, you'd want to manage the client lifecycle properly
            // and wait for operations to complete.
            Thread.sleep(5000); // Give time for async operations
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            producer.close();
        }
    }
}
const { EventHubProducerClient, EventData } = require("@azure/event-hubs");
// Placeholders: replace with your own values (do not hardcode real secrets).
const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const producer = new EventHubProducerClient(connectionString, eventHubName);

/**
 * Sends a fixed set of sample sensor readings to the Event Hub in batches,
 * then closes the producer.
 *
 * @returns {Promise<void>} Resolves once all batches are sent and the producer is closed.
 */
async function sendEvents() {
  const messages = [
    '{"sensorId": "1", "temperature": 23.5, "humidity": 60.1}',
    '{"sensorId": "2", "temperature": 24.1, "humidity": 62.5}',
    '{"sensorId": "3", "temperature": 22.9, "humidity": 59.8}',
  ];

  try {
    // `let`, not `const`: the batch is replaced whenever it fills up.
    let batch = await producer.createBatch();

    for (const message of messages) {
      // Events are plain objects with a `body`; the SDK's EventData is a
      // type only and has no runtime constructor.
      const eventData = { body: message };
      if (batch.tryAdd(eventData)) {
        continue;
      }

      // Batch is full, send the current batch and start a new one
      await producer.sendBatch(batch);
      // Create a new batch and add the current event
      batch = await producer.createBatch();
      if (!batch.tryAdd(eventData)) {
        console.log(`Message too large to add to the batch: ${message}`);
      }
    }

    // Send the remaining events in the last batch
    if (batch.count > 0) {
      await producer.sendBatch(batch);
    }
    console.log("Messages sent successfully.");
  } finally {
    // Release the connection so the Node.js process can exit.
    await producer.close();
  }
}

sendEvents().catch(err => console.error("Error sending events:", err));
Receiving Messages
Consumers can read messages from Event Hubs using consumer groups. Each consumer group provides a separate view of the event stream. The examples below are in Python, C#, Java, and JavaScript (Node.js), in that order.
from azure.eventhub import EventHubConsumerClient
# Placeholders: replace with your own values (do not hardcode real secrets).
connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
consumer_group = "$Default"  # Or your custom consumer group
eventhub_name = "YOUR_EVENTHUB_NAME"
consumer = EventHubConsumerClient.from_connection_string(
    connection_str,
    consumer_group=consumer_group,
    eventhub_name=eventhub_name,
)


def process_event(partition_context, event):
    """Print the JSON body of a received event.

    The SDK invokes the ``on_event`` callback with two positional arguments,
    so the callback must accept both even though only ``event`` is used here.

    Args:
        partition_context: Context of the partition the event was read from.
        event: The received event; its body is decoded as JSON.
    """
    print(f"Received event: {event.body_as_json()}")


with consumer:
    # Announce before calling receive(): receive() blocks, so anything placed
    # after it would only run once the consumer has stopped.
    print("Consumer started. Press Ctrl+C to stop.")
    try:
        # In a real application, you would want more sophisticated error handling and control flow.
        # This example will run indefinitely until interrupted.
        consumer.receive(on_event=process_event, starting_position="-1")  # Start from the beginning
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; the `with` block closes the consumer.
        print("Consumer stopped.")
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Threading.Tasks;
using System.Text;
public class EventReceiver
{
    /// <summary>
    /// Reads events from all partitions of an Event Hub and writes each body to the
    /// console as UTF-8 text until <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="connectionString">Connection string of the Event Hubs namespace.</param>
    /// <param name="eventHubName">Name of the Event Hub to read from.</param>
    /// <param name="consumerGroup">Consumer group to read with, for example <c>$Default</c>.</param>
    /// <param name="cancellationToken">
    /// Optional token that stops the read loop. With the default value the method
    /// keeps reading until the process ends.
    /// </param>
    /// <returns>A task that completes when reading has been cancelled.</returns>
    public static async Task ReceiveMessagesAsync(
        string connectionString,
        string eventHubName,
        string consumerGroup,
        System.Threading.CancellationToken cancellationToken = default)
    {
        await using var consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
        Console.WriteLine("Listening for messages...");

        try
        {
            await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(cancellationToken))
            {
                Console.WriteLine($"Received message: {Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray())}");
                // Process the event
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way to stop reading; fall through so the
            // consumer is disposed cleanly.
        }
    }
}
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.models.EventPosition;
import java.util.function.Consumer;
public class EventReceiver {
    /**
     * Reads events from all partitions of an Event Hub, starting at the earliest
     * available event, and prints each body until the calling thread is interrupted.
     *
     * @param connectionString connection string of the Event Hubs namespace
     * @param eventHubName name of the Event Hub to read from
     * @param consumerGroup consumer group to read with, for example {@code $Default}
     */
    public static void receiveMessages(String connectionString, String eventHubName, String consumerGroup) {
        EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
            .connectionString(connectionString, eventHubName)
            .consumerGroup(consumerGroup)
            .buildAsyncConsumerClient();
        System.out.println("Listening for messages...");

        Consumer<com.azure.messaging.eventhubs.models.PartitionEvent> processEvent = partitionEvent -> {
            // getBodyAsString() decodes the payload as UTF-8 rather than relying on
            // the platform default charset.
            System.out.println("Received message: " + partitionEvent.getData().getBodyAsString());
            // Process the event
        };

        try {
            // receive(true) reads every partition from the earliest available event.
            consumer.receive(true).subscribe(
                processEvent,
                error -> System.err.println("Error receiving messages: " + error.getMessage()));

            // In a real application, you'd manage the client lifecycle and prevent the application from exiting.
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            consumer.close();
        }
    }
}
const { EventHubConsumerClient, earliest } = require("@azure/event-hubs");
// The SDK exports the "start of stream" position as the constant
// `earliestEventPosition`; it has no `earliest` function.
const { earliestEventPosition } = require("@azure/event-hubs");

// Placeholders: replace with your own values (do not hardcode real secrets).
const connectionString = "YOUR_EVENTHUBS_CONNECTION_STRING";
const eventHubName = "YOUR_EVENTHUB_NAME";
const consumerGroup = "$Default"; // Or your custom consumer group
const consumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);

/**
 * Subscribes to all partitions of the Event Hub, starting from the earliest
 * available event, and logs every received body.
 *
 * @returns {Promise<object>} The subscription; call `await subscription.close()` to stop receiving.
 */
async function processEvents() {
  console.log("Listening for messages...");

  const handlers = {
    async processEvents(events, context) {
      for (const event of events) {
        console.log(`Received message: ${event.body}`);
        // Process the event
      }
    },
    async processError(err, context) {
      console.error(`Error receiving events: ${err}`);
    },
  };

  // Start from the beginning
  const subscription = consumer.subscribe(handlers, { startPosition: earliestEventPosition });

  // The subscription will keep running until explicitly closed or the process exits.
  // To stop, you would call `await subscription.close();`
  return subscription;
}

processEvents().catch(err => console.error("Error setting up consumer:", err));
Important Considerations
- Connection Strings: Always protect your connection strings and avoid hardcoding them in production code. Use environment variables or Azure Key Vault.
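- Consumer Groups: Each consumer group is an independent view of the event stream. Readers track their own position (offset or checkpoint) per partition within a consumer group, so different applications or instances can consume events independently from the same Event Hub.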
- Error Handling: Implement robust error handling for both sending and receiving operations.
- Batching: Sending messages in batches is more efficient than sending them one by one.
Performance Tip
For high-throughput scenarios, consider using the native AMQP protocol and the Event Hubs SDKs, which are optimized for performance and reliability.