diff --git a/lambda-durable-saga-python-sam/README.md b/lambda-durable-saga-python-sam/README.md new file mode 100644 index 000000000..da4fc96d8 --- /dev/null +++ b/lambda-durable-saga-python-sam/README.md @@ -0,0 +1,136 @@ +# Saga Pattern with AWS Lambda Durable Functions in Python + +This pattern implements the Saga pattern using AWS Lambda durable functions. It processes a multi-step order through inventory reservation, payment processing, and order confirmation. If any step fails, compensating transactions execute in reverse order to undo all previously completed steps. + +Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/lambda-durable-saga-python-sam](https://serverlessland.com/patterns/lambda-durable-saga-python-sam) + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* [Python 3.13](https://www.python.org/downloads/) installed and available in your PATH + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd lambda-durable-saga-python-sam + ``` +1. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file: + ``` + sam build + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults. + +1. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works + +This pattern creates: + +1. **Durable Lambda Function (Saga Orchestrator)**: A Python 3.13 Lambda function that orchestrates three checkpointed steps in sequence and executes compensating transactions on failure. + +2. **Orders Table (DynamoDB)**: Tracks order state (CONFIRMED or FAILED). + +3. **Payments Table (DynamoDB)**: Tracks payment reservations and their status (RESERVED, CAPTURED, REFUNDED). + +4. **Inventory Table (DynamoDB)**: Tracks available and reserved stock per item. + +### Saga Execution Flow + +**Happy Path (all steps succeed):** +1. **Reserve Inventory** — decrements available stock, increments reserved count +2. **Process Payment** — creates a payment record with status RESERVED +3. **Confirm Order** — writes order as CONFIRMED, updates payment to CAPTURED + +**Failure Path (compensating transactions):** +1. **Reserve Inventory** — succeeds ✓ +2. **Process Payment** — FAILS (e.g., amount exceeds limit) +3. **Compensate** — runs in reverse order: + - Cancel payment → status set to REFUNDED + - Release inventory → available stock restored to original +4. **Record failure** — writes order as FAILED with error message + +Each step uses `@durable_step` for automatic checkpointing. If the function is interrupted, it resumes from the last completed step without re-execution. + +## Testing + +1. Seed inventory data: + ```bash + aws dynamodb put-item \ + --table-name lambda-durable-saga-inventory \ + --item '{"item_id": {"S": "ITEM-LAPTOP"}, "available": {"N": "50"}, "reserved": {"N": "0"}}' + + aws dynamodb put-item \ + --table-name lambda-durable-saga-inventory \ + --item '{"item_id": {"S": "ITEM-MOUSE"}, "available": {"N": "100"}, "reserved": {"N": "0"}}' + ``` + +2. Test happy path (successful order): + ```bash + aws lambda invoke \ + --function-name "lambda-durable-saga-orchestrator:$LATEST" \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"order_id": "ORD-001", "customer_id": "CUST-100", "items": [{"item_id": "ITEM-LAPTOP", "quantity": 2}, {"item_id": "ITEM-MOUSE", "quantity": 3}], "total_amount": "999.99"}' \ + /tmp/response.json + ``` + + After ~15 seconds, verify: + ```bash + aws dynamodb get-item \ + --table-name lambda-durable-saga-orders \ + --key '{"order_id": {"S": "ORD-001"}}' + ``` + Expected: `"status": "CONFIRMED"` + +3. Test failure path (payment declined triggers compensation): + ```bash + aws lambda invoke \ + --function-name "lambda-durable-saga-orchestrator:$LATEST" \ + --invocation-type Event \ + --cli-binary-format raw-in-base64-out \ + --payload '{"order_id": "ORD-FAIL", "customer_id": "CUST-200", "items": [{"item_id": "ITEM-LAPTOP", "quantity": 5}], "total_amount": "15000.00"}' \ + /tmp/response.json + ``` + + After ~30 seconds, verify compensation executed: + ```bash + # Order should be FAILED + aws dynamodb get-item \ + --table-name lambda-durable-saga-orders \ + --key '{"order_id": {"S": "ORD-FAIL"}}' + + # Inventory should be restored + aws dynamodb get-item \ + --table-name lambda-durable-saga-inventory \ + --key '{"item_id": {"S": "ITEM-LAPTOP"}}' \ + --query 'Item.{available: available.N, reserved: reserved.N}' + ``` + Expected: Order `"status": "FAILED"`, inventory `available: 50, reserved: 0` + +## Cleanup + +1. Delete the stack: + ```bash + sam delete + ``` + +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-durable-saga-python-sam/example-pattern.json b/lambda-durable-saga-python-sam/example-pattern.json new file mode 100644 index 000000000..243f3e8db --- /dev/null +++ b/lambda-durable-saga-python-sam/example-pattern.json @@ -0,0 +1,69 @@ +{ + "title": "Saga Pattern with AWS Lambda Durable Functions in Python", + "description": "Multi-step order processing with automatic compensating transactions on failure using Lambda durable functions and Amazon DynamoDB.", + "language": "Python", + "level": "300", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern implements the Saga pattern using AWS Lambda durable functions to orchestrate a multi-step order workflow.", + "Three checkpointed steps execute in sequence: reserve inventory, process payment, and confirm order.", + "If any step fails, compensating transactions automatically execute in reverse order to restore data consistency \u2014 cancelling payment and releasing inventory.", + "Each step uses @durable_step for automatic checkpointing. If interrupted, the function resumes from the last completed step without re-execution.", + "IAM permissions follow least privilege with DynamoDBCrudPolicy scoped to each specific table." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-saga-python-sam", + "templateURL": "serverless-patterns/lambda-durable-saga-python-sam", + "projectFolder": "lambda-durable-saga-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS Lambda durable functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Saga pattern \u2014 AWS Prescriptive Guidance", + "link": "https://docs.aws.amazon.com/prescriptive-guidance/latest/modernization-data-persistence/saga-pattern.html" + }, + { + "text": "Durable Execution SDK for Python", + "link": "https://github.com/aws/aws-durable-execution-sdk-python" + }, + { + "text": "Building fault-tolerant applications with Lambda durable functions", + "link": "https://aws.amazon.com/blogs/compute/building-fault-tolerant-long-running-application-with-aws-lambda-durable-functions" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Ajaya Shrestha", + "image": "", + "bio": "Cloud Support Engineer at AWS specializing in Lambda and serverless architectures.", + "linkedin": "" + } + ] +} diff --git a/lambda-durable-saga-python-sam/src/orchestrator.py b/lambda-durable-saga-python-sam/src/orchestrator.py new file mode 100644 index 000000000..b107f818b --- /dev/null +++ b/lambda-durable-saga-python-sam/src/orchestrator.py @@ -0,0 +1,246 @@ +""" +Saga orchestrator using Lambda Durable Functions (Python). + +Processes an order through 3 steps: + 1. Reserve inventory + 2. Process payment + 3. Confirm order + +If any step fails, compensating transactions execute in REVERSE order +to undo all previously completed steps, ensuring data consistency. +""" + +import os +import uuid +import logging +from datetime import datetime, timezone +from decimal import Decimal + +import boto3 +from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step +from aws_durable_execution_sdk_python.execution import durable_execution + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +dynamodb = boto3.resource("dynamodb") +orders_table = dynamodb.Table(os.environ["ORDERS_TABLE"]) +payments_table = dynamodb.Table(os.environ["PAYMENTS_TABLE"]) +inventory_table = dynamodb.Table(os.environ["INVENTORY_TABLE"]) + + +# ─── FORWARD STEPS ─────────────────────────────────────────────────────────── + + +@durable_step +def reserve_inventory(step_context: StepContext, order_id: str, items: list) -> dict: + """Step 1: Reserve inventory for each item in the order.""" + step_context.logger.info("Reserving inventory for order %s", order_id) + + reservation_id = str(uuid.uuid4()) + reserved_items = [] + + for item in items: + item_id = item["item_id"] + quantity = item["quantity"] + + # Check availability and reserve + response = inventory_table.get_item(Key={"item_id": item_id}) + stock = response.get("Item") + + if not stock or int(stock.get("available", 0)) < quantity: + raise InsufficientInventoryError( + f"Insufficient stock for item {item_id}: " + f"requested {quantity}, available {stock.get('available', 0) if stock else 0}" + ) + + # Decrement available stock + inventory_table.update_item( + Key={"item_id": item_id}, + UpdateExpression="SET available = available - :qty, reserved = reserved + :qty", + ExpressionAttributeValues={":qty": quantity}, + ConditionExpression="available >= :qty", + ) + reserved_items.append({"item_id": item_id, "quantity": quantity}) + + return {"reservation_id": reservation_id, "reserved_items": reserved_items} + + +@durable_step +def process_payment(step_context: StepContext, order_id: str, amount: str, customer_id: str) -> dict: + """Step 2: Process payment (reserve funds).""" + step_context.logger.info("Processing payment of %s for order %s", amount, order_id) + + payment_id = str(uuid.uuid4()) + + # Simulate payment processing — in production, call a payment gateway + # For demo: fail if amount > 10000 to trigger saga compensation + if Decimal(amount) > Decimal("10000"): + raise PaymentDeclinedError(f"Payment of {amount} declined: exceeds limit") + + payments_table.put_item( + Item={ + "payment_id": payment_id, + "order_id": order_id, + "customer_id": customer_id, + "amount": Decimal(amount), + "status": "RESERVED", + "created_at": datetime.now(timezone.utc).isoformat(), + } + ) + + return {"payment_id": payment_id, "status": "RESERVED"} + + +@durable_step +def confirm_order(step_context: StepContext, order_id: str, reservation: dict, payment: dict) -> dict: + """Step 3: Confirm the order after inventory and payment succeed.""" + step_context.logger.info("Confirming order %s", order_id) + + now = datetime.now(timezone.utc).isoformat() + + orders_table.put_item( + Item={ + "order_id": order_id, + "status": "CONFIRMED", + "reservation_id": reservation["reservation_id"], + "payment_id": payment["payment_id"], + "confirmed_at": now, + } + ) + + # Finalize payment + payments_table.update_item( + Key={"payment_id": payment["payment_id"]}, + UpdateExpression="SET #s = :status, confirmed_at = :now", + ExpressionAttributeValues={":status": "CAPTURED", ":now": now}, + ExpressionAttributeNames={"#s": "status"}, + ) + + return {"order_id": order_id, "status": "CONFIRMED"} + + +# ─── COMPENSATING STEPS (reverse order) ───────────────────────────────────── + + +@durable_step +def compensate_payment(step_context: StepContext, payment: dict) -> dict: + """Compensation: Refund/cancel the payment reservation.""" + step_context.logger.info("Compensating payment %s", payment["payment_id"]) + + payments_table.update_item( + Key={"payment_id": payment["payment_id"]}, + UpdateExpression="SET #s = :status, cancelled_at = :now", + ExpressionAttributeValues={ + ":status": "REFUNDED", + ":now": datetime.now(timezone.utc).isoformat(), + }, + ExpressionAttributeNames={"#s": "status"}, + ) + + return {"payment_id": payment["payment_id"], "status": "REFUNDED"} + + +@durable_step +def compensate_inventory(step_context: StepContext, reservation: dict) -> dict: + """Compensation: Release reserved inventory back to available stock.""" + step_context.logger.info("Compensating inventory reservation %s", reservation["reservation_id"]) + + for item in reservation["reserved_items"]: + inventory_table.update_item( + Key={"item_id": item["item_id"]}, + UpdateExpression="SET available = available + :qty, reserved = reserved - :qty", + ExpressionAttributeValues={":qty": item["quantity"]}, + ) + + return {"reservation_id": reservation["reservation_id"], "status": "RELEASED"} + + +# ─── ORCHESTRATOR ──────────────────────────────────────────────────────────── + + +@durable_execution +def lambda_handler(event, context: DurableContext) -> dict: + """ + Saga orchestrator: executes steps in order, compensates in reverse on failure. + + Input event: + { + "order_id": "ORD-123", + "customer_id": "CUST-456", + "items": [{"item_id": "ITEM-A", "quantity": 2}], + "total_amount": "99.99" + } + """ + order_id = event.get("order_id", str(uuid.uuid4())) + customer_id = event["customer_id"] + items = event["items"] + total_amount = event["total_amount"] + + completed_steps = [] + + try: + # Step 1: Reserve inventory + reservation = context.step(reserve_inventory(order_id, items)) + completed_steps.append(("inventory", reservation)) + context.logger.info("Inventory reserved: %s", reservation["reservation_id"]) + + # Step 2: Process payment + payment = context.step(process_payment(order_id, total_amount, customer_id)) + completed_steps.append(("payment", payment)) + context.logger.info("Payment processed: %s", payment["payment_id"]) + + # Step 3: Confirm order + confirmation = context.step(confirm_order(order_id, reservation, payment)) + context.logger.info("Order confirmed: %s", order_id) + + return { + "status": "SUCCESS", + "order_id": order_id, + "confirmation": confirmation, + } + + except Exception as err: + context.logger.error("Saga failed at step: %s. Starting compensation.", str(err)) + + # Compensate in REVERSE order + for step_name, step_data in reversed(completed_steps): + try: + if step_name == "payment": + context.step(compensate_payment(step_data)) + elif step_name == "inventory": + context.step(compensate_inventory(step_data)) + except Exception as comp_err: + context.logger.error( + "Compensation failed for %s: %s", step_name, str(comp_err) + ) + + # Record failed order + orders_table.put_item( + Item={ + "order_id": order_id, + "status": "FAILED", + "error": str(err), + "failed_at": datetime.now(timezone.utc).isoformat(), + } + ) + + return { + "status": "FAILED", + "order_id": order_id, + "error": str(err), + "compensations_executed": [s[0] for s in reversed(completed_steps)], + } + + +# ─── Custom Exceptions ─────────────────────────────────────────────────────── + + +class InsufficientInventoryError(Exception): + """Raised when requested quantity exceeds available stock.""" + pass + + +class PaymentDeclinedError(Exception): + """Raised when payment processing is declined.""" + pass diff --git a/lambda-durable-saga-python-sam/src/requirements.txt b/lambda-durable-saga-python-sam/src/requirements.txt new file mode 100644 index 000000000..98e790f9b --- /dev/null +++ b/lambda-durable-saga-python-sam/src/requirements.txt @@ -0,0 +1,2 @@ +aws-durable-execution-sdk-python>=1.0.0 +boto3>=1.34.0 diff --git a/lambda-durable-saga-python-sam/template.yaml b/lambda-durable-saga-python-sam/template.yaml new file mode 100644 index 000000000..4aef31d14 --- /dev/null +++ b/lambda-durable-saga-python-sam/template.yaml @@ -0,0 +1,101 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Saga pattern with Lambda Durable Functions (Python). + Multi-step order processing with compensating transactions on failure. + If any step fails, previous steps are rolled back in reverse order. + +Globals: + Function: + Runtime: python3.13 + Timeout: 30 + MemorySize: 256 + Architectures: + - arm64 + +Resources: + # Orders table — tracks order state + OrdersTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-orders" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: order_id + AttributeType: S + KeySchema: + - AttributeName: order_id + KeyType: HASH + + # Payments table — tracks payment reservations + PaymentsTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-payments" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: payment_id + AttributeType: S + KeySchema: + - AttributeName: payment_id + KeyType: HASH + + # Inventory table — tracks inventory reservations + InventoryTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-inventory" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: item_id + AttributeType: S + KeySchema: + - AttributeName: item_id + KeyType: HASH + + # Durable Function — orchestrates the saga + SagaOrchestratorFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub "${AWS::StackName}-orchestrator" + Handler: orchestrator.lambda_handler + CodeUri: src/ + Description: Durable saga orchestrator with compensating transactions + DurableConfig: + ExecutionTimeout: 3600 + RetentionPeriodInDays: 3 + Environment: + Variables: + ORDERS_TABLE: !Ref OrdersTable + PAYMENTS_TABLE: !Ref PaymentsTable + INVENTORY_TABLE: !Ref InventoryTable + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref OrdersTable + - DynamoDBCrudPolicy: + TableName: !Ref PaymentsTable + - DynamoDBCrudPolicy: + TableName: !Ref InventoryTable + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-orchestrator*" + +Outputs: + OrchestratorFunctionName: + Description: Name of the saga orchestrator function + Value: !Ref SagaOrchestratorFunction + + OrdersTableName: + Description: Name of the orders table + Value: !Ref OrdersTable + + PaymentsTableName: + Description: Name of the payments table + Value: !Ref PaymentsTable + + InventoryTableName: + Description: Name of the inventory table + Value: !Ref InventoryTable