Sending and Receiving Events with Azure Event Hubs

This guide covers the fundamental operations of sending events to and receiving events from Azure Event Hubs. We'll explore common patterns and provide code examples using popular SDKs.

Prerequisites

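Before you begin, ensure you have an Event Hubs namespace containing an event hub, a connection string for it, and the `azure-eventhub` Python package installed (`pip install azure-eventhub`). The examples below assume all three.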

Sending Events

Event Hubs is designed for high-throughput data ingestion. You can send events individually or in batches; batching is generally more efficient because many events travel in a single request.

Using the Azure SDK (Python Example)

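The following code snippet demonstrates how to send a single event using the Python SDK. Note that `send_event` is available only in recent releases of `azure-eventhub`; with older releases, send a batch containing one event instead: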


from azure.eventhub import EventHubProducerClient, EventData

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"

try:
    producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    with producer:
        event_data = EventData("Hello, Event Hubs!")
        producer.send_event(event_data)
        print("Event sent successfully!")
except Exception as e:
    print(f"Error sending event: {e}")
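
Event bodies are often structured records rather than plain strings. The sketch below assumes JSON as the payload format, which this guide does not prescribe. `encode_payload` and `decode_payload` are hypothetical helper names, not part of the SDK. The resulting bytes can be passed to `EventData(...)`, which accepts a `str` or `bytes` body.

```python
import json
from typing import Any, Dict


def encode_payload(record: Dict[str, Any]) -> bytes:
    """Serialize a record to compact, key-sorted UTF-8 JSON (hypothetical helper)."""
    return json.dumps(record, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_payload(body: bytes) -> Dict[str, Any]:
    """Inverse of encode_payload, for use on the receiving side."""
    return json.loads(body.decode("utf-8"))
```

On the sending side this would be used as `EventData(encode_payload({"device": "sensor-1", "temp": 21.5}))`. Sorting the keys keeps the encoded bytes stable for identical records, which can help when comparing or de-duplicating payloads.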
                

Sending Batched Events

To send events in batches, call `create_batch()` on the producer. It returns an `EventDataBatch` object that you fill with `add()` and transmit with `send_batch()`:


from azure.eventhub import EventHubProducerClient, EventData

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"

try:
    producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)
    with producer:
        batch = producer.create_batch()
        for i in range(5):
            event_content = f"Message {i}"
            event_data = EventData(event_content)
            try:
                batch.add(event_data)
            except ValueError:
                # Batch is full, send the current batch and start a new one
                producer.send_batch(batch)
                batch = producer.create_batch()
                batch.add(event_data)

        # Send the last batch if it's not empty (EventDataBatch supports len())
        if len(batch) > 0:
            producer.send_batch(batch)
            print(f"{len(batch)} events sent in the last batch.")

except Exception as e:
    print(f"Error sending events: {e}")
                
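Note: Event Hubs limits the size of a batch (256 KB on the Basic tier and 1 MB on the higher tiers). `create_batch()` returns a batch sized to the limit, and `add()` raises `ValueError` when the next event would exceed it, which is why the loop above sends the current batch and starts a new one.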
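
The fill-and-flush pattern above can be factored into a reusable function. The following is a minimal sketch, not an SDK API: `send_in_batches` is a hypothetical name, and `producer` is only assumed to behave like `EventHubProducerClient` (`create_batch()`, `send_batch()`, and a batch whose `add()` raises `ValueError` when full). It also handles a case the loop above does not: an event too large to fit even in an empty batch.

```python
from typing import Any, Iterable, List


def send_in_batches(producer: Any, events: Iterable[Any]) -> List[int]:
    """Send events in as few batches as the size limit allows.

    Returns the number of events in each batch that was sent.
    """
    sent_counts: List[int] = []
    batch = producer.create_batch()
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if len(batch) == 0:
                # The event does not fit even in an empty batch; give up.
                raise
            # Batch is full: send it, then retry the event in a fresh batch.
            producer.send_batch(batch)
            sent_counts.append(len(batch))
            batch = producer.create_batch()
            batch.add(event)  # still raises ValueError if the event alone is too large
    # Flush whatever is left over.
    if len(batch) > 0:
        producer.send_batch(batch)
        sent_counts.append(len(batch))
    return sent_counts
```

Because the function relies only on duck typing, it can be exercised in unit tests with a stub producer and then called with a real client inside a `with producer:` block.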

Receiving Events

Consumers can read events from Event Hubs using consumer groups. Each consumer group maintains its own read position within the Event Hub partitions.
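
To make the idea of independent read positions concrete, here is a toy in-memory model. It is a conceptual sketch only: `ToyEventHub` is a hypothetical class, not SDK code, and it says nothing about where a real deployment stores read positions. It shows only that two consumer groups reading the same partition do not affect each other.

```python
from typing import Dict, List


class ToyEventHub:
    """In-memory stand-in for an event hub: a set of append-only partitions."""

    def __init__(self, partition_count: int) -> None:
        self.partitions: List[List[str]] = [[] for _ in range(partition_count)]
        # One read position per consumer group, per partition.
        self._positions: Dict[str, List[int]] = {}

    def send(self, partition: int, body: str) -> None:
        self.partitions[partition].append(body)

    def read(self, consumer_group: str, partition: int) -> List[str]:
        """Return this group's unread events and advance only its own position."""
        positions = self._positions.setdefault(
            consumer_group, [0] * len(self.partitions)
        )
        start = positions[partition]
        events = self.partitions[partition][start:]
        positions[partition] = len(self.partitions[partition])
        return events
```

In this model, after two events are sent to partition 0, a group named `"analytics"` and a group named `"archiver"` each receive both events on their first read. A later read by `"analytics"` returns only events sent since its previous read, regardless of what `"archiver"` has done.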

Using the Azure SDK (Python Example)

The following code demonstrates how to create a consumer client and receive events:


from azure.eventhub import EventHubConsumerClient

connection_str = "YOUR_EVENTHUBS_CONNECTION_STRING"
eventhub_name = "YOUR_EVENTHUB_NAME"
consumer_group = "$Default" # Or your custom consumer group

def on_event(partition_context, event):
    # Called once per event, together with the partition it was read from
    print(f"Received event from partition {partition_context.partition_id}: {event.body_as_str()}")
    # Process the event data here...

try:
    consumer = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name
    )

    with consumer:
        # receive() blocks and calls on_event until the client is closed or interrupted
        consumer.receive(
            on_event=on_event,
            partition_id="1", # Omit this argument to receive from all partitions
            starting_position="-1" # "-1" starts at the beginning of the partition; "@latest" reads only new events
        )

except KeyboardInterrupt:
    print("Stopped receiving.")
except Exception as e:
    print(f"Error receiving events: {e}")
                
Tip: It's crucial to use consumer groups to allow multiple applications or instances of the same application to process events independently without interfering with each other.
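
The SDK's `receive()` method calls its `on_event` callback with two arguments, a partition context and the event. Since the callback is plain Python, the processing logic can be tested without a live Event Hub. The sketch below uses a hypothetical factory, `make_on_event`, that builds a callback collecting `(partition_id, body)` pairs into a list. It relies only on `partition_context.partition_id` and `event.body_as_str()`.

```python
from typing import Any, Callable, List, Tuple


def make_on_event(sink: List[Tuple[str, str]]) -> Callable[[Any, Any], None]:
    """Build an on_event callback that records (partition_id, body) pairs."""

    def on_event(partition_context: Any, event: Any) -> None:
        if event is None:
            # The SDK passes None when max_wait_time elapses with no new event.
            return
        sink.append((partition_context.partition_id, event.body_as_str()))

    return on_event
```

A callback built this way can be passed as `on_event=make_on_event(received)` to `consumer.receive(...)`, while tests call it directly with stub objects.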

Key Considerations

For more advanced scenarios, such as schema validation, custom partitioning strategies, and integration with other Azure services like Azure Functions and Azure Stream Analytics, please refer to the Advanced Topics section.
