Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions lambda-durable-async-job-python-sam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Async Job Processing with AWS Lambda Durable Functions

This pattern implements an async job API using Lambda durable functions. Submit a job via HTTP POST, get a job ID back immediately, then poll for status via HTTP GET. The durable function processes the job through checkpointed steps, updating progress in DynamoDB as it goes.

Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/lambda-durable-async-job-python-sam](https://serverlessland.com/patterns/lambda-durable-async-job-python-sam)

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
* [Python 3.13](https://www.python.org/downloads/) installed and available in your PATH

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
1. Change directory to the pattern directory:
```
cd lambda-durable-async-job-python-sam
```
1. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file:
```
sam build
sam deploy --guided
```
1. During the prompts:
* Enter a stack name
* Enter the desired AWS Region
* Allow SAM CLI to create IAM roles with the required permissions.

Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults.

1. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing.

## How it works

This pattern creates:

1. **Durable Lambda Function (Processor)**: A Python 3.13 Lambda function that uses the durable execution SDK to process jobs through three checkpointed steps: validate, process, and finalize. Each step updates progress in DynamoDB.

2. **API Lambda Function**: A standard Lambda function that handles HTTP requests — POST /jobs to submit work and GET /jobs/{id} to poll for status.

3. **HTTP API (API Gateway)**: An Amazon API Gateway HTTP API with two routes for job submission and status polling.

4. **DynamoDB Table**: A table that tracks job status, progress percentage, and results.

### Durable Execution Flow

- **POST /jobs**: API function creates a job record (SUBMITTED), invokes the durable processor asynchronously, returns job ID with HTTP 202.
- **Processor Step 1 (Validate)**: Validates input data, updates progress to 25%.
- **context.wait()**: Brief pause between steps (checkpointed, no compute charges).
- **Processor Step 2 (Process)**: Processes the data, updates progress to 50%.
- **Processor Step 3 (Finalize)**: Finalizes results, updates progress to 75%, then marks COMPLETED at 100%.
- **GET /jobs/{id}**: Returns current status and progress from DynamoDB at any point.

If the durable function is interrupted at any step, it resumes from the last checkpoint without re-executing completed work.

**Before durable functions, this pattern required**: SQS queue + consumer Lambda + Step Functions + custom polling logic. With durable functions, the entire workflow lives in a single function.

## Testing

1. Get the API endpoint from the stack outputs:
```bash
API_URL=$(aws cloudformation describe-stacks \
--stack-name lambda-durable-async-job \
--query "Stacks[0].Outputs[?OutputKey=='ApiEndpoint'].OutputValue" \
--output text)
```

2. Submit a job:
```bash
curl -s -X POST "$API_URL/jobs" \
-H "Content-Type: application/json" \
-d '{"data": "Process this document", "priority": "high"}'
```

Expected response (HTTP 202):
```json
{"job_id": "abc-123-...", "status": "SUBMITTED"}
```

3. Poll for status (every few seconds):
```bash
curl -s "$API_URL/jobs/<JOB_ID>"
```

Status progresses: SUBMITTED → IN_PROGRESS → VALIDATING → PROCESSING → FINALIZING → COMPLETED

4. Test error handling (empty data):
```bash
curl -s -X POST "$API_URL/jobs" \
-H "Content-Type: application/json" \
-d '{"priority": "low"}'
```

The job completes with status FAILED and an error message.

## Cleanup

1. Delete the stack:
```bash
sam delete
```

----
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
69 changes: 69 additions & 0 deletions lambda-durable-async-job-python-sam/example-pattern.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
{
"title": "Amazon DynamoDB Streams to AWS Lambda Durable Function",
"description": "Change data capture pipeline using DynamoDB Streams and Lambda durable functions with checkpointed validate, enrich, and notify steps.",
"language": "Python",
"level": "300",
"framework": "AWS SAM",
"introBox": {
"headline": "How it works",
"text": [
"This pattern processes DynamoDB stream events through a multi-step pipeline using Lambda durable functions.",
"When items are written to the source DynamoDB table, streams trigger a durable function that validates, enriches, and sends notifications for each record.",
"Each step is checkpointed. If the function is interrupted during processing, it resumes from the last checkpoint without re-executing completed steps.",
"Uses ReportBatchItemFailures for partial batch error handling so successfully processed records are not retried.",
"The function is deployed with AutoPublishAlias to meet the qualified ARN requirement for event source mappings with durable functions."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-ddb-streams-python-sam",
"templateURL": "serverless-patterns/lambda-durable-ddb-streams-python-sam",
"projectFolder": "lambda-durable-ddb-streams-python-sam",
"templateFile": "template.yaml"
}
},
"resources": {
"bullets": [
{
"text": "AWS Lambda durable functions",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html"
},
{
"text": "Using Lambda with DynamoDB Streams",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html"
},
{
"text": "Durable Execution SDK for Python",
"link": "https://github.com/aws/aws-durable-execution-sdk-python"
},
{
"text": "Reporting batch item failures for DynamoDB Streams",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting"
}
]
},
"deploy": {
"text": [
"sam build",
"sam deploy --guided"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"Delete the stack: <code>sam delete</code>."
]
},
"authors": [
{
"name": "Ajaya Shrestha",
"image": "",
"bio": "Cloud Support Engineer at AWS specializing in Lambda and serverless architectures.",
"linkedin": ""
}
]
}
114 changes: 114 additions & 0 deletions lambda-durable-async-job-python-sam/src/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""
API handler for async job submission and status polling.
POST /jobs — submits a new job, returns job_id immediately.
GET /jobs/{job_id} — returns current job status and progress.
"""

import json
import os
import uuid
import logging
from datetime import datetime, timezone
from decimal import Decimal

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(os.environ["JOBS_TABLE"])
lambda_client = boto3.client("lambda")

PROCESSOR_FUNCTION_NAME = os.environ["PROCESSOR_FUNCTION_NAME"]


class DecimalEncoder(json.JSONEncoder):
"""Handle DynamoDB Decimal types in JSON serialization."""

def default(self, obj):
if isinstance(obj, Decimal):
return int(obj) if obj == int(obj) else float(obj)
return super().default(obj)


def lambda_handler(event, context):
"""Route requests to submit or poll handlers."""
method = event.get("requestContext", {}).get("http", {}).get("method", "")
path = event.get("rawPath", "")

if method == "POST" and path == "/jobs":
return submit_job(event)
elif method == "GET" and path.startswith("/jobs/"):
job_id = event.get("pathParameters", {}).get("job_id", "")
return get_job_status(job_id)
else:
return response(404, {"error": "Not found"})


def submit_job(event):
"""Create a new job and invoke the durable processor asynchronously."""
try:
body = json.loads(event.get("body", "{}"))
except json.JSONDecodeError:
return response(400, {"error": "Invalid JSON body"})

job_id = str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()

# Write initial job record
table.put_item(
Item={
"job_id": job_id,
"status": "SUBMITTED",
"progress": 0,
"input": body,
"created_at": now,
"updated_at": now,
}
)

# Invoke durable processor asynchronously using qualified ARN
# Durable functions MUST be invoked with a published version or alias
lambda_client.invoke(
FunctionName=f"{PROCESSOR_FUNCTION_NAME}:$LATEST",
InvocationType="Event",
Payload=json.dumps({"job_id": job_id, "input": body}),
)

logger.info("Job %s submitted", job_id)
return response(202, {"job_id": job_id, "status": "SUBMITTED"})


def get_job_status(job_id):
"""Retrieve current job status from DynamoDB."""
if not job_id:
return response(400, {"error": "job_id is required"})

result = table.get_item(Key={"job_id": job_id})
item = result.get("Item")

if not item:
return response(404, {"error": f"Job {job_id} not found"})

return response(
200,
{
"job_id": item["job_id"],
"status": item["status"],
"progress": item.get("progress", 0),
"result": item.get("result"),
"error": item.get("error"),
"created_at": item.get("created_at"),
"updated_at": item.get("updated_at"),
},
)


def response(status_code, body):
"""Build an HTTP API response."""
return {
"statusCode": status_code,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(body, cls=DecimalEncoder),
}
Loading