diff --git a/lambda-durable-ddb-streams-python-sam/README.md b/lambda-durable-ddb-streams-python-sam/README.md new file mode 100644 index 000000000..2010074fd --- /dev/null +++ b/lambda-durable-ddb-streams-python-sam/README.md @@ -0,0 +1,115 @@ +# Amazon DynamoDB Streams to AWS Lambda Durable Function + +This pattern implements a change data capture (CDC) pipeline using DynamoDB Streams and Lambda durable functions. When items are written to the source table, the stream triggers a multi-step processing pipeline with automatic checkpointing at each step. + +Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/lambda-durable-ddb-streams-python-sam](https://serverlessland.com/patterns/lambda-durable-ddb-streams-python-sam) + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. +* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed +* [Python 3.13](https://www.python.org/downloads/) installed and available in your PATH + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd lambda-durable-ddb-streams-python-sam + ``` +1. From the command line, use AWS SAM to build and deploy the AWS resources for the pattern as specified in the template.yaml file: + ``` + sam build + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Allow SAM CLI to create IAM roles with the required permissions. + + Once you have run `sam deploy --guided` mode once and saved arguments to a configuration file (samconfig.toml), you can use `sam deploy` in future to use these defaults. + +1. Note the outputs from the SAM deployment process. These contain the resource names and/or ARNs which are used for testing. + +## How it works + +This pattern creates: + +1. **Durable Lambda Function (Stream Processor)**: A Python 3.13 Lambda function that processes DynamoDB stream events through three checkpointed steps: validate, enrich, and notify. + +2. **Source Table (DynamoDB with Streams)**: The table where items are written. DynamoDB Streams captures INSERT, MODIFY, and REMOVE events with NEW_AND_OLD_IMAGES. + +3. **Processed Table (DynamoDB)**: Stores enriched records with computed metadata (event type, timestamps, flags). + +4. **Notifications Table (DynamoDB)**: Audit trail of notification records for each processed event. + +### Durable Execution Flow + +1. An item is written to the source table, generating a stream event. +2. The durable function receives a batch of stream records. +3. For each record: + - **Step 1 (Validate)**: Checks record integrity and required fields. REMOVE events are validated but skip further processing. + - **Step 2 (Enrich)**: Adds computed fields (category, timestamps, name length, description flag) and writes to the processed table. + - **Step 3 (Notify)**: Creates an audit notification record in the notifications table. +4. Each step is checkpointed. If the function replays, completed steps return cached results without re-executing. +5. Uses `ReportBatchItemFailures` so only failed records are retried — successfully processed records are not re-sent. + +**Important**: This pattern uses `AutoPublishAlias: live` because DynamoDB Streams event source mappings with durable functions require a qualified ARN (published version or alias). The `ExecutionTimeout` is set to 900 seconds (maximum allowed for event source mappings). + +## Testing + +1. Write an item to the source table (INSERT event): + ```bash + aws dynamodb put-item \ + --table-name lambda-durable-ddb-streams-source \ + --item '{ + "pk": {"S": "PRODUCT-001"}, + "name": {"S": "Wireless Headphones"}, + "category": {"S": "Electronics"}, + "description": {"S": "Premium noise-cancelling headphones"} + }' + ``` + +2. After 10-30 seconds, verify the processed record: + ```bash + aws dynamodb scan --table-name lambda-durable-ddb-streams-processed + ``` + + Expected: enriched record with `event_type: INSERT`, `processed_at`, `name_length`, `has_description: true`. + +3. Verify the notification: + ```bash + aws dynamodb scan --table-name lambda-durable-ddb-streams-notifications + ``` + + Expected: notification record with `event_type`, `message`, `created_at`. + +4. Test MODIFY event: + ```bash + aws dynamodb put-item \ + --table-name lambda-durable-ddb-streams-source \ + --item '{ + "pk": {"S": "PRODUCT-001"}, + "name": {"S": "Wireless Headphones Pro"}, + "category": {"S": "Electronics"} + }' + ``` + +## Cleanup + +1. Delete the stack: + ```bash + sam delete + ``` + +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-durable-ddb-streams-python-sam/example-pattern.json b/lambda-durable-ddb-streams-python-sam/example-pattern.json new file mode 100644 index 000000000..0dc5cb3eb --- /dev/null +++ b/lambda-durable-ddb-streams-python-sam/example-pattern.json @@ -0,0 +1,69 @@ +{ + "title": "Amazon DynamoDB Streams to AWS Lambda Durable Function", + "description": "Change data capture pipeline using DynamoDB Streams and Lambda durable functions with checkpointed validate, enrich, and notify steps.", + "language": "Python", + "level": "300", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern processes DynamoDB stream events through a multi-step pipeline using Lambda durable functions.", + "When items are written to the source DynamoDB table, streams trigger a durable function that validates, enriches, and sends notifications for each record.", + "Each step is checkpointed. If the function is interrupted during processing, it resumes from the last checkpoint without re-executing completed steps.", + "Uses ReportBatchItemFailures for partial batch error handling so successfully processed records are not retried.", + "The function is deployed with AutoPublishAlias to meet the qualified ARN requirement for event source mappings with durable functions." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-durable-ddb-streams-python-sam", + "templateURL": "serverless-patterns/lambda-durable-ddb-streams-python-sam", + "projectFolder": "lambda-durable-ddb-streams-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS Lambda durable functions", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/durable-functions.html" + }, + { + "text": "Using Lambda with DynamoDB Streams", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html" + }, + { + "text": "Durable Execution SDK for Python", + "link": "https://github.com/aws/aws-durable-execution-sdk-python" + }, + { + "text": "Reporting batch item failures for DynamoDB Streams", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-batchfailurereporting" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Ajaya Shrestha", + "image": "", + "bio": "Cloud Support Engineer at AWS specializing in Lambda and serverless architectures.", + "linkedin": "" + } + ] +} diff --git a/lambda-durable-ddb-streams-python-sam/src/processor.py b/lambda-durable-ddb-streams-python-sam/src/processor.py new file mode 100644 index 000000000..2e64c646a --- /dev/null +++ b/lambda-durable-ddb-streams-python-sam/src/processor.py @@ -0,0 +1,172 @@ +""" +DynamoDB Streams → Lambda Durable Function. + +Processes stream records through a multi-step pipeline: + 1. Validate — check record integrity + 2. Enrich — add computed fields and metadata + 3. Notify — write an audit notification record + +Each step is checkpointed. If the function is interrupted during processing, +it resumes from the last checkpoint without re-executing completed steps. + +Uses ReportBatchItemFailures for partial batch error handling. +""" + +import os +import uuid +import logging +from datetime import datetime, timezone +from decimal import Decimal + +import boto3 +from aws_durable_execution_sdk_python.context import DurableContext, StepContext, durable_step +from aws_durable_execution_sdk_python.execution import durable_execution + +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +dynamodb = boto3.resource("dynamodb") +processed_table = dynamodb.Table(os.environ["PROCESSED_TABLE"]) +notifications_table = dynamodb.Table(os.environ["NOTIFICATIONS_TABLE"]) + + +@durable_step +def validate_record(step_context: StepContext, record: dict) -> dict: + """Step 1: Validate the stream record has required fields.""" + step_context.logger.info("Validating record: %s", record.get("pk")) + + event_name = record.get("event_name") + new_image = record.get("new_image", {}) + + if event_name == "REMOVE": + # Deletions are valid but we skip enrichment + return {"valid": True, "skip_processing": True, "event_name": event_name} + + # Validate required fields for INSERT/MODIFY + required_fields = ["pk", "name"] + missing = [f for f in required_fields if f not in new_image] + + if missing: + raise UnrecoverableInvocationError( + f"Record missing required fields: {missing}" + ) + + return { + "valid": True, + "skip_processing": False, + "event_name": event_name, + "data": new_image, + } + + +@durable_step +def enrich_record(step_context: StepContext, validation: dict) -> dict: + """Step 2: Enrich the record with computed fields.""" + step_context.logger.info("Enriching record: %s", validation["data"].get("pk")) + + data = validation["data"] + now = datetime.now(timezone.utc).isoformat() + + # Add enrichment fields + enriched = { + "pk": data["pk"], + "name": data["name"], + "category": data.get("category", "UNCATEGORIZED"), + "event_type": validation["event_name"], + "processed_at": now, + "name_length": len(data.get("name", "")), + "has_description": "description" in data, + } + + # Write enriched record to processed table + processed_table.put_item(Item=enriched) + + return enriched + + +@durable_step +def send_notification(step_context: StepContext, enriched: dict) -> dict: + """Step 3: Create an audit notification for the processed record.""" + step_context.logger.info("Sending notification for: %s", enriched["pk"]) + + notification_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc).isoformat() + + notification = { + "notification_id": notification_id, + "record_pk": enriched["pk"], + "event_type": enriched["event_type"], + "message": f"Record '{enriched['name']}' was processed (event: {enriched['event_type']})", + "created_at": now, + } + + notifications_table.put_item(Item=notification) + + return {"notification_id": notification_id, "status": "SENT"} + + +@durable_execution +def lambda_handler(event, context: DurableContext) -> dict: + """ + Process DynamoDB stream records through a checkpointed pipeline. + Returns batch item failures for records that could not be processed. + """ + batch_item_failures = [] + + for record in event.get("Records", []): + event_id = record.get("eventID", "unknown") + + try: + # Extract stream record data + stream_record = { + "pk": record.get("dynamodb", {}).get("Keys", {}).get("pk", {}).get("S", ""), + "event_name": record.get("eventName", ""), + "new_image": _deserialize_image( + record.get("dynamodb", {}).get("NewImage", {}) + ), + "old_image": _deserialize_image( + record.get("dynamodb", {}).get("OldImage", {}) + ), + } + + # Step 1: Validate + validation = context.step(validate_record(stream_record)) + + if validation.get("skip_processing"): + context.logger.info("Skipping REMOVE event for %s", stream_record["pk"]) + continue + + # Step 2: Enrich + enriched = context.step(enrich_record(validation)) + + # Step 3: Notify + notification = context.step(send_notification(enriched)) + + context.logger.info( + "Processed %s → notification %s", + stream_record["pk"], + notification["notification_id"], + ) + + except Exception as err: + context.logger.error("Failed to process record %s: %s", event_id, str(err)) + batch_item_failures.append({"itemIdentifier": event_id}) + + return {"batchItemFailures": batch_item_failures} + + +def _deserialize_image(image: dict) -> dict: + """Simple DynamoDB image deserializer for common types.""" + result = {} + for key, value in image.items(): + if "S" in value: + result[key] = value["S"] + elif "N" in value: + result[key] = value["N"] + elif "BOOL" in value: + result[key] = value["BOOL"] + elif "NULL" in value: + result[key] = None + else: + result[key] = str(value) + return result diff --git a/lambda-durable-ddb-streams-python-sam/src/requirements.txt b/lambda-durable-ddb-streams-python-sam/src/requirements.txt new file mode 100644 index 000000000..98e790f9b --- /dev/null +++ b/lambda-durable-ddb-streams-python-sam/src/requirements.txt @@ -0,0 +1,2 @@ +aws-durable-execution-sdk-python>=1.0.0 +boto3>=1.34.0 diff --git a/lambda-durable-ddb-streams-python-sam/template.yaml b/lambda-durable-ddb-streams-python-sam/template.yaml new file mode 100644 index 000000000..e2dc8e6df --- /dev/null +++ b/lambda-durable-ddb-streams-python-sam/template.yaml @@ -0,0 +1,115 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + DynamoDB Streams to Lambda Durable Function (Python). + Change data capture triggers a multi-step processing pipeline + with automatic checkpointing. Each record is processed through + validate, enrich, and notify steps without re-execution on replay. + +Globals: + Function: + Runtime: python3.13 + Timeout: 30 + MemorySize: 256 + Architectures: + - arm64 + +Resources: + # Source table — when items are written here, streams trigger processing + SourceTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-source" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: pk + AttributeType: S + KeySchema: + - AttributeName: pk + KeyType: HASH + StreamSpecification: + StreamViewType: NEW_AND_OLD_IMAGES + + # Processed records table — enriched data lands here + ProcessedTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-processed" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: pk + AttributeType: S + KeySchema: + - AttributeName: pk + KeyType: HASH + + # Notifications table — audit trail of notifications sent + NotificationsTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: !Sub "${AWS::StackName}-notifications" + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: notification_id + AttributeType: S + KeySchema: + - AttributeName: notification_id + KeyType: HASH + + # Durable Function — processes stream records with checkpoints + StreamProcessorFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Sub "${AWS::StackName}-processor" + Handler: processor.lambda_handler + CodeUri: src/ + Description: Durable function processing DynamoDB stream events with checkpoints + AutoPublishAlias: live + DurableConfig: + ExecutionTimeout: 900 + RetentionPeriodInDays: 3 + Environment: + Variables: + PROCESSED_TABLE: !Ref ProcessedTable + NOTIFICATIONS_TABLE: !Ref NotificationsTable + Policies: + - DynamoDBCrudPolicy: + TableName: !Ref ProcessedTable + - DynamoDBCrudPolicy: + TableName: !Ref NotificationsTable + - DynamoDBStreamReadPolicy: + TableName: !Ref SourceTable + StreamName: !Select [3, !Split ["/", !GetAtt SourceTable.StreamArn]] + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - lambda:InvokeFunction + Resource: !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${AWS::StackName}-processor*" + Events: + DDBStream: + Type: DynamoDB + Properties: + Stream: !GetAtt SourceTable.StreamArn + StartingPosition: TRIM_HORIZON + BatchSize: 5 + MaximumRetryAttempts: 3 + FunctionResponseTypes: + - ReportBatchItemFailures + +Outputs: + SourceTableName: + Description: Source table — write items here to trigger the pipeline + Value: !Ref SourceTable + + ProcessedTableName: + Description: Table where enriched records are stored + Value: !Ref ProcessedTable + + NotificationsTableName: + Description: Audit trail of notifications + Value: !Ref NotificationsTable + + ProcessorFunctionName: + Description: Name of the durable stream processor + Value: !Ref StreamProcessorFunction