Python Support for Azure Functions SQS Extension
The Azure Functions SQS Extension now has Python support. If you’re running Azure Functions but need to process messages from Amazon SQS queues, you can now do it in Python with the same patterns you’d use in .NET.
Contents:
- Getting Started
- Sending Messages
- Authentication
- Tuning the Listener
- Under the Hood
- Local Development
- Source Code
Getting Started
See a complete example in the sample project.
Install the package:
pip install azure-functions-sqs
Write a function that triggers on SQS messages:
import json
import logging
from azure_functions_sqs import SqsTrigger, SqsMessage

trigger = SqsTrigger(queue_url="%SQS_QUEUE_URL%")

@trigger
def process_message(message: SqsMessage) -> None:
    logging.info(f"Received: {message.message_id}")
    logging.info(f"Body: {message.body}")
    # Process the message...
    data = json.loads(message.body)
    logging.info(f"Order ID: {data.get('order_id')}")
    # If this function completes without exception, the message is deleted
    # If an exception is raised, message becomes visible again for retry
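Because an unhandled exception is what triggers a retry, it can help to decide explicitly which failures should raise. The following is a minimal sketch of validating the body before processing. The `parse_order` helper and the required `order_id` field are illustrative assumptions, not part of the extension.

```python
import json


def parse_order(body: str) -> dict:
    """Parse a message body and fail loudly if it is unusable.

    Raising here propagates out of the trigger function, so the message
    is not deleted and becomes visible again for a retry.
    """
    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Message body is not valid JSON: {exc}") from exc

    # Hypothetical schema check: this sketch assumes every order has an order_id.
    if not isinstance(data, dict) or "order_id" not in data:
        raise ValueError("Message body is missing the 'order_id' field")
    return data
```

A message that can never be parsed will keep coming back until its retention period expires, so a dead-letter queue (redrive policy) on the SQS side is worth considering alongside a check like this.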
Configure your queue URL in local.settings.json:
{
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "SQS_QUEUE_URL": "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
  }
}
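The `%SQS_QUEUE_URL%` syntax follows the Azure Functions convention of wrapping an app setting name in percent signs, and the values in local.settings.json are exposed to the function as environment variables when running locally. The extension's own resolution code is not shown here; the sketch below only illustrates the idea, and `resolve_setting` is a hypothetical helper.

```python
import os
import re

_PLACEHOLDER = re.compile(r"%([^%]+)%")


def resolve_setting(value: str, settings=None) -> str:
    """Replace each %NAME% placeholder with the matching setting value.

    `settings` defaults to the process environment; pass a dict to test.
    """
    source = os.environ if settings is None else settings

    def lookup(match):
        name = match.group(1)
        if name not in source:
            raise KeyError(f"App setting '{name}' is not defined")
        return source[name]

    return _PLACEHOLDER.sub(lookup, value)
```

Failing fast on a missing setting, as this sketch does, tends to be easier to debug than passing a literal `%SQS_QUEUE_URL%` string on to the SQS client.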
The SqsMessage class gives you access to all SQS message properties:
@trigger
def process(message: SqsMessage) -> None:
    # Core properties
    print(message.message_id)
    print(message.body)
    print(message.receipt_handle)
    # System attributes
    print(message.attributes.get("SentTimestamp"))
    print(message.attributes.get("ApproximateReceiveCount"))
    # FIFO queue properties
    print(message.attributes.get("MessageGroupId"))
    print(message.attributes.get("SequenceNumber"))
    # Custom message attributes
    for name, attr in message.message_attributes.items():
        print(f"{name}: {attr.string_value}")
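SQS returns system attributes as strings: `SentTimestamp` is epoch time in milliseconds and `ApproximateReceiveCount` is an integer count. The helpers below are a small sketch for converting them, assuming `message.attributes` behaves like a dict of strings (as the `.get` calls above suggest). The function names are illustrative.

```python
from datetime import datetime, timezone


def sent_time(attributes: dict) -> datetime:
    """Convert SentTimestamp (epoch milliseconds, as a string) to a UTC datetime."""
    millis = int(attributes["SentTimestamp"])
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


def receive_count(attributes: dict) -> int:
    """Return how many times the message has been received (1 if not reported)."""
    return int(attributes.get("ApproximateReceiveCount", "1"))
```

A receive count above 1 means the message is being retried, which can be useful for logging or for skipping side effects that already happened.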
Sending Messages
Output Binding
The simplest way to send messages — decorate a function and return the message body (see full example):
import azure.functions as func
from azure_functions_sqs import SqsOutput
import json

app = func.FunctionApp()
output = SqsOutput(queue_url="%OUTPUT_QUEUE_URL%")

@app.function_name("SendToSqs")
@app.route(route="send", methods=["POST"])
@output
def send_to_sqs(req: func.HttpRequest) -> str:
    body = req.get_json()
    return json.dumps({
        "timestamp": body.get("timestamp"),
        "data": body.get("data"),
        "source": "azure-function",
    })
Batch Sending with SqsCollector
For sending multiple messages efficiently, use SqsCollector — it batches messages in groups of 10 (the SQS limit), similar to IAsyncCollector in .NET (see full example):
from azure_functions_sqs import SqsCollector

@app.function_name("BatchSendToSqs")
@app.route(route="batch", methods=["POST"])
def batch_send(req: func.HttpRequest) -> func.HttpResponse:
    collector = SqsCollector(queue_url="%OUTPUT_QUEUE_URL%")
    for item in req.get_json().get("items", []):
        collector.add({
            "item_id": item.get("id"),
            "action": item.get("action", "process"),
        })
    sent_count = collector.flush()
    return func.HttpResponse(
        json.dumps({"messages_sent": sent_count}),
        mimetype="application/json",
    )
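The batching idea can be sketched independently of the extension. The SQS SendMessageBatch API accepts at most 10 entries per request, so a list of bodies has to be split into chunks of that size. This is not the SqsCollector implementation, which is not shown here; `to_batches` is a hypothetical helper.

```python
import json
from typing import Iterable, Iterator

SQS_MAX_BATCH_SIZE = 10  # SendMessageBatch accepts at most 10 entries


def to_batches(bodies: Iterable[dict], size: int = SQS_MAX_BATCH_SIZE) -> Iterator[list]:
    """Yield lists of SendMessageBatch entries, each with at most `size` items."""
    batch = []
    for body in bodies:
        # Entry Ids only need to be unique within a single batch request.
        batch.append({"Id": str(len(batch)), "MessageBody": json.dumps(body)})
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch
```

Each yielded list has the shape boto3 expects for the `Entries` argument of `send_message_batch`, so 23 messages would result in three requests of 10, 10, and 3 entries.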
FIFO Queues
For FIFO queues (URL ends with .fifo), specify a message group ID (see full example):
from azure_functions_sqs import SqsOutput, SqsOutputOptions

fifo_output = SqsOutput(
    queue_url="%FIFO_QUEUE_URL%",
    options=SqsOutputOptions(message_group_id="order-processing"),
)

@app.route(route="fifo", methods=["POST"])
@fifo_output
def send_to_fifo(req: func.HttpRequest) -> str:
    return json.dumps(req.get_json())
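FIFO queues also need a deduplication ID on every message unless content-based deduplication is enabled on the queue. Whether SqsOutputOptions exposes a setting for this is not shown above, so the sketch below only illustrates what the underlying SendMessage parameters look like when the ID is derived from the body. `fifo_send_kwargs` is a hypothetical helper.

```python
import hashlib
import json


def fifo_send_kwargs(queue_url: str, body: dict, group_id: str) -> dict:
    """Build SendMessage parameters for a FIFO queue.

    The deduplication ID is a SHA-256 hex digest of the serialized body,
    so sending the same payload twice yields the same ID.
    """
    # sort_keys makes the serialization (and therefore the hash) stable.
    payload = json.dumps(body, sort_keys=True)
    return {
        "QueueUrl": queue_url,
        "MessageBody": payload,
        "MessageGroupId": group_id,
        "MessageDeduplicationId": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
    }
```

Messages sharing a `MessageGroupId` are delivered in order, so a single group ID such as "order-processing" serializes all processing. Using a per-entity group ID (for example, one per order) is a common way to keep ordering where it matters while allowing parallelism elsewhere.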
Authentication
The extension uses the boto3 credential chain. If you don’t provide explicit credentials, the SDK looks for them in this order:
- Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
- Shared credentials file (~/.aws/credentials)
- IAM role for EC2/ECS/EKS
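When a function fails to authenticate, it can be useful to check which of these sources is likely in play. The sketch below mirrors only the three sources listed above; the real boto3 chain has additional steps, and `credential_source` is a hypothetical diagnostic helper rather than anything boto3 or the extension provides.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def credential_source(
    env: Optional[Mapping[str, str]] = None,
    credentials_file: Optional[Path] = None,
) -> str:
    """Guess which credential source would be picked first (simplified)."""
    env = os.environ if env is None else env
    if credentials_file is None:
        credentials_file = Path.home() / ".aws" / "credentials"

    # 1. Explicit keys in the environment win.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment"
    # 2. Otherwise fall back to the shared credentials file, if present.
    if credentials_file.is_file():
        return "shared-credentials-file"
    # 3. Otherwise the SDK would try an attached IAM role.
    return "iam-role"
```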
Local Development
Set environment variables in local.settings.json:
{
  "Values": {
    "AWS_ACCESS_KEY_ID": "your-access-key",
    "AWS_SECRET_ACCESS_KEY": "your-secret-key",
    "AWS_REGION": "us-east-1"
  }
}
Or configure a profile with the AWS CLI:
aws configure
Workload Identity Federation (Recommended for Production)
The best approach for Azure-hosted functions is OIDC federation between Azure AD and AWS. No stored credentials — your function uses its Azure Managed Identity to assume an AWS IAM role.
- Create an OIDC Identity Provider in AWS IAM that trusts Azure AD
- Create an IAM role with a trust policy for your Azure Function’s managed identity
- Use AssumeRoleWithWebIdentity to get temporary AWS credentials
This eliminates credential management entirely. See AWS docs for setup details.
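The last step can be sketched as follows. This assumes `sts_client` is a boto3 STS client (`boto3.client("sts")`) and that the web identity token has already been obtained from the function's managed identity by some other means; the role ARN and session name are placeholders. It is an illustration of the STS call, not the extension's own code.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class TemporaryCredentials:
    access_key_id: str
    secret_access_key: str
    session_token: str
    expiration: datetime


def assume_role_with_token(
    sts_client,
    role_arn: str,
    web_identity_token: str,
    session_name: str = "azure-function",  # placeholder session name
) -> TemporaryCredentials:
    """Exchange an OIDC token for temporary AWS credentials via STS."""
    response = sts_client.assume_role_with_web_identity(
        RoleArn=role_arn,
        RoleSessionName=session_name,
        WebIdentityToken=web_identity_token,
    )
    creds = response["Credentials"]
    return TemporaryCredentials(
        access_key_id=creds["AccessKeyId"],
        secret_access_key=creds["SecretAccessKey"],
        session_token=creds["SessionToken"],
        expiration=creds["Expiration"],
    )
```

The returned credentials expire, so a long-running function would need to repeat the exchange before `expiration` is reached.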
Azure Key Vault (Alternative)
If federation isn’t an option, store your AWS credentials in Azure Key Vault and reference them in app settings:
AWS_ACCESS_KEY_ID=@Microsoft.KeyVault(SecretUri=https://myvault.vault.azure.net/secrets/aws-key-id/)
AWS_SECRET_ACCESS_KEY=@Microsoft.KeyVault(SecretUri=https://myvault.vault.azure.net/secrets/aws-secret-key/)
Tuning the Listener
Configure polling behavior with SqsTriggerOptions:
from datetime import timedelta
from azure_functions_sqs import SqsTrigger, SqsTriggerOptions

trigger = SqsTrigger(
    queue_url="%SQS_QUEUE_URL%",
    options=SqsTriggerOptions(
        max_number_of_messages=10,
        visibility_timeout=timedelta(seconds=30),
        polling_interval=timedelta(seconds=5),
    ),
)
| Setting | Default | Description |
|---|---|---|
| max_number_of_messages | 10 | Messages to receive per poll (1-10) |
| visibility_timeout | 30 seconds | How long a message stays hidden while processing |
| polling_interval | 5 seconds | Delay between polls when queue is empty |
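The ranges in the table come from SQS itself: ReceiveMessage returns at most 10 messages per call, and a visibility timeout can be at most 12 hours. Whether SqsTriggerOptions validates these values is not stated here, so the sketch below uses a hypothetical stand-in class, `ListenerSettings`, to show what such checks might look like.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class ListenerSettings:
    """Hypothetical stand-in for trigger options, with range checks."""

    max_number_of_messages: int = 10
    visibility_timeout: timedelta = timedelta(seconds=30)
    polling_interval: timedelta = timedelta(seconds=5)

    def __post_init__(self) -> None:
        if not 1 <= self.max_number_of_messages <= 10:
            raise ValueError("max_number_of_messages must be between 1 and 10")
        if not timedelta(0) <= self.visibility_timeout <= timedelta(hours=12):
            raise ValueError("visibility_timeout must be between 0 seconds and 12 hours")
        if self.polling_interval < timedelta(0):
            raise ValueError("polling_interval must not be negative")
```

As a rule of thumb, the visibility timeout should be longer than the slowest expected run of the function. Otherwise a message can become visible again and be processed a second time while the first attempt is still running.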
Under the Hood
The trigger uses a sequential polling loop:
┌─────────────────────────────────────────────────────────────────┐
│                          Polling Loop                           │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
                      ┌─────────────────────┐
                      │   ReceiveMessage    │
                      │  (long poll, 20s)   │
                      └─────────────────────┘
                                 │
                ┌────────────────┴────────────────┐
                │                                 │
                ▼                                 ▼
       ┌─────────────────┐               ┌─────────────────┐
       │  Messages? Yes  │               │  Messages? No   │
       └─────────────────┘               └─────────────────┘
                │                                 │
                ▼                                 ▼
       ┌─────────────────┐               ┌─────────────────┐
       │ Execute Function│               │  Wait interval  │
       └─────────────────┘               │  (5s default)   │
                │                        └─────────────────┘
                │                                 │
        ┌───────┴───────┐                         │
        │               │                         │
        ▼               ▼                         │
   ┌──────────┐   ┌──────────┐                    │
   │ Success  │   │ Failure  │                    │
   └──────────┘   └──────────┘                    │
        │               │                         │
        ▼               ▼                         │
   ┌──────────┐  ┌──────────────┐                 │
   │  Delete  │  │   Leave in   │                 │
   │ message  │  │ queue (retry)│                 │
   └──────────┘  └──────────────┘                 │
        │               │                         │
        └───────┬───────┘                         │
                │                                 │
                └────────────────┬────────────────┘
                                 │
                                 ▼
                           (repeat loop)
async def _poll_loop(self):
    while self._running:
        messages_received = await self._poll_and_process()
        if messages_received == 0:
            await asyncio.sleep(self._options.polling_interval.total_seconds())
Long polling (20-second wait time) reduces API calls — instead of hitting SQS every second, the SDK waits for messages to arrive or the timeout to expire.
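To make the flow concrete, here is a self-contained sketch of the same loop with the SQS calls replaced by injected coroutines. The names `poll_once`, `poll_loop`, `receive`, `handler`, `delete`, and `should_run` are all illustrative; the extension's real `_poll_and_process` is not shown in this post.

```python
import asyncio


async def poll_once(receive, handler, delete) -> int:
    """Receive one batch, run the handler on each message, delete on success."""
    messages = await receive()
    for message in messages:
        try:
            await handler(message)
        except Exception:
            # Do not delete: the message reappears after its visibility timeout.
            continue
        await delete(message)
    return len(messages)


async def poll_loop(receive, handler, delete, should_run, idle_delay: float) -> None:
    """Poll sequentially, sleeping only when the last poll returned nothing."""
    while should_run():
        if await poll_once(receive, handler, delete) == 0:
            await asyncio.sleep(idle_delay)
```

Because the loop is sequential, one slow message delays every message behind it in the batch, which is another reason to keep the visibility timeout comfortably above the expected processing time.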
Local Development
The extension works with LocalStack for local development without AWS credentials:
trigger = SqsTrigger(
    queue_url="http://localhost:4566/000000000000/test-queue",
    region="us-east-1",
    aws_key_id="test",
    aws_access_key="test",
)
See the LocalStack guide for Docker setup. The same scripts work for Python.
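If you also talk to LocalStack with boto3 directly, for example to create the test queue, the client needs an `endpoint_url` pointing at the local server. A small sketch for deriving it from the queue URL follows; `endpoint_for` is a hypothetical helper, and treating only `localhost` and `127.0.0.1` as local is an assumption of this example.

```python
from typing import Optional
from urllib.parse import urlsplit


def endpoint_for(queue_url: str) -> Optional[str]:
    """Return the scheme://host:port of a local queue URL, or None for real AWS."""
    parts = urlsplit(queue_url)
    if parts.hostname in ("localhost", "127.0.0.1"):
        return f"{parts.scheme}://{parts.netloc}"
    return None
```

The result can be passed as `boto3.client("sqs", endpoint_url=...)`. A value of `None` leaves boto3 on its default AWS endpoints.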
Source Code
The extension is open source on GitHub. Key files: