Azure Functions Input Bindings: Queue Storage

Introduction to Queue Storage Input Bindings

Azure Functions input bindings simplify reading data from other Azure services. The Queue Storage input binding lets a function read messages from an Azure Queue Storage queue. This is useful when messages have already been placed on a queue and you want to process them without triggering the function each time a message arrives.

Unlike the Queue Trigger, which automatically invokes a function when a new message arrives, an input binding allows you to explicitly retrieve messages at a specific point within your function's execution. This offers more control over when and how messages are processed.

Queue Trigger vs. Queue Input Binding

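It's important to distinguish between a Queue Trigger and a Queue Input Binding:

Queue Trigger: automatically invokes the function each time a new message arrives on the queue.
Queue Input Binding: lets a function that was started by another trigger (for example, an HTTP request) retrieve queue messages at a specific point in its execution.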

This document focuses on the Queue Input Binding.

Configuring a Queue Input Binding

To use a Queue Storage input binding, you need to define it in your function's configuration: function.json for JavaScript, Python, and C# script, or attributes for C# class libraries (both in-process and isolated worker).

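The binding is configured with the following properties:

type: must be "queue".
direction: must be "in".
name: the name used to access the message in function code ("queueMessage" in the examples below).
queueName: the name of the queue to read from.
connection: the name of the application setting that holds the storage connection string.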

function.json Example (Node.js/JavaScript)


{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    },
    {
      "type": "queue",
      "direction": "in",
      "name": "queueMessage",
      "queueName": "my-input-queue",
      "connection": "AzureWebJobsStorage"
    }
  ]
}
                    

function.json Example (Python)


{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    },
    {
      "type": "queue",
      "direction": "in",
      "name": "queueMessage",
      "queueName": "my-input-queue",
      "connection": "AzureWebJobsStorage"
    }
  ]
}
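A configuration like the ones above can be sanity-checked before deployment. The following is a minimal sketch, not part of any Azure tooling: the helper name summarize_bindings is hypothetical, and it assumes the usual convention that trigger binding types end in "Trigger". It reports which bindings are triggers and which queues are declared as inputs.

```python
import json

# Same shape as the function.json examples above.
FUNCTION_JSON = """
{
  "scriptFile": "__init__.py",
  "bindings": [
    {"authLevel": "function", "type": "httpTrigger", "direction": "in",
     "name": "req", "methods": ["get", "post"]},
    {"type": "http", "direction": "out", "name": "res"},
    {"type": "queue", "direction": "in", "name": "queueMessage",
     "queueName": "my-input-queue", "connection": "AzureWebJobsStorage"}
  ]
}
"""


def summarize_bindings(config: dict) -> dict:
    """Hypothetical helper: list trigger names and queue inputs in a config."""
    bindings = config.get("bindings", [])
    # Assumption: trigger types follow the "<service>Trigger" naming convention.
    triggers = [b["name"] for b in bindings if b.get("type", "").endswith("Trigger")]
    queue_inputs = {
        b["name"]: b["queueName"]
        for b in bindings
        if b.get("type") == "queue" and b.get("direction") == "in"
    }
    return {"triggers": triggers, "queue_inputs": queue_inputs}


summary = summarize_bindings(json.loads(FUNCTION_JSON))
print(summary)
```

A function is expected to have exactly one trigger, so a check such as len(summary["triggers"]) == 1 is a cheap way to catch a configuration that declares both an HTTP trigger and a queue trigger by mistake.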
                    

C# Attribute Example (Isolated Worker)


using System;
using System.Net;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;

public static class ProcessQueueMessage
{
    [Function(nameof(ProcessQueueMessage))]
    public static async Task<HttpResponseData> RunAsync(
        [HttpTrigger(AuthorizationLevel.Function, "get", "post")] HttpRequestData req,
        FunctionContext context)
    {
        // A function can have only one trigger, so [QueueTrigger] cannot be combined
        // with [HttpTrigger] here. The queue is read explicitly with the
        // Azure.Storage.Queues SDK instead.
        var logger = context.GetLogger(nameof(ProcessQueueMessage));
        logger.LogInformation("HTTP trigger function processed a request.");

        string messageContent = await GetMessageFromQueueAsync("my-input-queue");
        if (messageContent != null)
        {
            logger.LogInformation("Processing message: {Message}", messageContent);
        }
        else
        {
            logger.LogInformation("No message found in the queue.");
        }

        // ... rest of function logic ...
        var response = req.CreateResponse(HttpStatusCode.OK);
        await response.WriteStringAsync("Queue message processed (or not found).");
        return response;
    }

    // Receives a single message with the Azure.Storage.Queues SDK.
    // Returns null when the queue is empty.
    // In a real app, inject a configured QueueServiceClient or QueueClient
    // instead of constructing one on every call.
    private static async Task<string> GetMessageFromQueueAsync(string queueName)
    {
        var queueClient = new QueueClient(
            Environment.GetEnvironmentVariable("AzureWebJobsStorage"),
            queueName);

        QueueMessage message = (await queueClient.ReceiveMessageAsync()).Value;
        if (message == null)
        {
            return null;
        }

        // Deleting here keeps the sketch short. In production, delete the message
        // only after it has been processed successfully.
        await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
        return message.MessageText;
    }
}
                    

Note on C# Isolated Worker: The [QueueTrigger] attribute covers only the trigger scenario. To retrieve messages explicitly, you typically register an SDK client (e.g., QueueClient from Azure.Storage.Queues) through dependency injection and call its methods to receive messages directly, as the example above does.

Common Usage Scenarios

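Queue input bindings are ideal for scenarios where messages have already been placed on a queue and you want to process them when the function runs for another reason (for example, in response to an HTTP request), rather than on each message's arrival.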

Connection Strings and Settings

The connection property in the binding configuration refers to an application setting that holds the connection string for your Azure Storage account. By default, it uses the AzureWebJobsStorage setting.

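Ensure that your function app has an application setting that contains the connection string for your storage account. You can set it in the Azure portal under the function app's Configuration settings; when running locally, add it under Values in local.settings.json. At run time, application settings are available to your code as environment variables.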
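The lookup described above can be sketched in a few lines of Python. This is an illustration only: resolve_connection_string is a hypothetical helper, not a Functions API, and it assumes the setting is exposed as an environment variable and that AzureWebJobsStorage is the fallback name when a binding omits connection.

```python
import os

DEFAULT_CONNECTION_SETTING = "AzureWebJobsStorage"


def resolve_connection_string(binding: dict, env=None) -> str:
    """Hypothetical helper: map a binding's 'connection' name to its value."""
    env = os.environ if env is None else env
    # Fall back to the default setting name when 'connection' is missing or empty.
    setting_name = binding.get("connection") or DEFAULT_CONNECTION_SETTING
    if setting_name not in env:
        raise RuntimeError(f"App setting {setting_name!r} is not configured")
    return env[setting_name]


# Usage with an explicit mapping instead of the real environment.
settings = {"AzureWebJobsStorage": "UseDevelopmentStorage=true"}
print(resolve_connection_string({"type": "queue", "direction": "in"}, settings))
```

Note that the binding stores the name of the setting, never the connection string itself, which keeps secrets out of function.json.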

Code Examples

Node.js / JavaScript

If the binding is configured with cardinality: "one" (or default), the queueMessage parameter will be a string containing the message content.


module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = context.bindings.queueMessage;

    if (message) {
        context.log(`Processing message from queue: ${message}`);
        // Your logic to process the message goes here
        // For example, parse JSON, update a database, etc.
        try {
            const data = JSON.parse(message);
            context.log(`Parsed data:`, data);
            // Process parsed data...
        } catch (error) {
            context.log.error(`Failed to parse message as JSON: ${error.message}`);
        }
    } else {
        context.log('No message received from queue.');
    }

    context.res = {
        status: 200,
        body: "Queue message processed (or not found)."
    };
};
            

If the binding is configured with cardinality: "many", queueMessage will be an array of strings.


module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request (batch mode).');

    const messages = context.bindings.queueMessage; // This will be an array

    if (messages && messages.length > 0) {
        context.log(`Received ${messages.length} messages from queue.`);
        for (const message of messages) {
            context.log(`Processing message: ${message}`);
            // Process each message...
            try {
                const data = JSON.parse(message);
                context.log(`Parsed data:`, data);
                // Process parsed data...
            } catch (error) {
                context.log.error(`Failed to parse message as JSON: ${error.message}`);
            }
        }
    } else {
        context.log('No messages received from queue.');
    }

    context.res = {
        status: 200,
        body: `Processed ${messages ? messages.length : 0} messages.`
    };
};
            

Python

For a single message (default cardinality):


import logging
import azure.functions as func
import json

def main(req: func.HttpRequest, queueMessage: str) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    if queueMessage:
        logging.info(f'Processing message from queue: {queueMessage}')
        # Your logic to process the message goes here
        try:
            data = json.loads(queueMessage)
            logging.info(f'Parsed data: {data}')
            # Process parsed data...
        except json.JSONDecodeError:
            logging.error('Failed to parse message as JSON.')
    else:
        logging.info('No message received from queue.')

    return func.HttpResponse(
         "Queue message processed (or not found).",
         status_code=200
    )
            

For multiple messages (cardinality: "many"):


import logging
import azure.functions as func
import json

def main(req: func.HttpRequest, queueMessage: list[str]) -> func.HttpResponse: # Type hint for list
    logging.info('Python HTTP trigger function processed a request (batch mode).')

    if queueMessage:
        logging.info(f'Received {len(queueMessage)} messages from queue.')
        for message in queueMessage:
            logging.info(f'Processing message: {message}')
            # Process each message...
            try:
                data = json.loads(message)
                logging.info(f'Parsed data: {data}')
                # Process parsed data...
            except json.JSONDecodeError:
                logging.error('Failed to parse message as JSON.')
    else:
        logging.info('No messages received from queue.')

    return func.HttpResponse(
         f"Processed {len(queueMessage) if queueMessage else 0} messages.",
         status_code=200
    )
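The per-message try/except in the batch example can be factored into a small helper so the caller knows how many messages failed to parse. This is a sketch under the same assumptions as the example above (messages arrive as a list of JSON strings); partition_messages is a hypothetical name.

```python
import json


def partition_messages(messages):
    """Split raw queue messages into parsed JSON values and unparseable originals."""
    parsed, failed = [], []
    for raw in messages or []:
        try:
            parsed.append(json.loads(raw))
        except (json.JSONDecodeError, TypeError):
            # Keep the original text so it can be logged or inspected later.
            failed.append(raw)
    return parsed, failed


parsed, failed = partition_messages(['{"id": 1}', "not json", '{"id": 2}'])
print(f"Processed {len(parsed)} messages, {len(failed)} could not be parsed.")
```

Returning both lists lets the HTTP response report failures instead of always claiming that every message was processed.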
            

Note on C#: As mentioned in the configuration section, for explicit queue message retrieval in C# Isolated Worker, you'll typically use the Azure SDK directly by injecting appropriate clients.
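The SDK-based approach mentioned in this note can also be sketched in Python with the azure-storage-queue package. The helper names receive_one and make_queue_client are hypothetical; the sketch assumes the package is installed and that the AzureWebJobsStorage setting holds the connection string. The client is passed in as a parameter so the logic can be exercised without a storage account.

```python
import os


def receive_one(queue_client, handler):
    """Receive at most one message, hand its content to handler, then delete it.

    Returns the handler's result, or None when the queue is empty.
    """
    message = queue_client.receive_message()
    if message is None:
        return None
    result = handler(message.content)
    # Delete only after the handler succeeds; if it raises, the message becomes
    # visible again once its visibility timeout expires.
    queue_client.delete_message(message)
    return result


def make_queue_client(queue_name: str, setting_name: str = "AzureWebJobsStorage"):
    """Build a QueueClient from an app setting (requires azure-storage-queue)."""
    from azure.storage.queue import QueueClient  # imported lazily on purpose

    return QueueClient.from_connection_string(os.environ[setting_name], queue_name)
```

Inside a function, this might be called as receive_one(make_queue_client("my-input-queue"), process), where process is your own message handler.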

Best Practices