Receiving Events

This guide shows how to receive and process events from Azure Event Hubs, using the Event Hubs SDK for Python and an Event Hubs trigger in Azure Functions.

Core Concepts for Receiving Events

Choosing a Receiving Method

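Azure Event Hubs offers several ways to receive events, depending on your application's needs. This guide covers two: a custom receiver built with the Event Hubs SDK, and an Event Hubs trigger in Azure Functions.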

Using the Event Hubs SDK (Python Example)

The Event Hubs SDK provides a straightforward way to build custom event receivers.

1. Installation

pip install azure-eventhub

2. Basic Consumer Code

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

async def on_event(partition_context, event):
    # Called once per received event; partition_context identifies the source partition.
    try:
        print(f"Partition: {partition_context.partition_id}")
        print(f"Sequence Number: {event.sequence_number}")
        print(f"Offset: {event.offset}")
        print(f"Enqueued Time: {event.enqueued_time}")
        print(f"Body: {event.body_as_str()}")
        print("-" * 20)
    except Exception as e:
        print(f"Error processing event: {e}")

async def receive_events(connection_str, event_hub_name):
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group="$Default",
        eventhub_name=event_hub_name,
    )

    async with client:
        print(f"Starting to receive events from {event_hub_name}...")
        # starting_position="-1" reads each partition from the beginning.
        # receive() keeps running until the task is cancelled or the client is closed.
        await client.receive(on_event=on_event, starting_position="-1")

async def main():
    # Replace with your Event Hubs connection string and name
    EVENT_HUB_CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STRING"
    EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"

    await receive_events(EVENT_HUB_CONNECTION_STR, EVENT_HUB_NAME)

if __name__ == "__main__":
    asyncio.run(main())
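
Hard-coding the connection string, as main() does above, is fine for a first run, but it is usually safer to keep it out of source control. The following is a minimal sketch of one way to do that, assuming the settings are supplied in environment variables. The variable names EVENT_HUB_CONNECTION_STR and EVENT_HUB_NAME and the load_settings helper are hypothetical names chosen for this example; the SDK does not read them on its own.

```python
import os
from typing import Mapping, Tuple


def load_settings(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (connection_str, event_hub_name) read from the environment.

    The variable names are assumptions made for this sketch, not names
    the Event Hubs SDK looks for by itself.
    """
    required = ("EVENT_HUB_CONNECTION_STR", "EVENT_HUB_NAME")
    # Treat unset and empty values the same way: both count as missing.
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    return env["EVENT_HUB_CONNECTION_STR"], env["EVENT_HUB_NAME"]
```

In main(), the two placeholder assignments could then be replaced with a single call such as connection_str, event_hub_name = load_settings(), which fails early with a clear message when a setting is absent.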

3. Important Considerations for SDK Usage

Receiving Events with Azure Functions

Azure Functions offers a simpler, event-driven approach: you configure an Event Hubs trigger, and the platform invokes your function for incoming events without you managing the receiving infrastructure.

Example Azure Function (Python)

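Define the Event Hubs trigger in function.json. The "connection" value is the name of an app setting that holds the connection string, not the connection string itself: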

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myEventHubsTrigger",
      "type": "eventHubTrigger",
      "direction": "in",
      "eventHubName": "YOUR_EVENT_HUB_NAME",
      "connection": "EVENTHUB_CONNECTION_STRING_SETTING",
      "consumerGroup": "$Default"
    }
  ]
}

And in your __init__.py:

import logging
import json
import azure.functions as func

async def main(myEventHubsTrigger: func.EventHubEvent):
    # Decode the payload once and reuse it below
    body = myEventHubsTrigger.get_body().decode('utf-8')
    logging.info(f'Python Event Hub trigger function processed a message: {body}')
    # EventHubEvent exposes partition_key (None when the sender set no key)
    logging.info(f'Partition Key: {myEventHubsTrigger.partition_key}')
    logging.info(f'Sequence Number: {myEventHubsTrigger.sequence_number}')
    logging.info(f'Offset: {myEventHubsTrigger.offset}')
    logging.info(f'Enqueued Time: {myEventHubsTrigger.enqueued_time}')

    # Example of processing JSON data
    try:
        data = json.loads(body)
        logging.info(f"Parsed data: {data}")
        # Your processing logic here...
    except ValueError:
        logging.warning("Message is not valid JSON.")

Note: When using Azure Functions with Event Hubs, the Functions runtime handles checkpointing automatically, so your function does not need its own checkpoint logic.
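
To make the idea of checkpointing concrete: a checkpoint records, for each partition, the position of the last event that was fully processed, so that a restarted reader can resume from there and skip anything it has already handled. The following is a simplified, in-memory sketch of that bookkeeping. The CheckpointTracker class and its method names are hypothetical and exist only for illustration; this is not how the Functions runtime or the SDK stores checkpoints, since real checkpoints are persisted durably.

```python
from typing import Dict, Optional


class CheckpointTracker:
    """Keeps the highest processed sequence number per partition (in memory only)."""

    def __init__(self) -> None:
        self._last_seen: Dict[str, int] = {}

    def last_checkpoint(self, partition_id: str) -> Optional[int]:
        return self._last_seen.get(partition_id)

    def should_process(self, partition_id: str, sequence_number: int) -> bool:
        # An event at or below the checkpoint was already handled (a redelivery).
        last = self._last_seen.get(partition_id)
        return last is None or sequence_number > last

    def checkpoint(self, partition_id: str, sequence_number: int) -> None:
        # Never move a checkpoint backwards.
        last = self._last_seen.get(partition_id)
        if last is None or sequence_number > last:
            self._last_seen[partition_id] = sequence_number


tracker = CheckpointTracker()
# (partition_id, sequence_number) pairs; the final ("0", 1) simulates a redelivery.
incoming = [("0", 1), ("0", 2), ("1", 1), ("0", 1)]
processed = []
for partition_id, sequence_number in incoming:
    if tracker.should_process(partition_id, sequence_number):
        processed.append((partition_id, sequence_number))
        tracker.checkpoint(partition_id, sequence_number)
```

Event Hubs delivers events at least once, so a handler that tolerates a repeated event stays correct even when a checkpoint is lost or lags behind.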

Best Practices for Event Reception

Next Steps

Explore advanced scenarios such as dead-lettering, event filtering, and integration with other Azure services.