diff --git a/lambda-durable-async-job-python-sam/README.md b/lambda-durable-async-job-python-sam/README.md new file mode 100644 index 000000000..6754921b1 --- /dev/null +++ b/lambda-durable-async-job-python-sam/README.md @@ -0,0 +1,114 @@ +# Async Job Processing with AWS Lambda Durable Functions + +This pattern implements an async job API using Lambda durable functions. Submit a job via HTTP POST, get a job ID back immediately, then poll for status via HTTP GET. The durable function processes the job through checkpointed steps, updating progress in DynamoDB as it goes. + +Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/lambda-durable-async-job-python-sam](https://serverlessland.com/patterns/lambda-durable-async-job-python-sam) + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* [Python 3.13](https://www.python.org/downloads/) installed and available in your PATH + +## Deployment Instructions + +1. 
Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd serverless-patterns/lambda-durable-async-job-python-sam + ``` +1. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file: + ``` + sam build + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` once and saved the arguments to a configuration file (samconfig.toml), you can run `sam deploy` in future to reuse these defaults. + +1. Note the outputs from the SAM deployment process. These contain the API endpoint, the DynamoDB table name, and the processor function name, which are used for testing. + +## How it works + +This pattern creates: + +1. **Durable Lambda Function (Processor)**: A Python 3.13 Lambda function that uses the durable execution SDK to process jobs through three checkpointed steps: validate, process, and finalize. Each step updates progress in DynamoDB. + +2. **API Lambda Function**: A standard Lambda function that handles HTTP requests: POST /jobs to submit work and GET /jobs/{job_id} to poll for status. + +3. **HTTP API (API Gateway)**: An Amazon API Gateway HTTP API with two routes for job submission and status polling. + +4. **DynamoDB Table**: A table that tracks job status, progress percentage, and results. + +### Durable Execution Flow + +- **POST /jobs**: API function creates a job record (SUBMITTED), invokes the durable processor asynchronously, returns job ID with HTTP 202. +- **Processor Step 1 (Validate)**: Validates input data, updates progress to 25%. +- **context.wait()**: Brief pause between steps (checkpointed, no compute charges). +- **Processor Step 2 (Process)**: Processes the data, updates progress to 50%. 
+- **Processor Step 3 (Finalize)**: Finalizes results, updates progress to 75%, then marks COMPLETED at 100%. +- **GET /jobs/{job_id}**: Returns current status and progress from DynamoDB at any point. + +If the durable function is interrupted at any step, it resumes from the last checkpoint without re-executing completed work. + +**Before durable functions, this pattern typically required**: SQS queue + consumer Lambda + Step Functions + custom polling logic. With durable functions, the entire processing workflow lives in a single function. + +## Testing + +1. Get the API endpoint from the stack outputs (replace `lambda-durable-async-job` with the stack name you entered during deployment): + ```bash + API_URL=$(aws cloudformation describe-stacks \ + --stack-name lambda-durable-async-job \ + --query "Stacks[0].Outputs[?OutputKey=='ApiEndpoint'].OutputValue" \ + --output text) + ``` + +2. Submit a job: + ```bash + curl -s -X POST "$API_URL/jobs" \ + -H "Content-Type: application/json" \ + -d '{"data": "Process this document", "priority": "high"}' + ``` + + Expected response (HTTP 202): + ```json + {"job_id": "abc-123-...", "status": "SUBMITTED"} + ``` + +3. Poll for status (every few seconds), replacing `<job_id>` with the `job_id` returned in step 2: + ```bash + curl -s "$API_URL/jobs/<job_id>" + ``` + + Status progresses: SUBMITTED → IN_PROGRESS → VALIDATING → PROCESSING → FINALIZING → COMPLETED + +4. Test error handling (missing `data` field): + ```bash + curl -s -X POST "$API_URL/jobs" \ + -H "Content-Type: application/json" \ + -d '{"priority": "low"}' + ``` + + The request still returns HTTP 202; polling the returned job ID shows the job ending with status FAILED and an error message. + +## Cleanup + +1. Delete the stack: + ```bash + sam delete + ``` + +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ +SPDX-License-Identifier: MIT-0 diff --git a/lambda-durable-async-job-python-sam/example-pattern.json b/lambda-durable-async-job-python-sam/example-pattern.json new file mode 100644 index 000000000..0dc5cb3eb --- /dev/null +++ b/lambda-durable-async-job-python-sam/example-pattern.json @@ -0,0 +1,69 @@ +{ + "title": "Async Job Processing with AWS Lambda Durable Functions", + "description": "Async job API using Amazon API Gateway, Lambda durable functions, and DynamoDB with checkpointed validate, process, and finalize steps.", + "language": "Python", + "level": "300", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern implements an async job API using Lambda durable functions.", + "A client submits a job via HTTP POST and receives a job ID immediately with HTTP 202, then polls for status via HTTP GET.", + "Each step is checkpointed. If the function is interrupted during processing, it resumes from the last checkpoint without re-executing completed steps.", + "The durable function updates job status and progress in a DynamoDB table as it moves through the validate, process, and finalize steps.", + "The API function invokes the durable processor asynchronously using a qualified function name." 
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-async-job-python-sam", + "templateURL": "serverless-patterns/lambda-durable-async-job-python-sam", + "projectFolder": "lambda-durable-async-job-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS Lambda durable functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Invoking a Lambda function asynchronously", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html" + }, + { + "text": "Durable Execution SDK for Python", + "link": "https://github.com/aws/aws-durable-execution-sdk-python" + }, + { + "text": "Amazon API Gateway HTTP APIs", + "link": "https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Ajaya Shrestha", + "image": "", + "bio": "Cloud Support Engineer at AWS specializing in Lambda and serverless architectures.", + "linkedin": "" + } + ] +} diff --git a/lambda-durable-async-job-python-sam/src/api.py b/lambda-durable-async-job-python-sam/src/api.py new file mode 100644 index 000000000..ee7006609 --- /dev/null +++ b/lambda-durable-async-job-python-sam/src/api.py @@ -0,0 +1,114 @@ +""" +API handler for async job submission and status polling. +POST /jobs: submits a new job, returns job_id immediately. +GET /jobs/{job_id}: returns current job status and progress. 
+""" + +import json +import os +import uuid +import logging +from datetime import datetime, timezone +from decimal import Decimal + +import boto3 + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +dynamodb = boto3.resource("dynamodb") +table = dynamodb.Table(os.environ["JOBS_TABLE"]) +lambda_client = boto3.client("lambda") + +PROCESSOR_FUNCTION_NAME = os.environ["PROCESSOR_FUNCTION_NAME"] + + +class DecimalEncoder(json.JSONEncoder): + """Handle DynamoDB Decimal types in JSON serialization.""" + + def default(self, obj): + if isinstance(obj, Decimal): + return int(obj) if obj == int(obj) else float(obj) + return super().default(obj) + + +def lambda_handler(event, context): + """Route requests to submit or poll handlers.""" + method = event.get("requestContext", {}).get("http", {}).get("method", "") + path = event.get("rawPath", "") + + if method == "POST" and path == "/jobs": + return submit_job(event) + elif method == "GET" and path.startswith("/jobs/"): + job_id = (event.get("pathParameters") or {}).get("job_id", "") + return get_job_status(job_id) + else: + return response(404, {"error": "Not found"}) + + +def submit_job(event): + """Create a new job and invoke the durable processor asynchronously.""" + try: + body = json.loads(event.get("body") or "{}", parse_float=Decimal) + except json.JSONDecodeError: + return response(400, {"error": "Invalid JSON body"}) + + job_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + + # Write initial job record (floats were parsed as Decimal, which DynamoDB requires) + table.put_item( + Item={ + "job_id": job_id, + "status": "SUBMITTED", + "progress": 0, + "input": body, + "created_at": now, + "updated_at": now, + } + ) + + # Invoke durable processor asynchronously using a qualified function name + # This example uses the $LATEST qualifier; a published version or alias also qualifies + lambda_client.invoke( + FunctionName=f"{PROCESSOR_FUNCTION_NAME}:$LATEST", + InvocationType="Event", + Payload=json.dumps({"job_id": job_id, "input": body}, cls=DecimalEncoder), + ) + + logger.info("Job %s submitted", job_id) + return 
response(202, {"job_id": job_id, "status": "SUBMITTED"}) + + +def get_job_status(job_id): + """Retrieve current job status from DynamoDB.""" + if not job_id: + return response(400, {"error": "job_id is required"}) + + result = table.get_item(Key={"job_id": job_id}) + item = result.get("Item") + + if not item: + return response(404, {"error": f"Job {job_id} not found"}) + + return response( + 200, + { + "job_id": item["job_id"], + "status": item["status"], + "progress": item.get("progress", 0), + "result": item.get("result"), + "error": item.get("error"), + "created_at": item.get("created_at"), + "updated_at": item.get("updated_at"), + }, + ) + + +def response(status_code, body): + """Build an HTTP API response.""" + return { + "statusCode": status_code, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps(body, cls=DecimalEncoder), + } diff --git a/lambda-durable-async-job-python-sam/src/processor.py b/lambda-durable-async-job-python-sam/src/processor.py new file mode 100644 index 000000000..cc20f66bf --- /dev/null +++ b/lambda-durable-async-job-python-sam/src/processor.py @@ -0,0 +1,126 @@ +""" +Durable function that processes a job in checkpointed steps. +Each step updates progress in DynamoDB. If the function is interrupted, +it resumes from the last checkpoint without re-executing completed steps. 
+""" + +import os +import logging +from datetime import datetime, timezone +from decimal import Decimal + +import boto3 +from aws_durable_execution_sdk_python.config import Duration +from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step +from aws_durable_execution_sdk_python.execution import durable_execution + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +dynamodb = boto3.resource("dynamodb") +table = dynamodb.Table(os.environ["JOBS_TABLE"]) + + +def update_job_status(job_id: str, status: str, progress: int, result=None, error=None): + """Update job status in DynamoDB.""" + update_expr = "SET #s = :status, progress = :progress, updated_at = :now" + expr_values = { + ":status": status, + ":progress": progress, + ":now": datetime.now(timezone.utc).isoformat(), + } + expr_names = {"#s": "status"} + + if result is not None: + update_expr += ", #r = :result" + expr_values[":result"] = result + expr_names["#r"] = "result" + + if error is not None: + update_expr += ", #e = :error" + expr_values[":error"] = error + expr_names["#e"] = "error" + + table.update_item( + Key={"job_id": job_id}, + UpdateExpression=update_expr, + ExpressionAttributeValues=expr_values, + ExpressionAttributeNames=expr_names, + ) + + +@durable_step +def validate_input(step_context: StepContext, job_id: str, input_data: dict) -> dict: + """Step 1: Validate the input data.""" + step_context.logger.info("Validating input for job %s", job_id) + update_job_status(job_id, "VALIDATING", 25) + + # Simulate validation logic + if not input_data.get("data"): + raise ValueError("Missing required field: data") + + return {"validated": True, "item_count": len(str(input_data.get("data", "")))} + + +@durable_step +def process_data(step_context: StepContext, job_id: str, validation: dict) -> dict: + """Step 2: Process the data (simulates work).""" + step_context.logger.info("Processing data for job %s", job_id) + update_job_status(job_id, "PROCESSING", 50) 
+ + # Simulate processing — in real use, this could be: + # - Calling an ML inference endpoint + # - Transforming and loading data + # - Generating a report + processed_items = validation["item_count"] * 2 + return {"processed_items": processed_items, "quality_score": Decimal("0.95")} + + +@durable_step +def finalize_result(step_context: StepContext, job_id: str, processing: dict) -> dict: + """Step 3: Finalize and store the result.""" + step_context.logger.info("Finalizing job %s", job_id) + update_job_status(job_id, "FINALIZING", 75) + + final_result = { + "processed_items": processing["processed_items"], + "quality_score": str(processing["quality_score"]), + "summary": f"Successfully processed {processing['processed_items']} items", + } + return final_result + + +@durable_execution +def lambda_handler(event, context: DurableContext) -> dict: + """ + Durable job processor with 3 checkpointed steps. + If interrupted at any point, resumes from the last completed step. + """ + job_id = event["job_id"] + input_data = event.get("input", {}) + + try: + update_job_status(job_id, "IN_PROGRESS", 10) + + # Step 1: Validate (checkpointed) + validation = context.step(validate_input(job_id, input_data)) + + # Brief pause between steps (simulates rate limiting or cooldown) + context.wait(Duration.from_seconds(2)) + + # Step 2: Process (checkpointed) + processing = context.step(process_data(job_id, validation)) + + # Step 3: Finalize (checkpointed) + result = context.step(finalize_result(job_id, processing)) + + # Mark complete + update_job_status(job_id, "COMPLETED", 100, result=result) + context.logger.info("Job %s completed successfully", job_id) + + return {"status": "COMPLETED", "job_id": job_id, "result": result} + + except Exception as err: + context.logger.error("Job %s failed: %s", job_id, str(err)) + update_job_status(job_id, "FAILED", 0, error=str(err)) + return {"status": "FAILED", "job_id": job_id, "error": str(err)} diff --git 
a/lambda-durable-async-job-python-sam/src/requirements.txt b/lambda-durable-async-job-python-sam/src/requirements.txt new file mode 100644 index 000000000..98e790f9b --- /dev/null +++ b/lambda-durable-async-job-python-sam/src/requirements.txt @@ -0,0 +1,2 @@ +aws-durable-execution-sdk-python>=1.0.0 +boto3>=1.34.0 diff --git a/lambda-durable-async-job-python-sam/template.yaml b/lambda-durable-async-job-python-sam/template.yaml new file mode 100644 index 000000000..e02be8d5e --- /dev/null +++ b/lambda-durable-async-job-python-sam/template.yaml @@ -0,0 +1,97 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Async job processing with Lambda Durable Functions. + Submit a job via API, get a job ID back immediately, + then poll for status. No SQS, no extra state machine needed. + +Globals: + Function: + Runtime: python3.13 + Timeout: 30 + MemorySize: 256 + Architectures: + - arm64 + +Resources: + # DynamoDB table for job status tracking + JobsTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-jobs" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: job_id + AttributeType: S + KeySchema: + - AttributeName: job_id + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + + # Durable Function — processes the job asynchronously with checkpoints + JobProcessorFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub "${AWS::StackName}-processor" + Handler: processor.lambda_handler + CodeUri: src/ + Description: Durable function that processes jobs with automatic checkpointing + DurableConfig: + ExecutionTimeout: 3600 + RetentionPeriodInDays: 3 + Environment: + Variables: + JOBS_TABLE: !Ref JobsTable + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref JobsTable + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub 
"arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-processor*" + + # Standard Function — submits jobs and queries status via API + ApiFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub "${AWS::StackName}-api" + Handler: api.lambda_handler + CodeUri: src/ + Description: API handler for submitting and polling job status + Environment: + Variables: + JOBS_TABLE: !Ref JobsTable + PROCESSOR_FUNCTION_NAME: !Ref JobProcessorFunction + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref JobsTable + - LambdaInvokePolicy: + FunctionName: !Ref JobProcessorFunction + Events: + SubmitJob: + Type: HttpApi + Properties: + Path: /jobs + Method: POST + GetJobStatus: + Type: HttpApi + Properties: + Path: /jobs/{job_id} + Method: GET + +Outputs: + ApiEndpoint: + Description: HTTP API endpoint URL + Value: !Sub "https://${ServerlessHttpApi}.execute-api.${AWS::Region}.amazonaws.com" + + JobsTableName: + Description: Name of the DynamoDB jobs table + Value: !Ref JobsTable + + ProcessorFunctionName: + Description: Name of the durable processor function + Value: !Ref JobProcessorFunction