diff --git a/lambda-microvm-dta/.gitignore b/lambda-microvm-dta/.gitignore new file mode 100644 index 000000000..2e0485fc0 --- /dev/null +++ b/lambda-microvm-dta/.gitignore @@ -0,0 +1,10 @@ +.terraform/ +.terraform.lock.hcl +*.tfstate +*.tfstate.* +terraform.tfvars +tfplan +__pycache__/ +*.pyc +*.egg-info/ +out/ diff --git a/lambda-microvm-dta/README.md b/lambda-microvm-dta/README.md new file mode 100644 index 000000000..f2ef643d8 --- /dev/null +++ b/lambda-microvm-dta/README.md @@ -0,0 +1,151 @@ +# CI/CD dynamic threat analysis in AWS Lambda MicroVMs + +This pattern runs an **untrusted artifact** inside an isolated AWS Lambda MicroVM and decides whether to pass or fail a CI/CD pipeline based on **how the artifact actually behaves** — not on what it claims about itself. A sandbox supervisor launches the artifact and a set of collectors observe it from the outside; a small rule engine turns those observations into a deterministic verdict. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/lambda-microvm-dta + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## The idea + +"Did it build and pass unit tests" tells you nothing about what an artifact *does* when it runs — what processes it spawns, what files it writes and executes, what it reads from the environment, where it tries to send data. Commercial **Dynamic Threat Analysis (DTA)** answers that by detonating an artifact in an isolated sandbox and watching its runtime behavior. This pattern demonstrates the same shape as a CI/CD gate, using Lambda MicroVMs as the isolation substrate, while keeping everything safe enough to run in a public sample. + +It is intentionally a **skeleton and a way of thinking**, not a product. It is not Aqua DTA, not a malware classifier, and not an AWS service. + +## Design philosophy + +Five ideas drive the whole design. They are worth understanding before the deployment steps, because they are the point of the sample. + +### 1. The target is untrusted, and it never reports on itself + +The single most important decision: there is a hard separation between the **artifact being analyzed** (the *target*) and the **code doing the analysis** (the *supervisor*). The supervisor launches the target as a child process and a set of collectors observe it entirely from the outside. Nothing in the final report comes from the target describing its own behavior — every conclusion is an external observation. + +This matters because the moment you let an artifact assert its own innocence, you have lost the plot for a threat-analysis tool. A self-reporting runner can be told what to say; a process observed from `/proc`, `strace`, and a filesystem diff cannot lie about the syscalls it made. + +### 2. Observe from outside, with mechanisms that fit the environment + +Collectors watch the target from the outside using primitives that work cleanly inside a MicroVM today: + +| Collector | What it observes | +|---|---| +| `process` (`/proc`) | Process-tree lifecycle: start, child spawns, exit, timeout, nonzero exit. Records env variable *names* only — never values. | +| `filesystem` (diff) | A before/after snapshot diff of the workspace: created / modified / deleted / executable-created files. | +| `canary` | Fake canary files and `DTA_CANARY_*` variables; detects access. High-confidence when `strace` saw the `open`, low-confidence (atime) otherwise. Canary values are never written to the report. | +| `strace` | Wraps the target with `strace -ff` (ptrace, user space) and normalizes process/file/network/privilege syscalls. | +| `network` (`/proc/net`) | Polls `/proc/net/{tcp,udp,...}` for active connections. | + +### 3. Be conservative and explainable — never claim to classify malware + +The rule engine is deliberately simple, and the verdict vocabulary is deliberately modest: `clean`, `suspicious`, `policy_violation`, `unknown`, `error`. **There is no `malware` verdict.** This is a CI gate that turns observed behavior into a pass/fail with an auditable reason, not a detector pretending to be one. The default rules (e.g. "executable created in workspace → suspicious", "canary read → policy_violation", "target spawned a shell → suspicious") are short, readable, and easy to reason about. + +### 4. Know your support boundary, and be honest about it + +The collectors above are in scope precisely because they don't require kernel instrumentation that a MicroVM may not provide. **Tracee, Falco, custom eBPF, and packet capture are explicitly out of scope** for this version — not because they're bad, but because they need feature probes (BTF, capabilities, ring buffers) this sample doesn't perform. The sample would rather under-claim and be accurate than oversell. + +### 5. Safe by default, fail closed + +The default scenarios are benign and canary-based (no real malware, exploits, or secrets). AWS mode is double-gated (`DTA_ALLOW_AWS_MODE=true` **and** `--confirm-sandbox-account`). Every MicroVM run is bounded by a maximum duration, collector failures fail the analysis closed, and the MicroVM is terminated in a `finally` block even when the job fails. + +## How it works + +A MicroVM image carries the sandbox supervisor. For each analysis the orchestrator: + +1. runs a fresh MicroVM from the image (`run-microvm`), bounded by a maximum duration; +2. mints a short-lived endpoint auth token scoped to port 8080 (`create-microvm-auth-token`); +3. calls `POST /analysis/start` on the MicroVM, passing the target config in the request body; +4. the supervisor prepares an isolated workspace, materializes the target, **starts the collectors**, launches the target (argv array, never a shell), then stops the collectors and normalizes everything into `events.jsonl`; +5. the rule engine evaluates the events and produces `report.json` (schema `0.2.0`); +6. the orchestrator downloads the report, applies CI policy, and **terminates the MicroVM**. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. **Use a dedicated sandbox account.** +* [AWS CLI v2 >= 2.35.10](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured. Lambda MicroVMs is GA, but the `aws lambda-microvms` commands are only bundled in AWS CLI v2 >= 2.35.10 (older clients return `Invalid choice: 'lambda-microvms'`). +* A region where AWS Lambda MicroVMs is available. +* [Terraform](https://developer.hashicorp.com/terraform/downloads) installed. +* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [Python 3.11+](https://www.python.org/downloads/) for the orchestrator CLI. For the local `dry-run`, also install PyYAML (`pip install pyyaml`); without it the bundled fallback parser cannot read the default rule pack. + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd lambda-microvm-dta + ``` +1. Deploy the bootstrap resources (the artifact bucket and the least-privilege build/execution IAM roles). This requires no input variables: + ``` + terraform init && terraform apply + ``` +1. Note the outputs (`artifact_bucket_name`, `build_role_arn`, `execution_role_arn`); they are used by the orchestrator below. +1. Install the orchestrator CLI (from `src/`) and build the MicroVM image. AWS mode is intentionally double-gated: + ```bash + cd src && pip install -e ./orchestrator && cd .. + export DTA_ALLOW_AWS_MODE=true + + microvm-dta package --source-dir src/microvm --output out/microvm-dta.zip + aws s3 cp out/microvm-dta.zip s3:///artifacts/microvm-dta.zip + + microvm-dta --region build-image --confirm-sandbox-account \ + --image-name lambda-microvm-dta \ + --artifact-uri s3:///artifacts/microvm-dta.zip \ + --base-image-arn arn:aws:lambda::aws:microvm-image:al2023-1 \ + --build-role-arn + microvm-dta --region wait-image --confirm-sandbox-account \ + --image-identifier arn:aws:lambda:::microvm-image:lambda-microvm-dta + ``` + +> The `main.tf` here deploys only the artifact bucket and the build/execution roles, so `terraform apply` needs no inputs. The optional GitHub Actions OIDC CI role (carrying the verified `lambda:*` MicroVM control-plane policy) and the VPC egress / Flow Logs profile are gated behind `enable_github_oidc_role` and `enable_vpc_egress` and are off by default. The IAM action names were verified against the AWS Service Authorization Reference and applied (then destroyed) against GA Lambda MicroVMs during testing. + +## How it works (deployed) + +Because the target is launched and observed by the supervisor running *inside* the MicroVM, it cannot assert its own innocence — every conclusion in `report.json` comes from outside observation. The target configuration travels in the `start-analysis` request body, so you can change the target without rebuilding the image. + +## Testing + +Run a benign target end to end: + +```bash +# Start a MicroVM (lifecycle only) and capture its id +MVM=$(microvm-dta --region run --confirm-sandbox-account \ + --image-identifier arn:aws:lambda:::microvm-image:lambda-microvm-dta \ + --execution-role-arn | jq -r .microvmId) + +# Run the analysis, download the report, and evaluate policy +microvm-dta --region start-analysis --confirm-sandbox-account --microvm-identifier "$MVM" \ + --target-config src/examples/targets/benign-command.yaml +microvm-dta --region fetch-results --confirm-sandbox-account --microvm-identifier "$MVM" --output-dir out +``` + +A benign target produces `summary.status: passed` and `summary.verdict: clean`. A target that spawns `/bin/sh` is flagged `suspicious` (rule R004) once the `strace` collector observes the `execve`. You can also run the entire pipeline locally with no AWS account: + +```bash +microvm-dta dry-run --target-config src/examples/targets/benign-command.yaml --workspace out/analysis +``` + +> Note: the local `dry-run` runs the supervisor on your host, so `strace`-dependent rules (e.g. R004 shell-exec, R002 high-confidence canary) only fire on Linux where `strace` is present — on macOS the `strace` collector fails and those behaviors read as `clean`. Run on a MicroVM (or Linux) to exercise them. + +## Cleanup + +1. Terminate any running MicroVM (the orchestrator also does this in a `finally` block; `cleanup-stale` is a safety net): + ```bash + microvm-dta --region terminate --confirm-sandbox-account --microvm-identifier "$MVM" + microvm-dta --region cleanup-stale --confirm-sandbox-account + ``` +1. Delete the MicroVM image: + ```bash + aws lambda-microvms delete-microvm-image --region \ + --image-identifier arn:aws:lambda:::microvm-image:lambda-microvm-dta + ``` +1. Destroy the Terraform-managed resources: + ```bash + terraform destroy + ``` + +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/lambda-microvm-dta/example-pattern.json b/lambda-microvm-dta/example-pattern.json new file mode 100644 index 000000000..5c2108f5a --- /dev/null +++ b/lambda-microvm-dta/example-pattern.json @@ -0,0 +1,59 @@ +{ + "title": "CI/CD dynamic threat analysis in AWS Lambda MicroVMs", + "description": "Run an untrusted artifact inside an isolated Lambda MicroVM and observe its behavior from outside the process to produce a deterministic CI verdict.", + "language": "Python", + "level": "300", + "framework": "Terraform", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates a dynamic-threat-analysis (DTA) style CI/CD gate built on AWS Lambda MicroVMs. A MicroVM image carries a small sandbox supervisor. For each analysis the orchestrator runs a fresh MicroVM, the supervisor launches an untrusted target as a child process (argv only, never a shell), and a set of collectors observe the target entirely from the outside: the process tree via /proc, a before/after filesystem diff, fake canary files and environment variables, syscalls via strace, and network connections via /proc/net.", + "The target never reports on its own behavior. A simple, explainable rule engine turns the observed events into a deterministic verdict (clean, suspicious, policy_violation, unknown, or error) and a machine-readable report, and CI passes or fails on policy. The MicroVM is always terminated, even when the job fails.", + "The Terraform deploys the least-privilege IAM build and execution roles plus an artifact S3 bucket that the orchestrator uses to build the MicroVM image and run the analysis (the same configuration that was applied and destroyed against GA Lambda MicroVMs during testing). The default scenarios are benign and canary-based, so the sample is safe to run in a dedicated sandbox account." + ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/lambda-microvm-dta", + "templateURL": "serverless-patterns/lambda-microvm-dta", + "projectFolder": "lambda-microvm-dta", + "templateFile": "main.tf" + } + }, + "resources": { + "bullets": [ + { + "text": "AWS Lambda MicroVMs documentation", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/lambda-microvms-guide.html" + }, + { + "text": "Run a MicroVM, create an auth token, and call the endpoint", + "link": "https://docs.aws.amazon.com/lambda/latest/microvm-api/API_RunMicrovm.html" + } + ] + }, + "deploy": { + "text": [ + "terraform init && terraform apply" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Terminate any running MicroVM with microvm-dta ... terminate (the orchestrator also does this in a finally block), then run terraform destroy." + ] + }, + "authors": [ + { + "name": "Your name", + "image": "link-to-your-photo.jpg", + "bio": "Your bio.", + "linkedin": "linked-in-ID", + "twitter": "twitter-handle" + } + ] +} diff --git a/lambda-microvm-dta/main.tf b/lambda-microvm-dta/main.tf new file mode 100644 index 000000000..be24930d1 --- /dev/null +++ b/lambda-microvm-dta/main.tf @@ -0,0 +1,267 @@ +data "aws_caller_identity" "current" {} +data "aws_partition" "current" {} + +locals { + account_id = data.aws_caller_identity.current.account_id + partition = data.aws_partition.current.partition + common_tags = merge({ + Project = var.project_name + ManagedBy = "terraform" + }, var.tags) +} + +# --------------------------------------------------------------------------- # +# Artifact + report bucket +# --------------------------------------------------------------------------- # +resource "aws_s3_bucket" "artifacts" { + bucket = var.artifact_bucket_name != "" ? var.artifact_bucket_name : null + bucket_prefix = var.artifact_bucket_name == "" ? "${var.project_name}-" : null + force_destroy = true + tags = local.common_tags +} + +resource "aws_s3_bucket_public_access_block" "artifacts" { + bucket = aws_s3_bucket.artifacts.id + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_server_side_encryption_configuration" "artifacts" { + bucket = aws_s3_bucket.artifacts.id + rule { + apply_server_side_encryption_by_default { + sse_algorithm = "AES256" + } + } +} + +resource "aws_s3_bucket_versioning" "artifacts" { + bucket = aws_s3_bucket.artifacts.id + versioning_configuration { + status = "Enabled" + } +} + +resource "aws_s3_bucket_lifecycle_configuration" "artifacts" { + bucket = aws_s3_bucket.artifacts.id + rule { + id = "expire-artifacts" + status = "Enabled" + filter { prefix = "artifacts/" } + expiration { days = 14 } + } + rule { + id = "expire-reports" + status = "Enabled" + filter { prefix = "reports/" } + expiration { days = 30 } + } +} + +# --------------------------------------------------------------------------- # +# MicroVM build role (assumed by the Lambda MicroVMs service during image build) +# --------------------------------------------------------------------------- # +data "aws_iam_policy_document" "microvm_service_trust" { + statement { + effect = "Allow" + actions = ["sts:AssumeRole", "sts:TagSession"] + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "build" { + name = "${var.project_name}-build-role" + assume_role_policy = data.aws_iam_policy_document.microvm_service_trust.json + tags = local.common_tags +} + +data "aws_iam_policy_document" "build" { + statement { + sid = "ReadArtifact" + effect = "Allow" + actions = ["s3:GetObject"] + resources = ["${aws_s3_bucket.artifacts.arn}/*"] + } + statement { + sid = "BuildLogs" + effect = "Allow" + actions = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"] + resources = ["arn:${local.partition}:logs:${var.region}:${local.account_id}:*"] + } +} + +resource "aws_iam_role_policy" "build" { + name = "microvm-build-policy" + role = aws_iam_role.build.id + policy = data.aws_iam_policy_document.build.json +} + +# --------------------------------------------------------------------------- # +# MicroVM execution role (assumed by the running MicroVM). The sample grants no +# application permissions; an explicit deny is belt-and-suspenders only. +# --------------------------------------------------------------------------- # +resource "aws_iam_role" "execution" { + name = "${var.project_name}-execution-role" + assume_role_policy = data.aws_iam_policy_document.microvm_service_trust.json + tags = local.common_tags +} + +data "aws_iam_policy_document" "execution_deny" { + statement { + sid = "DenySecretsAndParameters" + effect = "Deny" + actions = [ + "secretsmanager:GetSecretValue", + "ssm:GetParameter", + "ssm:GetParameters", + "ssm:GetParametersByPath", + "kms:Decrypt", + ] + resources = ["*"] + } +} + +resource "aws_iam_role_policy" "execution" { + name = "microvm-execution-minimal" + role = aws_iam_role.execution.id + policy = data.aws_iam_policy_document.execution_deny.json +} + +# --------------------------------------------------------------------------- # +# GitHub Actions OIDC CI role +# --------------------------------------------------------------------------- # +resource "aws_iam_openid_connect_provider" "github" { + count = var.enable_github_oidc_role && var.create_github_oidc_provider ? 1 : 0 + url = "https://token.actions.githubusercontent.com" + client_id_list = ["sts.amazonaws.com"] + thumbprint_list = ["6938fd4d98bab03faadb97b34396831e3780aea1"] + tags = local.common_tags +} + +locals { + github_oidc_provider_arn = var.create_github_oidc_provider ? try(aws_iam_openid_connect_provider.github[0].arn, "") : "arn:${local.partition}:iam::${local.account_id}:oidc-provider/token.actions.githubusercontent.com" +} + +data "aws_iam_policy_document" "ci_trust" { + count = var.enable_github_oidc_role ? 1 : 0 + statement { + effect = "Allow" + actions = ["sts:AssumeRoleWithWebIdentity"] + principals { + type = "Federated" + identifiers = [local.github_oidc_provider_arn] + } + condition { + test = "StringEquals" + variable = "token.actions.githubusercontent.com:aud" + values = ["sts.amazonaws.com"] + } + condition { + test = "StringLike" + variable = "token.actions.githubusercontent.com:sub" + values = ["repo:${var.github_org}/${var.github_repo}:ref:refs/heads/${var.github_branch}"] + } + } +} + +resource "aws_iam_role" "ci" { + count = var.enable_github_oidc_role ? 1 : 0 + name = "${var.project_name}-github-actions-role" + assume_role_policy = data.aws_iam_policy_document.ci_trust[0].json + tags = local.common_tags +} + +# IAM actions verified against the AWS Service Authorization Reference for the +# `lambda` service: Lambda MicroVMs actions live under the `lambda:` prefix with +# `Microvm` casing. Run/Create/Update additionally require iam:PassRole and +# lambda:PassNetworkConnector (declared below). +data "aws_iam_policy_document" "ci" { + count = var.enable_github_oidc_role ? 1 : 0 + statement { + sid = "S3ArtifactAccess" + effect = "Allow" + actions = ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"] + resources = [aws_s3_bucket.artifacts.arn, "${aws_s3_bucket.artifacts.arn}/*"] + } + + statement { + sid = "LambdaMicrovmsControlPlane" + effect = "Allow" + actions = [ + "lambda:CreateMicrovmImage", + "lambda:UpdateMicrovmImage", + "lambda:GetMicrovmImage", + "lambda:GetMicrovmImageBuild", + "lambda:ListMicrovmImages", + "lambda:ListMicrovmImageBuilds", + "lambda:RunMicrovm", + "lambda:GetMicrovm", + "lambda:ListMicrovms", + "lambda:CreateMicrovmAuthToken", + "lambda:SuspendMicrovm", + "lambda:ResumeMicrovm", + "lambda:TerminateMicrovm", + ] + # The control-plane resource type (microvm-image) is scoped where supported; + # list/run operations that the service models as account-level still require "*". + resources = ["*"] + } + + statement { + sid = "PassRolesToMicrovmService" + effect = "Allow" + actions = ["iam:PassRole"] + resources = [aws_iam_role.build.arn, aws_iam_role.execution.arn] + condition { + test = "StringEquals" + variable = "iam:PassedToService" + values = ["lambda.amazonaws.com"] + } + } + + statement { + sid = "PassNetworkConnector" + effect = "Allow" + actions = ["lambda:PassNetworkConnector"] + resources = ["*"] + } + + statement { + sid = "ReadBuildLogs" + effect = "Allow" + actions = ["logs:DescribeLogGroups", "logs:DescribeLogStreams", "logs:GetLogEvents", "logs:FilterLogEvents"] + resources = ["arn:${local.partition}:logs:${var.region}:${local.account_id}:*"] + } + + statement { + sid = "ExplicitDenyDangerousAdminActions" + effect = "Deny" + actions = [ + "iam:CreateAccessKey", + "iam:CreateUser", + "iam:CreateRole", + "iam:PutUserPolicy", + "iam:PutRolePolicy", + "iam:AttachUserPolicy", + "iam:AttachRolePolicy", + "iam:UpdateAssumeRolePolicy", + "iam:CreatePolicyVersion", + "organizations:*", + "cloudtrail:StopLogging", + "cloudtrail:DeleteTrail", + ] + resources = ["*"] + } +} + +resource "aws_iam_role_policy" "ci" { + count = var.enable_github_oidc_role ? 1 : 0 + name = "microvm-dta-ci-policy" + role = aws_iam_role.ci[0].id + policy = data.aws_iam_policy_document.ci[0].json +} diff --git a/lambda-microvm-dta/outputs.tf b/lambda-microvm-dta/outputs.tf new file mode 100644 index 000000000..ec36f2e59 --- /dev/null +++ b/lambda-microvm-dta/outputs.tf @@ -0,0 +1,39 @@ +output "artifact_bucket_name" { + description = "S3 bucket for artifacts and reports." + value = aws_s3_bucket.artifacts.bucket +} + +output "build_role_arn" { + description = "Pass to `microvm-dta build-image --build-role-arn`." + value = aws_iam_role.build.arn +} + +output "execution_role_arn" { + description = "Pass to `microvm-dta run --execution-role-arn`." + value = aws_iam_role.execution.arn +} + +output "github_actions_role_arn" { + description = "Optional GitHub Actions OIDC role ARN (only when enable_github_oidc_role = true)." + value = var.enable_github_oidc_role ? aws_iam_role.ci[0].arn : null +} + +output "vpc_id" { + description = "Egress VPC id (when enable_vpc_egress = true)." + value = var.enable_vpc_egress ? aws_vpc.egress[0].id : null +} + +output "egress_security_group_id" { + description = "Restricted egress security group id (when enable_vpc_egress = true)." + value = var.enable_vpc_egress ? aws_security_group.egress[0].id : null +} + +output "private_subnet_id" { + description = "Private subnet id for MicroVM VPC egress (when enable_vpc_egress = true)." + value = var.enable_vpc_egress ? aws_subnet.private[0].id : null +} + +output "flow_logs_log_group" { + description = "CloudWatch log group for VPC Flow Logs (when enabled)." + value = var.enable_vpc_egress && var.enable_vpc_flow_logs ? aws_cloudwatch_log_group.flow_logs[0].name : null +} diff --git a/lambda-microvm-dta/src/examples/targets/benign-command.yaml b/lambda-microvm-dta/src/examples/targets/benign-command.yaml new file mode 100644 index 000000000..0f6239ba2 --- /dev/null +++ b/lambda-microvm-dta/src/examples/targets/benign-command.yaml @@ -0,0 +1,21 @@ +# A benign target: prints a line and writes a small file in the workspace. +target: + type: command + argv: ["python3", "-c", "import pathlib;print('hello from target');pathlib.Path('artifact.txt').write_text('ok')"] + timeoutSeconds: 30 + workingDirectory: "workspace" + environment: {} + +collectors: + process: true + filesystem: true + canary: true + strace: false + network: + procNet: true + vpcFlowLogs: false + +policy: + failOnSeverity: high + failOnPolicyViolation: true + failClosed: false diff --git a/lambda-microvm-dta/src/examples/targets/canary-access.yaml b/lambda-microvm-dta/src/examples/targets/canary-access.yaml new file mode 100644 index 000000000..cd0f23d4a --- /dev/null +++ b/lambda-microvm-dta/src/examples/targets/canary-access.yaml @@ -0,0 +1,25 @@ +# A target that reads the canary files — should trigger R002 (policy_violation). +# Uses strace so canary access is detected with higher confidence than atime. +target: + type: command + argv: + - "python3" + - "-c" + - "import pathlib,glob;[print(open(p).read()[:0]) for p in glob.glob('../canary/*')]" + timeoutSeconds: 30 + workingDirectory: "workspace" + environment: {} + +collectors: + process: true + filesystem: true + canary: true + strace: true + network: + procNet: true + vpcFlowLogs: false + +policy: + failOnSeverity: high + failOnPolicyViolation: true + failClosed: false diff --git a/lambda-microvm-dta/src/examples/targets/sample_target.py b/lambda-microvm-dta/src/examples/targets/sample_target.py new file mode 100644 index 000000000..c23041763 --- /dev/null +++ b/lambda-microvm-dta/src/examples/targets/sample_target.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python3 +"""A benign sample target artifact for the 'script' target type. + +It writes a small file and prints a line — nothing harmful. The supervisor's +collectors observe it from outside. +""" +import pathlib + +pathlib.Path("script_artifact.txt").write_text("written by sample_target\n", encoding="utf-8") +print("sample_target ran") diff --git a/lambda-microvm-dta/src/examples/targets/script-target.yaml b/lambda-microvm-dta/src/examples/targets/script-target.yaml new file mode 100644 index 000000000..8a01f694d --- /dev/null +++ b/lambda-microvm-dta/src/examples/targets/script-target.yaml @@ -0,0 +1,23 @@ +# A script target: the supervisor copies the script into the workspace, marks it +# executable, and runs it. Demonstrates the 'script' target type. +target: + type: script + source: "sample_target.py" + argv: ["python3", "sample_target.py"] + timeoutSeconds: 30 + workingDirectory: "workspace" + environment: {} + +collectors: + process: true + filesystem: true + canary: true + strace: false + network: + procNet: true + vpcFlowLogs: false + +policy: + failOnSeverity: high + failOnPolicyViolation: true + failClosed: false diff --git a/lambda-microvm-dta/src/examples/targets/shell-exec.yaml b/lambda-microvm-dta/src/examples/targets/shell-exec.yaml new file mode 100644 index 000000000..ca28f22b0 --- /dev/null +++ b/lambda-microvm-dta/src/examples/targets/shell-exec.yaml @@ -0,0 +1,23 @@ +# A target that spawns /bin/sh — should match R004 (suspicious, not a failure +# unless severity threshold is lowered). Demonstrates argv-only execution where +# the target itself chooses to exec a shell. +target: + type: command + argv: ["python3", "-c", "import subprocess;subprocess.run(['/bin/sh','-c','echo hi'])"] + timeoutSeconds: 30 + workingDirectory: "workspace" + environment: {} + +collectors: + process: true + filesystem: true + canary: true + strace: true + network: + procNet: true + vpcFlowLogs: false + +policy: + failOnSeverity: high + failOnPolicyViolation: true + failClosed: false diff --git a/lambda-microvm-dta/src/microvm/Dockerfile b/lambda-microvm-dta/src/microvm/Dockerfile new file mode 100644 index 000000000..d4784d0fa --- /dev/null +++ b/lambda-microvm-dta/src/microvm/Dockerfile @@ -0,0 +1,24 @@ +# Lambda MicroVM application image. The MicroVM operating-system base image is +# selected separately by the Lambda MicroVMs create/update API via --base-image-arn. +# strace provides the user-space syscall collector (no eBPF/Tracee/Falco in v0.2). +FROM python:3.13-slim + +WORKDIR /app + +RUN apt-get update \ + && apt-get install -y --no-install-recommends strace \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt ./requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY app ./app +COPY rules ./rules + +ENV DTA_ANALYSIS_ROOT=/analysis +ENV DTA_RULES_FILE=/app/rules/default.yaml +ENV DTA_SOURCE_BASE=/app +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8080 +CMD ["python", "-m", "app.server"] diff --git a/lambda-microvm-dta/src/microvm/app/__init__.py b/lambda-microvm-dta/src/microvm/app/__init__.py new file mode 100644 index 000000000..3dc1f76bc --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/lambda-microvm-dta/src/microvm/app/collectors/__init__.py b/lambda-microvm-dta/src/microvm/app/collectors/__init__.py new file mode 100644 index 000000000..22aca0392 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/__init__.py @@ -0,0 +1,11 @@ +"""Collectors observe the target from outside the target process. + +Each collector implements the :class:`Collector` protocol and emits normalized +events into a shared :class:`~app.events.EventSink`. Collectors are started +before the target launches and stopped after it terminates. +""" +from __future__ import annotations + +from .base import Collector, CollectorStatus + +__all__ = ["Collector", "CollectorStatus"] diff --git a/lambda-microvm-dta/src/microvm/app/collectors/base.py b/lambda-microvm-dta/src/microvm/app/collectors/base.py new file mode 100644 index 000000000..111e270f8 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/base.py @@ -0,0 +1,62 @@ +"""Collector interface shared by all v0.2 collectors.""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List, Optional + +from ..events import EventSink +from ..workspace import Workspace + + +@dataclass +class CollectorStatus: + name: str + enabled: bool + status: str = "pending" # pending | running | completed | failed | skipped + errors: List[str] = field(default_factory=list) + + def to_dict(self) -> dict: + return { + "name": self.name, + "enabled": self.enabled, + "status": self.status, + "errors": list(self.errors), + } + + +class Collector: + """Lifecycle: ``setup`` (pre-launch) → target runs → ``finish`` (post-exit). + + A collector that needs to observe a live PID implements + :meth:`on_target_started`, which the supervisor calls from the launcher's + ``on_started`` hook. Concurrency-heavy collectors (pollers) spawn their own + thread in ``on_target_started`` and join it in ``finish``. + """ + + name = "collector" + + def __init__(self, sink: EventSink, workspace: Workspace) -> None: + self.sink = sink + self.workspace = workspace + self.status = CollectorStatus(name=self.name, enabled=True) + + # --- lifecycle hooks (override as needed) --- + def setup(self) -> None: # before target launch + self.status.status = "running" + + def on_target_started(self, pid: int) -> None: # target is live + pass + + def finish(self, launch_result) -> None: # after target exit + if self.status.status == "running": + self.status.status = "completed" + + # --- helpers --- + def fail(self, message: str) -> None: + self.status.status = "failed" + self.status.errors.append(message) + + @property + def strace_wrapper(self) -> Optional[List[str]]: + """Collectors that need to wrap the target argv (strace) return one here.""" + return None diff --git a/lambda-microvm-dta/src/microvm/app/collectors/canary.py b/lambda-microvm-dta/src/microvm/app/collectors/canary.py new file mode 100644 index 000000000..512548e4f --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/canary.py @@ -0,0 +1,135 @@ +"""Canary collector: presents fake sensitive assets and detects access. + +Creates fake canary files (including an AWS-credential-shaped file) and canary +environment variables before the target runs, then infers access afterward. +File-access detection here is metadata-based (atime / mtime) and therefore +**low confidence** — the strace collector provides higher-confidence +``open``/``openat`` evidence when enabled. Canary *values* are fake and are +never written to events; only identifiers and paths are reported. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Dict, List +import os + +from .base import Collector + +# Canonical fake canary material. Values are deliberately obvious placeholders. +_CANARY_FILE_NAME = "CANARY_SECRET_DO_NOT_USE.txt" +_CANARY_FILE_BODY = "DTA_CANARY_SECRET=not-a-real-secret\n" +_CANARY_AWS_NAME = "aws_credentials_canary" +_CANARY_AWS_BODY = ( + "[default]\n" + "aws_access_key_id = AKIAIOSFODNN7EXAMPLE\n" # AWS's own documentation example key + "aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\n" +) +_CANARY_ENV = {"DTA_CANARY_ENV": "not-a-real-secret"} + + +class CanaryCollector(Collector): + name = "canary" + + def __init__(self, sink, workspace) -> None: + super().__init__(sink, workspace) + self._files: List[Path] = [] + self._atime_before: Dict[str, int] = {} + + def setup(self) -> None: + super().setup() + secret = self.workspace.canary / _CANARY_FILE_NAME + secret.write_text(_CANARY_FILE_BODY, encoding="utf-8") + aws_cred = self.workspace.canary / _CANARY_AWS_NAME + aws_cred.write_text(_CANARY_AWS_BODY, encoding="utf-8") + self._files = [secret, aws_cred] + + for f in self._files: + self.sink.record( + "canary.file_presented", severity="none", source="canary", + message="Presented a fake canary file", + canary={"id": f.name, "path": str(f)}, + ) + try: + self._atime_before[str(f)] = f.stat().st_atime_ns + except OSError: + pass + + for key in _CANARY_ENV: + self.sink.record( + "canary.env_presented", severity="none", source="canary", + message="Presented a fake canary environment variable", + canary={"id": key}, + ) + + @property + def canary_env(self) -> Dict[str, str]: + """Canary env vars to inject into the target (called by the supervisor).""" + return dict(_CANARY_ENV) + + def finish(self, launch_result) -> None: + # The target ran with the canary env in scope. + for key in _CANARY_ENV: + self.sink.record( + "canary.env_exposed_to_target", severity="low", source="canary", + message="Target ran with a canary environment variable in scope", + canary={"id": key}, + ) + + # High-confidence detection: did strace observe an open of a canary path? + # (The supervisor orders the strace collector before canary so its + # syscall events are already in the sink.) + strace_opened = self._strace_opened_paths() + + for f in self._files: + fkey = str(f) + high_confidence = fkey in strace_opened + if high_confidence: + event_type = ( + "canary.aws_credential_path_touched" + if f.name == _CANARY_AWS_NAME + else "canary.file_read_observed" + ) + self.sink.record( + event_type, severity="high", source="canary", + message="Canary file read confirmed by syscall evidence", + canary={"id": f.name, "path": fkey, "confidence": "high", "evidence": "strace.open"}, + ) + continue + # Low-confidence fallback: atime advanced. This is informational only + # (suspected) and does not by itself constitute a policy violation. + if self._atime_advanced(f): + self.sink.record( + "canary.file_touched_suspected", severity="low", source="canary", + message="Canary file access suspected (low-confidence atime evidence)", + canary={"id": f.name, "path": fkey, "confidence": "low", "evidence": "atime"}, + ) + super().finish(launch_result) + + def _atime_advanced(self, f: Path) -> bool: + try: + atime = f.stat().st_atime_ns + except OSError: + return False + before = self._atime_before.get(str(f)) + return before is not None and atime > before + + def _strace_opened_paths(self) -> set[str]: + opened: set[str] = set() + canary_dir = str(self.workspace.canary.resolve()) + for event in self.sink.events(): + if event.type != "syscall.file_open" or not event.file: + continue + path = str(event.file.get("path") or "") + if not path: + continue + resolved = self._resolve_under_canary(path, canary_dir) + if resolved: + opened.add(resolved) + return opened + + def _resolve_under_canary(self, path: str, canary_dir: str) -> str | None: + # Match canary files by basename to tolerate relative paths in strace output. + for f in self._files: + if Path(path).name == f.name: + return str(f) + return None diff --git a/lambda-microvm-dta/src/microvm/app/collectors/filesystem.py b/lambda-microvm-dta/src/microvm/app/collectors/filesystem.py new file mode 100644 index 000000000..4906f1c36 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/filesystem.py @@ -0,0 +1,150 @@ +"""Filesystem diff collector. + +Snapshots the monitored workspace directories before and after target execution +and reports created / modified / deleted files and executable-permission +changes. Scope is intentionally limited to the analysis workspace, canary, and +tmp dirs — it never scans the whole MicroVM filesystem in v0.2. +""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Optional +import hashlib +import stat + +from .base import Collector + +# Only hash files at or below this size to keep snapshots cheap. +_HASH_SIZE_LIMIT = 1_048_576 # 1 MiB + + +@dataclass(frozen=True) +class _FileState: + size: int + mtime_ns: int + mode: int + sha256: Optional[str] + + @property + def is_executable(self) -> bool: + return bool(self.mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)) + + +def _sha256(path: Path) -> Optional[str]: + try: + if path.stat().st_size > _HASH_SIZE_LIMIT: + return None + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + h.update(chunk) + return h.hexdigest() + except (OSError, ValueError): + return None + + +def _snapshot(roots, *, no_hash_roots=()) -> Dict[str, _FileState]: + """Snapshot files under ``roots``. + + Files under any directory in ``no_hash_roots`` are stat-only (no content + read), so snapshotting does not advance their access time. This matters for + the canary directory, whose atime is used as a (low-confidence) read signal. + """ + no_hash = [Path(r).resolve() for r in no_hash_roots] + snap: Dict[str, _FileState] = {} + for root in roots: + root = Path(root) + if not root.exists(): + continue + for path in root.rglob("*"): + if not path.is_file() or path.is_symlink(): + continue + try: + st = path.stat() + except OSError: + continue + skip_hash = any(_under(path, nh) for nh in no_hash) + snap[str(path)] = _FileState( + size=st.st_size, + mtime_ns=st.st_mtime_ns, + mode=st.st_mode, + sha256=None if skip_hash else _sha256(path), + ) + return snap + + +def _under(child: Path, parent: Path) -> bool: + try: + child.resolve().relative_to(parent) + return True + except ValueError: + return False + + +def _is_temp_like(path: str, tmp_root: Path) -> bool: + try: + Path(path).resolve().relative_to(tmp_root.resolve()) + return True + except ValueError: + return False + + +class FilesystemCollector(Collector): + name = "filesystem" + + def __init__(self, sink, workspace) -> None: + super().__init__(sink, workspace) + self._before: Dict[str, _FileState] = {} + + def setup(self) -> None: + super().setup() + self._before = _snapshot(self.workspace.monitored_dirs, no_hash_roots=[self.workspace.canary]) + + def finish(self, launch_result) -> None: + after = _snapshot(self.workspace.monitored_dirs, no_hash_roots=[self.workspace.canary]) + before = self._before + tmp_root = self.workspace.tmp + + for path, state in after.items(): + if path not in before: + self.sink.record( + "file.created", severity="low", source="filesystem", + message="File created by target", + file={"path": path, "size": state.size, "sha256": state.sha256}, + ) + if state.is_executable: + self.sink.record( + "file.executable_created", severity="medium", source="filesystem", + message="Executable file created by target", + file={"path": path, "sha256": state.sha256}, + ) + if _is_temp_like(path, tmp_root) and state.is_executable: + self.sink.record( + "file.temp_executable", severity="medium", source="filesystem", + message="Executable created in a temp-like directory", + file={"path": path}, + ) + else: + prev = before[path] + if prev.sha256 != state.sha256 or prev.size != state.size or prev.mtime_ns != state.mtime_ns: + self.sink.record( + "file.modified", severity="low", source="filesystem", + message="File modified by target", + file={"path": path, "size": state.size, "sha256": state.sha256}, + ) + if (not prev.is_executable) and state.is_executable: + self.sink.record( + "file.executable_permission_added", severity="medium", source="filesystem", + message="Executable permission added to a file", + file={"path": path}, + ) + + for path in before: + if path not in after: + self.sink.record( + "file.deleted", severity="low", source="filesystem", + message="File deleted by target", + file={"path": path}, + ) + super().finish(launch_result) diff --git a/lambda-microvm-dta/src/microvm/app/collectors/network.py b/lambda-microvm-dta/src/microvm/app/collectors/network.py new file mode 100644 index 000000000..0e6a305a8 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/network.py @@ -0,0 +1,102 @@ +"""Basic network collector using ``/proc/net`` snapshots. + +Polls ``/proc/net/{tcp,tcp6,udp,udp6}`` while the target runs and reports +observed connections. This is host-metadata evidence — it sees sockets owned by +the network namespace, not just the target — so it is corroborating rather than +authoritative. The strace collector provides per-process ``connect``/``socket`` +evidence, and (in AWS mode) VPC Flow Logs add the AWS-side view. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Optional, Set, Tuple +import threading +import time + +from .base import Collector + +_PROC_NET_FILES = { + "tcp": Path("/proc/net/tcp"), + "tcp6": Path("/proc/net/tcp6"), + "udp": Path("/proc/net/udp"), + "udp6": Path("/proc/net/udp6"), +} + +# TCP state 01 == ESTABLISHED in /proc/net/tcp. +_TCP_ESTABLISHED = "01" + + +def _parse_hex_ipv4(hexip: str) -> str: + # /proc stores the address little-endian. + b = bytes.fromhex(hexip) + return ".".join(str(x) for x in reversed(b)) + + +def _parse_addr(field: str, ipv6: bool) -> Tuple[str, int]: + addr, _, port = field.partition(":") + port_num = int(port, 16) if port else 0 + if ipv6: + return (addr.lower(), port_num) # keep raw hex for v6 to stay dependency-light + return (_parse_hex_ipv4(addr), port_num) + + +class NetworkCollector(Collector): + name = "network" + + def __init__(self, sink, workspace, poll_interval: float = 0.25) -> None: + super().__init__(sink, workspace) + self._poll_interval = poll_interval + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self._seen: Set[Tuple[str, str, int]] = set() + self._observed_any = False + + def on_target_started(self, pid: int) -> None: + if not any(p.exists() for p in _PROC_NET_FILES.values()): + return # not Linux / no procfs: stay silent, finish() notes no egress + self._thread = threading.Thread(target=self._poll_loop, daemon=True) + self._thread.start() + + def _poll_loop(self) -> None: + while not self._stop.is_set(): + for proto, path in _PROC_NET_FILES.items(): + self._scan(proto, path) + time.sleep(self._poll_interval) + + def _scan(self, proto: str, path: Path) -> None: + try: + lines = path.read_text(encoding="utf-8", errors="replace").splitlines()[1:] + except (FileNotFoundError, PermissionError, OSError): + return + ipv6 = proto.endswith("6") + for line in lines: + cols = line.split() + if len(cols) < 4: + continue + rem_ip, rem_port = _parse_addr(cols[2], ipv6) + state = cols[3] + if rem_port == 0: + continue # listening / unconnected + if proto.startswith("tcp") and state != _TCP_ESTABLISHED: + continue + key = (proto, rem_ip, rem_port) + if key in self._seen: + continue + self._seen.add(key) + self._observed_any = True + self.sink.record( + "network.procnet_connection_observed", severity="low", source="network", + message="Observed an active connection via /proc/net", + network={"protocol": proto, "destinationIp": rem_ip, "destinationPort": rem_port}, + ) + + def finish(self, launch_result) -> None: + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=2) + if not self._observed_any: + self.sink.record( + "network.no_egress_observed", severity="none", source="network", + message="No outbound connections observed via /proc/net", + ) + super().finish(launch_result) diff --git a/lambda-microvm-dta/src/microvm/app/collectors/process.py b/lambda-microvm-dta/src/microvm/app/collectors/process.py new file mode 100644 index 000000000..754219476 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/process.py @@ -0,0 +1,136 @@ +"""Process collector: observes the target process tree from outside. + +Polls ``/proc/`` for the root PID and any observed children while the +target runs, then records terminal events (exit / timeout / nonzero) from the +launch result. Environment values are never recorded (only variable names), so +canary or injected values cannot leak into the report. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Dict, Optional, Set +import threading +import time + +from .base import Collector + + +def _read_proc_text(pid: int, name: str) -> Optional[str]: + try: + return Path(f"/proc/{pid}/{name}").read_text(encoding="utf-8", errors="replace") + except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): + return None + + +def _read_cmdline(pid: int) -> Optional[list[str]]: + raw = _read_proc_text(pid, "cmdline") + if raw is None: + return None + parts = [p for p in raw.split("\x00") if p] + return parts or None + + +def _read_exe(pid: int) -> Optional[str]: + try: + return str(Path(f"/proc/{pid}/exe").resolve()) + except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): + return None + + +def _read_children(pid: int) -> Set[int]: + """Best-effort child discovery via /proc//task/*/children.""" + children: Set[int] = set() + task_dir = Path(f"/proc/{pid}/task") + try: + for task in task_dir.iterdir(): + raw = _read_proc_text_path(task / "children") + if raw: + children.update(int(x) for x in raw.split()) + except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): + pass + return children + + +def _read_proc_text_path(path: Path) -> Optional[str]: + try: + return path.read_text(encoding="utf-8", errors="replace") + except (FileNotFoundError, ProcessLookupError, PermissionError, OSError): + return None + + +class ProcessCollector(Collector): + name = "process" + + def __init__(self, sink, workspace, poll_interval: float = 0.2) -> None: + super().__init__(sink, workspace) + self._poll_interval = poll_interval + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self._root_pid: Optional[int] = None + self._seen_children: Set[int] = set() + + def on_target_started(self, pid: int) -> None: + self._root_pid = pid + argv = _read_cmdline(pid) + self.sink.record( + "process.start", + severity="none", + source="process", + message="Target process started", + pid=pid, + process={"argv": argv, "exe": _read_exe(pid)}, + ) + self._thread = threading.Thread(target=self._poll_loop, args=(pid,), daemon=True) + self._thread.start() + + def _poll_loop(self, root_pid: int) -> None: + # /proc is Linux-only; on other platforms this simply observes nothing. + proc_available = Path("/proc").is_dir() + while not self._stop.is_set(): + if proc_available: + for child in _read_children(root_pid): + if child not in self._seen_children: + self._seen_children.add(child) + self.sink.record( + "process.child_observed", + severity="low", + source="process", + message="Observed a child process spawned by the target", + pid=child, + process={"argv": _read_cmdline(child), "exe": _read_exe(child)}, + ) + time.sleep(self._poll_interval) + + def finish(self, launch_result) -> None: + self._stop.set() + if self._thread is not None: + self._thread.join(timeout=2) + + lr = launch_result + self.sink.record( + "process.stdout_captured", source="process", + message="Captured target stdout", data={"path": str(lr.stdout_path)}, + ) + self.sink.record( + "process.stderr_captured", source="process", + message="Captured target stderr", data={"path": str(lr.stderr_path)}, + ) + if lr.timed_out: + self.sink.record( + "process.timeout", severity="medium", source="process", + message="Target exceeded its timeout and was terminated", + pid=lr.pid, data={"durationSeconds": lr.duration_seconds}, + ) + self.sink.record( + "process.exit", severity="none", source="process", + message="Target process exited", + pid=lr.pid, + data={"exitCode": lr.exit_code, "durationSeconds": lr.duration_seconds, "timedOut": lr.timed_out}, + ) + if not lr.timed_out and lr.exit_code not in (0, None): + self.sink.record( + "process.nonzero_exit", severity="low", source="process", + message="Target exited with a non-zero status", + pid=lr.pid, data={"exitCode": lr.exit_code}, + ) + super().finish(launch_result) diff --git a/lambda-microvm-dta/src/microvm/app/collectors/strace_collector.py b/lambda-microvm-dta/src/microvm/app/collectors/strace_collector.py new file mode 100644 index 000000000..3f4813eeb --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/collectors/strace_collector.py @@ -0,0 +1,223 @@ +"""strace syscall collector. + +Wraps the target argv with ``strace -ff`` so the tracer is the parent and the +target is its child — user-space syscall tracing via ptrace, no eBPF/Tracee/ +Falco required. After the target exits, the per-PID strace logs are parsed into +normalized syscall events and kept as raw artifacts. + +Limitations (documented in the support boundary): strace adds runtime overhead, +can miss anti-debugging targets, and is not a high-performance production +sensor. It is appropriate for CI DTA-style analysis where the supervisor +launches the target. +""" +from __future__ import annotations + +from pathlib import Path +from typing import Dict, List, Optional +import re +import shutil + +from .base import Collector + +_STRACE_CLASSES = "process,file,network,signal" + +# Syscalls we normalize. Mapping: syscall name -> (event type, severity). +_SYSCALL_EVENTS: Dict[str, tuple[str, str]] = { + "execve": ("syscall.execve", "low"), + "execveat": ("syscall.execve", "low"), + "clone": ("syscall.clone", "low"), + "clone3": ("syscall.clone", "low"), + "fork": ("syscall.clone", "low"), + "vfork": ("syscall.clone", "low"), + "open": ("syscall.file_open", "none"), + "openat": ("syscall.file_open", "none"), + "creat": ("syscall.file_open", "low"), + "unlink": ("syscall.file_delete", "low"), + "unlinkat": ("syscall.file_delete", "low"), + "rename": ("syscall.file_write_path", "low"), + "renameat": ("syscall.file_write_path", "low"), + "renameat2": ("syscall.file_write_path", "low"), + "chmod": ("syscall.file_permission_change", "low"), + "fchmod": ("syscall.file_permission_change", "low"), + "fchmodat": ("syscall.file_permission_change", "low"), + "chown": ("syscall.file_permission_change", "low"), + "fchown": ("syscall.file_permission_change", "low"), + "fchownat": ("syscall.file_permission_change", "low"), + "socket": ("syscall.socket", "low"), + "connect": ("syscall.connect", "medium"), + "bind": ("syscall.bind", "low"), + "listen": ("syscall.bind", "low"), + "accept": ("syscall.socket", "none"), + "accept4": ("syscall.socket", "none"), + "sendto": ("syscall.connect", "low"), + "recvfrom": ("syscall.socket", "none"), + "mprotect": ("syscall.mprotect", "low"), + "setuid": ("syscall.privilege_change_attempt", "high"), + "setgid": ("syscall.privilege_change_attempt", "high"), + "setreuid": ("syscall.privilege_change_attempt", "high"), + "setregid": ("syscall.privilege_change_attempt", "high"), + "capset": ("syscall.capability_change_attempt", "high"), +} + +# strace line: "[pid 1234] 1700000000.123 execve("/bin/sh", ["sh"], ...) = 0" +# With -ff and -o FILE.PID, the [pid N] prefix may be absent; PID comes from the +# filename suffix. We capture optional pid, optional timestamp, syscall, args. +_LINE_RE = re.compile( + r"^(?:\[pid\s+(?P\d+)\]\s+)?" + r"(?P\d+\.\d+\s+)?" + r"(?P[a-zA-Z0-9_]+)\((?P.*?)\)\s*=\s*(?P-?\d+|\?|0x[0-9a-fA-F]+).*$" +) + +_CONNECT_ADDR_RE = re.compile(r'sin_addr=inet_addr\("(?P[0-9.]+)"\)') +_CONNECT_PORT_RE = re.compile(r"sin_port=htons\((?P\d+)\)") +_CONNECT6_ADDR_RE = re.compile(r'inet_pton\([^,]+,\s*"(?P[0-9a-fA-F:]+)"') +_FIRST_STR_ARG_RE = re.compile(r'"((?:[^"\\]|\\.)*)"') + + +class StraceCollector(Collector): + name = "strace" + + def __init__(self, sink, workspace, fail_closed: bool = True) -> None: + super().__init__(sink, workspace) + self._fail_closed = fail_closed + self._trace_dir = workspace.artifacts / "strace" + self._strace_path: Optional[str] = None + + @staticmethod + def is_available() -> bool: + return shutil.which("strace") is not None + + def setup(self) -> None: + super().setup() + self._trace_dir.mkdir(parents=True, exist_ok=True) + self._strace_path = shutil.which("strace") + if self._strace_path is None: + self.fail("strace binary not found on PATH") + self.sink.record( + "collector.strace_failed", severity="medium", source="strace", + message="strace is not available; syscall evidence will be missing", + ) + return + self.sink.record("collector.strace_started", source="strace", message="strace collector armed") + + @property + def strace_wrapper(self) -> Optional[List[str]]: + if self._strace_path is None: + return None + trace_prefix = str(self._trace_dir / "trace") + return [ + self._strace_path, + "-ff", + "-ttt", + "-s", "256", + "-o", trace_prefix, + "-e", f"trace={_STRACE_CLASSES}", + "--", + ] + + @property + def available(self) -> bool: + return self._strace_path is not None + + def finish(self, launch_result) -> None: + if self._strace_path is None: + # already recorded failure in setup + if self.status.status != "failed": + self.fail("strace unavailable") + super().finish(launch_result) + return + trace_files = sorted(self._trace_dir.glob("trace.*")) + if not trace_files: + self.fail("strace produced no trace files") + self.sink.record( + "collector.strace_failed", severity="medium", source="strace", + message="strace produced no output", + ) + super().finish(launch_result) + return + total = 0 + for tf in trace_files: + pid = self._pid_from_filename(tf) + total += self._parse_file(tf, pid) + self.sink.record( + "collector.strace_completed", source="strace", + message="strace parsing complete", + data={"files": len(trace_files), "events": total}, + ) + super().finish(launch_result) + + @staticmethod + def _pid_from_filename(path: Path) -> Optional[int]: + suffix = path.name.rsplit(".", 1)[-1] + return int(suffix) if suffix.isdigit() else None + + def _parse_file(self, path: Path, pid: Optional[int]) -> int: + count = 0 + try: + lines = path.read_text(encoding="utf-8", errors="replace").splitlines() + except OSError as exc: + self.status.errors.append(f"could not read {path}: {exc}") + return 0 + for line in lines: + m = _LINE_RE.match(line) + if not m: + continue + call = m.group("call") + mapping = _SYSCALL_EVENTS.get(call) + if mapping is None: + continue + event_type, severity = mapping + args = m.group("args") + line_pid = int(m.group("pid")) if m.group("pid") else pid + self._emit(event_type, severity, call, args, line_pid) + count += 1 + return count + + def _emit(self, event_type: str, severity: str, call: str, args: str, pid: Optional[int]) -> None: + process: Dict[str, object] = {"syscall": call} + network: Optional[Dict[str, object]] = None + file_info: Optional[Dict[str, object]] = None + + if call in ("execve", "execveat"): + argv = self._parse_execve_argv(args) + if argv: + process["argv"] = argv + elif call in ("open", "openat", "creat", "unlink", "unlinkat", "chmod", "chown"): + sm = _FIRST_STR_ARG_RE.search(args) + # openat's first string is often the dirfd symbol; take the last path-looking string + paths = _FIRST_STR_ARG_RE.findall(args) + path_val = paths[-1] if paths else (sm.group(1) if sm else None) + if path_val: + file_info = {"path": path_val} + elif call in ("connect", "sendto"): + network = self._parse_connect(args) + + self.sink.record( + event_type, severity=severity, source="strace", + message=f"strace observed {call}()", + pid=pid, process=process, network=network, file=file_info, + ) + + @staticmethod + def _parse_execve_argv(args: str) -> Optional[List[str]]: + # execve("/path", ["a", "b"], 0x...) -> capture the bracketed list + start = args.find("[") + end = args.find("]", start) + if start == -1 or end == -1: + return None + inner = args[start + 1 : end] + return [m.group(1) for m in _FIRST_STR_ARG_RE.finditer(inner)] or None + + @staticmethod + def _parse_connect(args: str) -> Optional[Dict[str, object]]: + ip_m = _CONNECT_ADDR_RE.search(args) + port_m = _CONNECT_PORT_RE.search(args) + ip6_m = _CONNECT6_ADDR_RE.search(args) + info: Dict[str, object] = {} + if ip_m: + info["destinationIp"] = ip_m.group("ip") + elif ip6_m: + info["destinationIp"] = ip6_m.group("ip") + if port_m: + info["destinationPort"] = int(port_m.group("port")) + return info or None diff --git a/lambda-microvm-dta/src/microvm/app/events.py b/lambda-microvm-dta/src/microvm/app/events.py new file mode 100644 index 000000000..b1d156fa3 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/events.py @@ -0,0 +1,126 @@ +"""Normalized event model for the v0.2 DTA supervisor. + +Collectors observe the target from *outside* the target process and emit +normalized events. Events are written to ``events.jsonl`` (one JSON object per +line) and consumed by the rule engine. This module deliberately uses only the +standard library. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional +import json +import threading + +SEVERITY_ORDER = { + "none": 0, + "informational": 0, + "low": 1, + "medium": 2, + "high": 3, + "critical": 4, +} + + +def utc_now() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def severity_gte(left: str, right: str) -> bool: + return SEVERITY_ORDER.get(left, -1) >= SEVERITY_ORDER.get(right, -1) + + +def max_severity(severities: Iterable[str]) -> str: + best = "none" + for item in severities: + if severity_gte(item, best): + best = item + return best + + +@dataclass +class Event: + """A single normalized observation about the target. + + Only non-empty optional sections are serialized so ``events.jsonl`` stays + compact and stable. + """ + + type: str + severity: str = "none" + source: str = "supervisor" + message: str = "" + timestamp: str = field(default_factory=utc_now) + pid: Optional[int] = None + process: Optional[Dict[str, Any]] = None + file: Optional[Dict[str, Any]] = None + network: Optional[Dict[str, Any]] = None + canary: Optional[Dict[str, Any]] = None + data: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + out: Dict[str, Any] = { + "timestamp": self.timestamp, + "type": self.type, + "severity": self.severity, + "source": self.source, + } + if self.pid is not None: + out["pid"] = self.pid + if self.process: + out["process"] = self.process + if self.file: + out["file"] = self.file + if self.network: + out["network"] = self.network + if self.canary: + out["canary"] = self.canary + if self.data: + out["data"] = self.data + if self.message: + out["message"] = self.message + return out + + +class EventSink: + """Thread-safe collector for normalized events. + + Collectors may run concurrently (e.g. the process poller while strace + writes), so appends are guarded by a lock. Events are buffered in memory and + flushed to ``events.jsonl`` once via :meth:`write_jsonl`. + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._events: List[Event] = [] + + def emit(self, event: Event) -> None: + with self._lock: + self._events.append(event) + + def record(self, type: str, severity: str = "none", source: str = "supervisor", message: str = "", **sections: Any) -> Event: + """Convenience for building and emitting an event. + + Known section kwargs (``pid``, ``process``, ``file``, ``network``, + ``canary``) map onto the event; anything else lands in ``data``. + """ + known = {k: sections.pop(k) for k in ("pid", "process", "file", "network", "canary") if k in sections} + event = Event(type=type, severity=severity, source=source, message=message, data=dict(sections), **known) + self.emit(event) + return event + + def events(self) -> List[Event]: + with self._lock: + return list(self._events) + + def sorted_events(self) -> List[Event]: + return sorted(self.events(), key=lambda e: e.timestamp) + + def write_jsonl(self, path: str | Path) -> Path: + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + lines = [json.dumps(e.to_dict(), sort_keys=True) for e in self.sorted_events()] + out.write_text("\n".join(lines) + ("\n" if lines else ""), encoding="utf-8") + return out diff --git a/lambda-microvm-dta/src/microvm/app/launcher.py b/lambda-microvm-dta/src/microvm/app/launcher.py new file mode 100644 index 000000000..32c2dd6b3 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/launcher.py @@ -0,0 +1,154 @@ +"""Target launcher: runs the untrusted target as a child process. + +The launcher is the boundary between observation and execution. It starts the +target with a fixed argv (never a shell), enforces a timeout, captures +stdout/stderr to artifact files, and returns structured metadata. Collectors +observe the process *through* this launcher (it exposes the Popen handle) rather +than the target reporting on itself. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Optional +import os +import signal +import subprocess +import time + + +@dataclass +class LaunchResult: + argv: List[str] + pid: Optional[int] + exit_code: Optional[int] + timed_out: bool + started_at_monotonic: float + duration_seconds: float + stdout_path: Path + stderr_path: Path + started_at: str + ended_at: str + error: Optional[str] = None + extra: Dict[str, object] = field(default_factory=dict) + + +def build_child_env(base_environment: Dict[str, str], canary_env: Dict[str, str]) -> Dict[str, str]: + """Construct the target's environment. + + Starts from a minimal copy of the supervisor environment (PATH, HOME, etc.), + overlays declared target environment, then canary variables. Real secrets + must never be placed here by callers. + """ + safe_keys = ("PATH", "HOME", "LANG", "LC_ALL", "TZ", "TMPDIR") + env: Dict[str, str] = {k: os.environ[k] for k in safe_keys if k in os.environ} + env.update(base_environment) + env.update(canary_env) + return env + + +def launch_target( + argv: List[str], + *, + cwd: Path, + env: Dict[str, str], + timeout_seconds: float, + stdout_path: Path, + stderr_path: Path, + wrapper: Optional[List[str]] = None, + on_started=None, +) -> LaunchResult: + """Run ``argv`` under ``cwd`` with a timeout, capturing output to files. + + ``wrapper`` lets a collector prepend a tracer (e.g. ``["strace", "-ff", ..., + "--"]``); the wrapper + argv are exec'd together so the tracer is the parent + and the target is its child. ``on_started(pid)`` is invoked once the child + is running so a poller-based collector can begin observing immediately. + """ + from .events import utc_now + + full_argv = list(wrapper or []) + list(argv) + stdout_path.parent.mkdir(parents=True, exist_ok=True) + stderr_path.parent.mkdir(parents=True, exist_ok=True) + + started_at = utc_now() + start_mono = time.monotonic() + timed_out = False + error: Optional[str] = None + exit_code: Optional[int] = None + pid: Optional[int] = None + + with open(stdout_path, "wb") as out, open(stderr_path, "wb") as err: + try: + proc = subprocess.Popen( # nosec B603 - argv list, no shell + full_argv, + cwd=str(cwd), + env=env, + stdout=out, + stderr=err, + stdin=subprocess.DEVNULL, + start_new_session=True, # own process group so we can kill the tree + ) + except FileNotFoundError as exc: + ended_at = utc_now() + return LaunchResult( + argv=full_argv, pid=None, exit_code=None, timed_out=False, + started_at_monotonic=start_mono, duration_seconds=0.0, + stdout_path=stdout_path, stderr_path=stderr_path, + started_at=started_at, ended_at=ended_at, + error=f"target executable not found: {exc}", + ) + + pid = proc.pid + if on_started is not None: + try: + on_started(pid) + except Exception: # observation must never break execution + pass + + try: + exit_code = proc.wait(timeout=timeout_seconds) + except subprocess.TimeoutExpired: + timed_out = True + _terminate_tree(proc) + try: + exit_code = proc.wait(timeout=5) + except subprocess.TimeoutExpired: + error = "target did not exit after kill" + + duration = time.monotonic() - start_mono + ended_at = utc_now() + return LaunchResult( + argv=full_argv, + pid=pid, + exit_code=exit_code, + timed_out=timed_out, + started_at_monotonic=start_mono, + duration_seconds=round(duration, 4), + stdout_path=stdout_path, + stderr_path=stderr_path, + started_at=started_at, + ended_at=ended_at, + error=error, + ) + + +def _terminate_tree(proc: "subprocess.Popen") -> None: + """Best-effort SIGTERM then SIGKILL of the target's process group.""" + try: + pgid = os.getpgid(proc.pid) + except (ProcessLookupError, PermissionError): + pgid = None + for sig in (signal.SIGTERM, signal.SIGKILL): + try: + if pgid is not None: + os.killpg(pgid, sig) + else: + proc.send_signal(sig) + except (ProcessLookupError, PermissionError): + return + if sig is signal.SIGTERM: + for _ in range(20): # up to ~2s grace + if proc.poll() is not None: + return + time.sleep(0.1) diff --git a/lambda-microvm-dta/src/microvm/app/report.py b/lambda-microvm-dta/src/microvm/app/report.py new file mode 100644 index 000000000..f9c172a46 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/report.py @@ -0,0 +1,69 @@ +"""report.json v0.2.0 builder. + +Assembles the deterministic CI report from the launch result, collected events, +collector statuses, rule evaluation, and policy result. +""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional +import os +import platform +import uuid + +from .events import Event +from .rules import Evaluation, PolicyResult + +SCHEMA_VERSION = "0.2.0" + + +def build_report( + *, + correlation_id: Optional[str], + commit_sha: Optional[str], + mode: str, + region: Optional[str], + microvm_id: Optional[str], + image_identifier: Optional[str], + target: Dict[str, Any], + events: List[Event], + collector_statuses: List[Dict[str, Any]], + evaluation: Evaluation, + policy_result: PolicyResult, + policy_config: Dict[str, Any], + include_evidence: bool = True, +) -> Dict[str, Any]: + collector_error_count = sum(1 for c in collector_statuses if c.get("status") == "failed") + summary = { + "status": policy_result.status, + "verdict": evaluation.verdict, + "maxSeverity": evaluation.max_severity, + "eventCount": len(events), + "matchedRuleCount": len(evaluation.matches), + "collectorErrorCount": collector_error_count, + } + report: Dict[str, Any] = { + "schemaVersion": SCHEMA_VERSION, + "project": "aws-lambda-microvm-dta-sample", + "correlationId": correlation_id or f"local-{uuid.uuid4()}", + "commitSha": commit_sha or os.environ.get("GITHUB_SHA") or "unknown", + "target": target, + "environment": { + "mode": mode, + "region": region or "", + "microvmId": microvm_id or "local", + "imageIdentifier": image_identifier or "local", + "python": platform.python_version(), + "platform": platform.platform(), + }, + "summary": summary, + "collectors": collector_statuses, + "matchedRules": [m.to_dict() for m in evaluation.matches], + "evidence": [e.to_dict() for e in events] if include_evidence else [], + "policy": { + "failOnSeverity": policy_config.get("failOnSeverity", "high"), + "failOnPolicyViolation": policy_config.get("failOnPolicyViolation", True), + "failClosed": policy_config.get("failClosed", True), + "reasons": policy_result.reasons, + }, + } + return report diff --git a/lambda-microvm-dta/src/microvm/app/rules.py b/lambda-microvm-dta/src/microvm/app/rules.py new file mode 100644 index 000000000..7671a40c1 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/rules.py @@ -0,0 +1,197 @@ +"""Simple, explainable rule engine for v0.2. + +The engine matches normalized events against declarative rules and aggregates a +verdict + max severity. It deliberately makes no high-confidence malware +classification: the verdict vocabulary is clean / suspicious / policy_violation +/ unknown / error — never ``malware``. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional +import json + +from .events import Event, SEVERITY_ORDER, max_severity, severity_gte + +VERDICTS = ("clean", "suspicious", "policy_violation", "unknown", "error") +# Ranking for aggregating the overall verdict (higher wins). +_VERDICT_RANK = {"clean": 0, "unknown": 1, "suspicious": 2, "policy_violation": 3, "error": 4} + + +class RuleError(ValueError): + pass + + +@dataclass(frozen=True) +class Rule: + id: str + name: str + severity: str + verdict: str + event_types: List[str] + argv_contains_any: List[str] = field(default_factory=list) + + def matches(self, event: Event) -> bool: + if event.type not in self.event_types: + return False + if self.argv_contains_any: + argv = (event.process or {}).get("argv") or [] + if not isinstance(argv, list): + return False + joined = [str(a) for a in argv] + if not any(tok in joined for tok in self.argv_contains_any): + return False + return True + + +@dataclass +class RuleMatch: + rule_id: str + name: str + severity: str + verdict: str + event_type: str + event_timestamp: str + message: str + + def to_dict(self) -> Dict[str, Any]: + return { + "ruleId": self.rule_id, + "name": self.name, + "severity": self.severity, + "verdict": self.verdict, + "eventType": self.event_type, + "eventTimestamp": self.event_timestamp, + "message": self.message, + } + + +@dataclass +class Evaluation: + matches: List[RuleMatch] + verdict: str + max_severity: str + + @property + def has_policy_violation(self) -> bool: + return any(m.verdict == "policy_violation" for m in self.matches) + + +def parse_rules(data: Any) -> List[Rule]: + raw_rules = data.get("rules") if isinstance(data, dict) else data + if not isinstance(raw_rules, list): + raise RuleError("rules file must contain a 'rules' list") + rules: List[Rule] = [] + for item in raw_rules: + if not isinstance(item, dict): + raise RuleError("each rule must be a mapping") + when = item.get("when") or {} + event_types: List[str] = [] + if "eventType" in when: + event_types = [str(when["eventType"])] + elif "eventTypeAny" in when: + any_list = when["eventTypeAny"] + if not isinstance(any_list, list): + raise RuleError(f"rule {item.get('id')}: eventTypeAny must be a list") + event_types = [str(x) for x in any_list] + else: + raise RuleError(f"rule {item.get('id')}: 'when' needs eventType or eventTypeAny") + + verdict = str(item.get("verdict", "unknown")) + if verdict not in VERDICTS: + raise RuleError(f"rule {item.get('id')}: invalid verdict {verdict!r}") + severity = str(item.get("severity", "low")) + if severity not in SEVERITY_ORDER: + raise RuleError(f"rule {item.get('id')}: invalid severity {severity!r}") + + argv_contains = when.get("argvContainsAny") or [] + if not isinstance(argv_contains, list): + raise RuleError(f"rule {item.get('id')}: argvContainsAny must be a list") + + rules.append( + Rule( + id=str(item.get("id", f"R{len(rules)+1:03d}")), + name=str(item.get("name", "")), + severity=severity, + verdict=verdict, + event_types=event_types, + argv_contains_any=[str(x) for x in argv_contains], + ) + ) + return rules + + +def load_rules(path: str | Path) -> List[Rule]: + p = Path(path) + text = p.read_text(encoding="utf-8") + if p.suffix.lower() == ".json": + data = json.loads(text) + else: + try: + import yaml # type: ignore + + data = yaml.safe_load(text) + except ModuleNotFoundError: + from .target_config import _parse_limited_yaml + + data = _parse_limited_yaml(text) + return parse_rules(data) + + +def evaluate_events(events: List[Event], rules: List[Rule]) -> Evaluation: + matches: List[RuleMatch] = [] + for event in events: + for rule in rules: + if rule.matches(event): + matches.append( + RuleMatch( + rule_id=rule.id, + name=rule.name, + severity=rule.severity, + verdict=rule.verdict, + event_type=event.type, + event_timestamp=event.timestamp, + message=event.message, + ) + ) + if matches: + verdict = max((m.verdict for m in matches), key=lambda v: _VERDICT_RANK.get(v, 0)) + sev = max_severity(m.severity for m in matches) + else: + verdict = "clean" + sev = "none" + return Evaluation(matches=matches, verdict=verdict, max_severity=sev) + + +@dataclass +class PolicyResult: + status: str # passed | failed | error + reasons: List[str] = field(default_factory=list) + + +def apply_policy( + evaluation: Evaluation, + *, + fail_on_severity: str, + fail_on_policy_violation: bool, + fail_closed: bool, + collector_errors: int, + target_error: bool = False, +) -> PolicyResult: + reasons: List[str] = [] + status = "passed" + + if target_error: + return PolicyResult(status="error", reasons=["target failed to execute"]) + + if fail_on_policy_violation and evaluation.has_policy_violation: + reasons.append("policy_violation rule matched") + if severity_gte(evaluation.max_severity, fail_on_severity): + reasons.append(f"max severity {evaluation.max_severity!r} >= threshold {fail_on_severity!r}") + if fail_closed and collector_errors > 0: + reasons.append(f"{collector_errors} collector(s) failed and failClosed is true") + + if reasons: + status = "failed" + return PolicyResult(status=status, reasons=reasons) diff --git a/lambda-microvm-dta/src/microvm/app/server.py b/lambda-microvm-dta/src/microvm/app/server.py new file mode 100644 index 000000000..3ea5d47ec --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/server.py @@ -0,0 +1,233 @@ +"""Sandbox supervisor HTTP service (runs inside the Lambda MicroVM). + +Exposes the analysis lifecycle and result artifacts over the authenticated +MicroVM endpoint. The supervisor receives a target config (via POST body or the +run hook payload), runs the analysis through :mod:`app.supervisor`, and serves +report.json, events.jsonl, and artifacts. +""" +from __future__ import annotations + +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any, Dict, Optional +import json +import os +import threading +import traceback + +from .supervisor import run_analysis +from .target_config import parse_target_config, load_target_config, TargetConfigError + +PORT = int(os.environ.get("PORT", "8080")) +ANALYSIS_ROOT = os.environ.get("DTA_ANALYSIS_ROOT", "/analysis") +RULES_FILE = os.environ.get("DTA_RULES_FILE", "/app/rules/default.yaml") +SOURCE_BASE = os.environ.get("DTA_SOURCE_BASE", "/app") +DEFAULT_TARGET_CONFIG = os.environ.get("DTA_TARGET_CONFIG") # optional path + +_state_lock = threading.RLock() +_state: Dict[str, Any] = { + "status": "idle", + "report": None, + "result": None, + "last_error": None, + "microvm_id": os.environ.get("AWS_LAMBDA_MICROVM_ID"), + "run_hook_payload": None, +} + + +def _json_response(handler: BaseHTTPRequestHandler, status: int, payload: Dict[str, Any]) -> None: + body = json.dumps(payload, indent=2, sort_keys=True).encode("utf-8") + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def _bytes_response(handler: BaseHTTPRequestHandler, status: int, body: bytes, content_type: str) -> None: + handler.send_response(status) + handler.send_header("Content-Type", content_type) + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def _read_json(handler: BaseHTTPRequestHandler) -> Dict[str, Any]: + length = int(handler.headers.get("Content-Length") or "0") + if length == 0: + return {} + raw = handler.rfile.read(length) + return json.loads(raw.decode("utf-8")) if raw else {} + + +def _resolve_target_config(request: Dict[str, Any]): + """Determine the target config from the request, run hook payload, or env. + + Priority: explicit ``targetConfig`` object in the request → ``targetConfig`` + inside the stored run hook payload → ``DTA_TARGET_CONFIG`` file path. + """ + allow_shell = os.environ.get("DTA_ALLOW_SHELL_STRINGS", "false").lower() == "true" + if isinstance(request.get("targetConfig"), dict): + return parse_target_config(request["targetConfig"], allow_shell_strings=allow_shell) + with _state_lock: + payload = _state.get("run_hook_payload") + if isinstance(payload, str) and payload: + try: + payload = json.loads(payload) + except json.JSONDecodeError: + payload = None + if isinstance(payload, dict) and isinstance(payload.get("targetConfig"), dict): + return parse_target_config(payload["targetConfig"], allow_shell_strings=allow_shell) + if DEFAULT_TARGET_CONFIG: + return load_target_config(DEFAULT_TARGET_CONFIG, allow_shell_strings=allow_shell) + raise TargetConfigError("no target config provided (request.targetConfig, run hook payload, or DTA_TARGET_CONFIG)") + + +def _run(request: Dict[str, Any]) -> Dict[str, Any]: + config = _resolve_target_config(request) + with _state_lock: + microvm_id = _state.get("microvm_id") + result = run_analysis( + config, + root=request.get("analysisRoot") or ANALYSIS_ROOT, + rules_file=request.get("rulesFile") or RULES_FILE, + source_base=SOURCE_BASE, + correlation_id=request.get("correlationId") or os.environ.get("DTA_CORRELATION_ID"), + commit_sha=request.get("commitSha") or os.environ.get("GITHUB_SHA"), + mode="microvm", + region=os.environ.get("AWS_REGION"), + microvm_id=microvm_id, + image_identifier=request.get("imageIdentifier") or os.environ.get("DTA_IMAGE_IDENTIFIER"), + ) + with _state_lock: + _state["result"] = result + return result.report + + +class Handler(BaseHTTPRequestHandler): + server_version = "microvm-dta-supervisor/0.2.0" + + def log_message(self, fmt: str, *args: Any) -> None: + print(json.dumps({"level": "info", "message": fmt % args})) + + # --- GET --- + def do_GET(self) -> None: # noqa: N802 + if self.path == "/health": + _json_response(self, 200, {"status": "ok"}) + return + if self.path == "/analysis/status": + with _state_lock: + payload = {k: v for k, v in _state.items() if k not in ("report", "result")} + _json_response(self, 200, payload) + return + if self.path == "/analysis/report": + with _state_lock: + report = _state.get("report") + status = _state.get("status") + err = _state.get("last_error") + if report is None: + _json_response(self, 404, {"status": status, "error": err or "report not ready"}) + return + _json_response(self, 200, report) + return + if self.path == "/analysis/events": + self._serve_events() + return + if self.path.startswith("/analysis/artifacts/"): + self._serve_artifact(self.path[len("/analysis/artifacts/"):]) + return + _json_response(self, 404, {"error": "not found", "path": self.path}) + + def _serve_events(self) -> None: + with _state_lock: + result = _state.get("result") + if result is None: + _json_response(self, 404, {"error": "events not ready"}) + return + try: + body = Path(result.events_path).read_bytes() + except OSError as exc: + _json_response(self, 500, {"error": str(exc)}) + return + _bytes_response(self, 200, body, "application/x-ndjson") + + def _serve_artifact(self, name: str) -> None: + with _state_lock: + result = _state.get("result") + if result is None: + _json_response(self, 404, {"error": "artifacts not ready"}) + return + root = Path(result.report_path).parent + # Confine artifact access to the analysis root (no path traversal). + target = (root / name).resolve() + try: + target.relative_to(root.resolve()) + except ValueError: + _json_response(self, 400, {"error": "invalid artifact path"}) + return + if not target.is_file(): + _json_response(self, 404, {"error": "artifact not found", "name": name}) + return + _bytes_response(self, 200, target.read_bytes(), "application/octet-stream") + + # --- POST --- + def do_POST(self) -> None: # noqa: N802 + if self.path in {"/aws/lambda-microvms/runtime/v1/ready", "/aws/lambda-microvms/runtime/v1/validate", "/ready", "/validate"}: + _json_response(self, 200, {"status": "ready"}) + return + if self.path in {"/aws/lambda-microvms/runtime/v1/run", "/run"}: + payload = _read_json(self) + with _state_lock: + _state["microvm_id"] = payload.get("microvmId") or _state.get("microvm_id") + _state["run_hook_payload"] = payload.get("runHookPayload") + _state["status"] = "ready" + _json_response(self, 200, {"status": "run-hook-ok"}) + return + if self.path in { + "/aws/lambda-microvms/runtime/v1/suspend", "/aws/lambda-microvms/runtime/v1/resume", + "/aws/lambda-microvms/runtime/v1/terminate", "/suspend", "/resume", "/terminate", + }: + with _state_lock: + _state["status"] = self.path.rsplit("/", 1)[-1] + _json_response(self, 200, {"status": "ok", "hook": self.path}) + return + if self.path == "/analysis/start": + self._handle_start() + return + _json_response(self, 404, {"error": "not found", "path": self.path}) + + def _handle_start(self) -> None: + try: + request = _read_json(self) + except json.JSONDecodeError as exc: + _json_response(self, 400, {"status": "failed", "error": f"invalid JSON: {exc}"}) + return + with _state_lock: + if _state["status"] == "running": + _json_response(self, 409, {"status": "running", "error": "analysis already in progress"}) + return + _state["status"] = "running" + _state["last_error"] = None + try: + report = _run(request) + with _state_lock: + _state["report"] = report + _state["status"] = report["summary"]["status"] + _json_response(self, 200, {"status": report["summary"]["status"], "summary": report["summary"]}) + except Exception as exc: + error = f"{exc.__class__.__name__}: {exc}" + print(traceback.format_exc()) + with _state_lock: + _state["status"] = "failed" + _state["last_error"] = error + _json_response(self, 500, {"status": "failed", "error": error}) + + +def main() -> None: + server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler) + print(json.dumps({"level": "info", "message": "supervisor listening", "port": PORT})) + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/lambda-microvm-dta/src/microvm/app/supervisor.py b/lambda-microvm-dta/src/microvm/app/supervisor.py new file mode 100644 index 000000000..e18f6cfc0 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/supervisor.py @@ -0,0 +1,214 @@ +"""Sandbox supervisor: orchestrates a single DTA analysis run. + +Flow: + 1. prepare workspace, materialize the (untrusted) target + 2. construct enabled collectors; canary presents fake assets + 3. start collectors (pre-launch snapshots / arming) + 4. launch the target as a child, optionally wrapped by strace; collectors + observe via the on_started hook + 5. stop collectors, normalize events to events.jsonl + 6. evaluate rules + policy, build report.json and run-metadata.json + +The target never reports on itself — all evidence comes from the collectors. +""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional +import json + +from .events import EventSink, utc_now +from .launcher import build_child_env, launch_target +from .report import build_report +from .rules import apply_policy, evaluate_events, load_rules +from .target_config import AnalysisConfig +from .workspace import materialize_target, prepare_workspace + +from .collectors.process import ProcessCollector +from .collectors.filesystem import FilesystemCollector +from .collectors.canary import CanaryCollector +from .collectors.network import NetworkCollector +from .collectors.strace_collector import StraceCollector + + +@dataclass +class AnalysisResult: + report: Dict[str, Any] + events_path: Path + report_path: Path + metadata_path: Path + + +def run_analysis( + config: AnalysisConfig, + *, + root: str | Path, + rules_file: str | Path, + source_base: str | Path, + correlation_id: Optional[str] = None, + commit_sha: Optional[str] = None, + mode: str = "local", + region: Optional[str] = None, + microvm_id: Optional[str] = None, + image_identifier: Optional[str] = None, +) -> AnalysisResult: + ws = prepare_workspace(root) + sink = EventSink() + + # Build collectors per config. Canary is constructed first so its env can be + # merged into the target environment. + canary: Optional[CanaryCollector] = CanaryCollector(sink, ws) if config.collectors.canary else None + collectors: List[Any] = [] + if config.collectors.process: + collectors.append(ProcessCollector(sink, ws)) + if config.collectors.filesystem: + collectors.append(FilesystemCollector(sink, ws)) + if canary is not None: + collectors.append(canary) + if config.collectors.network_procnet: + collectors.append(NetworkCollector(sink, ws)) + strace: Optional[StraceCollector] = None + if config.collectors.strace: + strace = StraceCollector(sink, ws, fail_closed=config.policy.fail_closed) + collectors.append(strace) + + target_error = False + target_meta: Dict[str, Any] = { + "type": config.target.type, + "argv": config.target.argv, + } + + try: + materialize_target(config.target, ws, source_base=Path(source_base)) + except Exception as exc: # materialization failure is a target error + target_error = True + sink.record("target.materialization_failed", severity="medium", source="supervisor", + message=f"Could not materialize target: {exc}") + + # Pre-launch setup for all collectors. + for c in collectors: + try: + c.setup() + except Exception as exc: # a collector setup failure must not crash the run + c.fail(f"setup error: {exc}") + + launch_result = None + if not target_error: + canary_env = canary.canary_env if canary is not None else {} + env = build_child_env(config.target.environment, canary_env) + cwd = ws.workspace + if config.target.working_directory: + wd = Path(config.target.working_directory) + cwd = wd if wd.is_absolute() else (ws.root / config.target.working_directory) + cwd.mkdir(parents=True, exist_ok=True) + + wrapper = strace.strace_wrapper if (strace is not None and strace.available) else None + + def on_started(pid: int) -> None: + for c in collectors: + try: + c.on_target_started(pid) + except Exception as exc: + c.fail(f"on_target_started error: {exc}") + + launch_result = launch_target( + config.target.argv, + cwd=cwd, + env=env, + timeout_seconds=config.target.timeout_seconds, + stdout_path=ws.artifacts / "stdout.log", + stderr_path=ws.artifacts / "stderr.log", + wrapper=wrapper, + on_started=on_started, + ) + target_meta.update({ + "startedAt": launch_result.started_at, + "endedAt": launch_result.ended_at, + "exitCode": launch_result.exit_code, + "timedOut": launch_result.timed_out, + "durationSeconds": launch_result.duration_seconds, + }) + if launch_result.error: + target_error = True + sink.record("target.launch_failed", severity="medium", source="supervisor", + message=launch_result.error) + + # Post-execution finish. strace must finish first so its syscall events are + # in the sink before the canary collector cross-references them for + # high-confidence read detection. + finish_order = sorted( + collectors, + key=lambda c: 0 if isinstance(c, StraceCollector) else (2 if isinstance(c, CanaryCollector) else 1), + ) + for c in finish_order: + try: + c.finish(launch_result) + except Exception as exc: + c.fail(f"finish error: {exc}") + + events = sink.sorted_events() + events_path = sink.write_jsonl(ws.root / "events.jsonl") + + rules = load_rules(rules_file) + evaluation = evaluate_events(events, rules) + collector_statuses = [c.status.to_dict() for c in collectors] + collector_errors = sum(1 for s in collector_statuses if s["status"] == "failed") + + policy_result = apply_policy( + evaluation, + fail_on_severity=config.policy.fail_on_severity, + fail_on_policy_violation=config.policy.fail_on_policy_violation, + fail_closed=config.policy.fail_closed, + collector_errors=collector_errors, + target_error=target_error, + ) + + report = build_report( + correlation_id=correlation_id, + commit_sha=commit_sha, + mode=mode, + region=region, + microvm_id=microvm_id, + image_identifier=image_identifier, + target=target_meta, + events=events, + collector_statuses=collector_statuses, + evaluation=evaluation, + policy_result=policy_result, + policy_config={ + "failOnSeverity": config.policy.fail_on_severity, + "failOnPolicyViolation": config.policy.fail_on_policy_violation, + "failClosed": config.policy.fail_closed, + }, + ) + + report_path = ws.root / "report.json" + report_path.write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8") + + metadata = { + "correlationId": report["correlationId"], + "commitSha": report["commitSha"], + "mode": mode, + "region": region or "", + "microvmId": microvm_id or "local", + "imageIdentifier": image_identifier or "local", + "generatedAt": utc_now(), + "artifacts": { + "report": "report.json", + "events": "events.jsonl", + "stdout": "artifacts/stdout.log", + "stderr": "artifacts/stderr.log", + "straceDir": "artifacts/strace", + }, + "cloudwatch": {}, + } + metadata_path = ws.root / "run-metadata.json" + metadata_path.write_text(json.dumps(metadata, indent=2, sort_keys=True), encoding="utf-8") + + return AnalysisResult( + report=report, + events_path=events_path, + report_path=report_path, + metadata_path=metadata_path, + ) diff --git a/lambda-microvm-dta/src/microvm/app/target_config.py b/lambda-microvm-dta/src/microvm/app/target_config.py new file mode 100644 index 000000000..f1f58489d --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/target_config.py @@ -0,0 +1,241 @@ +"""Strict target configuration parsing and validation for the v0.2 supervisor. + +The supervisor executes *untrusted* targets, so the config format is +deliberately narrow: argv arrays only, never shell strings. This module loads, +validates, and normalizes a target config (YAML or JSON) into typed objects the +supervisor can act on. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional +import json +import re + +# Constructs that signal a copy-pasted shell one-liner rather than a clean argv +# element. argv is executed WITHOUT a shell, so these are only ever literal +# arguments — but rejecting them in public-sample mode keeps intent unambiguous +# and steers users toward argv arrays / script targets. We intentionally allow +# lone ';', '(', ')' and '$' because they appear in normal inline code (e.g. +# `python3 -c "a;b"`); we reject pipes, redirects, command substitution, +# backticks, chaining operators, and embedded newlines. +_SHELL_METACHARACTERS = re.compile(r"[|`><\n]|\$\(|&&|\|\||(?:^|\s)&(?:\s|$)") + +SUPPORTED_TARGET_TYPES = {"command", "script", "archive"} + + +class TargetConfigError(ValueError): + pass + + +@dataclass(frozen=True) +class TargetSpec: + type: str + argv: List[str] + timeout_seconds: float + working_directory: Optional[str] + environment: Dict[str, str] + # script: path to a script file to copy into the workspace and execute. + source: Optional[str] = None + # archive: path to a zip/tar to extract; argv is the entrypoint after extraction. + archive: Optional[str] = None + + +@dataclass(frozen=True) +class CollectorsSpec: + process: bool = True + filesystem: bool = True + canary: bool = True + strace: bool = False + network_procnet: bool = True + network_vpc_flow_logs: bool = False + + +@dataclass(frozen=True) +class PolicySpec: + fail_on_severity: str = "high" + fail_on_policy_violation: bool = True + fail_closed: bool = True + + +@dataclass(frozen=True) +class AnalysisConfig: + target: TargetSpec + collectors: CollectorsSpec + policy: PolicySpec + raw: Dict[str, Any] = field(default_factory=dict) + + +def _load_mapping(path: Path) -> Dict[str, Any]: + text = path.read_text(encoding="utf-8") + if path.suffix.lower() == ".json": + data = json.loads(text) + else: + try: + import yaml # type: ignore + + data = yaml.safe_load(text) + except ModuleNotFoundError: + data = _parse_limited_yaml(text) + if not isinstance(data, dict): + raise TargetConfigError("Target config must load to a mapping/object") + return data + + +def _parse_limited_yaml(text: str) -> Dict[str, Any]: + """Minimal YAML fallback for the target-config shape used by examples. + + Supports nested mappings (by indentation), inline ``["a", "b"]`` flow lists, + scalars, and booleans/integers. Not a general YAML parser; PyYAML is used + whenever it is installed (it is in the MicroVM image). + """ + root: Dict[str, Any] = {} + # stack of (indent, container) + stack: List[tuple[int, Dict[str, Any]]] = [(-1, root)] + for raw in text.splitlines(): + line = raw.split("#", 1)[0].rstrip() + if not line.strip(): + continue + indent = len(line) - len(line.lstrip(" ")) + key, _, rest = line.strip().partition(":") + key = key.strip() + rest = rest.strip() + while stack and indent <= stack[-1][0]: + stack.pop() + parent = stack[-1][1] + if rest == "": + child: Dict[str, Any] = {} + parent[key] = child + stack.append((indent, child)) + else: + parent[key] = _coerce_scalar(rest) + return root + + +def _coerce_scalar(value: str) -> Any: + if value == "{}": + return {} + if value.startswith("[") and value.endswith("]"): + inner = value[1:-1].strip() + if not inner: + return [] + return [_coerce_scalar(part.strip()) for part in _split_flow(inner)] + if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): + return value[1:-1] + low = value.lower() + if low in {"true", "false"}: + return low == "true" + if low in {"null", "~", "none"}: + return None + if re.fullmatch(r"-?\d+", value): + return int(value) + if re.fullmatch(r"-?\d+\.\d+", value): + return float(value) + return value + + +def _split_flow(inner: str) -> List[str]: + parts: List[str] = [] + buf = "" + quote = "" + for ch in inner: + if quote: + buf += ch + if ch == quote: + quote = "" + elif ch in "\"'": + quote = ch + buf += ch + elif ch == ",": + parts.append(buf) + buf = "" + else: + buf += ch + if buf.strip(): + parts.append(buf) + return parts + + +def _validate_argv(argv: Any, *, allow_shell_strings: bool) -> List[str]: + if not isinstance(argv, list) or not argv or not all(isinstance(a, str) for a in argv): + raise TargetConfigError("target.argv must be a non-empty list of strings") + if not allow_shell_strings: + for arg in argv: + if _SHELL_METACHARACTERS.search(arg): + raise TargetConfigError( + f"target.argv element contains shell metacharacters and is rejected in public-sample mode: {arg!r}" + ) + return list(argv) + + +def load_target_config(path: str | Path, *, allow_shell_strings: bool = False) -> AnalysisConfig: + return parse_target_config(_load_mapping(Path(path)), allow_shell_strings=allow_shell_strings) + + +def parse_target_config(data: Dict[str, Any], *, allow_shell_strings: bool = False) -> AnalysisConfig: + target_raw = data.get("target") + if not isinstance(target_raw, dict): + raise TargetConfigError("Config must contain a 'target' object") + + ttype = str(target_raw.get("type", "")) + if ttype not in SUPPORTED_TARGET_TYPES: + raise TargetConfigError(f"Unsupported target.type: {ttype!r} (allowed: {sorted(SUPPORTED_TARGET_TYPES)})") + + argv = _validate_argv(target_raw.get("argv"), allow_shell_strings=allow_shell_strings) + + timeout = target_raw.get("timeoutSeconds", 60) + try: + timeout = float(timeout) + except (TypeError, ValueError): + raise TargetConfigError("target.timeoutSeconds must be a number") + if timeout <= 0: + raise TargetConfigError("target.timeoutSeconds must be > 0") + + env_raw = target_raw.get("environment") or {} + if not isinstance(env_raw, dict): + raise TargetConfigError("target.environment must be a mapping") + environment = {str(k): str(v) for k, v in env_raw.items()} + + source = target_raw.get("source") + archive = target_raw.get("archive") + if ttype == "script" and not source: + raise TargetConfigError("target.type 'script' requires 'source'") + if ttype == "archive" and not archive: + raise TargetConfigError("target.type 'archive' requires 'archive'") + + target = TargetSpec( + type=ttype, + argv=argv, + timeout_seconds=timeout, + working_directory=target_raw.get("workingDirectory"), + environment=environment, + source=str(source) if source else None, + archive=str(archive) if archive else None, + ) + + collectors = _parse_collectors(data.get("collectors") or {}) + policy = _parse_policy(data.get("policy") or {}) + return AnalysisConfig(target=target, collectors=collectors, policy=policy, raw=data) + + +def _parse_collectors(raw: Dict[str, Any]) -> CollectorsSpec: + network = raw.get("network") or {} + if not isinstance(network, dict): + network = {} + return CollectorsSpec( + process=bool(raw.get("process", True)), + filesystem=bool(raw.get("filesystem", True)), + canary=bool(raw.get("canary", True)), + strace=bool(raw.get("strace", False)), + network_procnet=bool(network.get("procNet", True)), + network_vpc_flow_logs=bool(network.get("vpcFlowLogs", False)), + ) + + +def _parse_policy(raw: Dict[str, Any]) -> PolicySpec: + return PolicySpec( + fail_on_severity=str(raw.get("failOnSeverity", "high")), + fail_on_policy_violation=bool(raw.get("failOnPolicyViolation", True)), + fail_closed=bool(raw.get("failClosed", True)), + ) diff --git a/lambda-microvm-dta/src/microvm/app/workspace.py b/lambda-microvm-dta/src/microvm/app/workspace.py new file mode 100644 index 000000000..0e2e73b17 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/app/workspace.py @@ -0,0 +1,112 @@ +"""Analysis workspace management for the v0.2 supervisor. + +Each analysis runs in a dedicated workspace tree so collectors have a bounded +area to observe and cleanup is simple: + + / + workspace/ # target cwd; materialized script/archive lands here + canary/ # fake canary files + tmp/ # temp-like dir watched by the filesystem collector + artifacts/ # strace logs, stdout/stderr, etc. +""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +import shutil +import tarfile +import zipfile + +from .target_config import TargetSpec, TargetConfigError + + +@dataclass(frozen=True) +class Workspace: + root: Path + workspace: Path + canary: Path + tmp: Path + artifacts: Path + + @property + def monitored_dirs(self) -> list[Path]: + return [self.workspace, self.canary, self.tmp] + + +def prepare_workspace(root: str | Path) -> Workspace: + root_path = Path(root).resolve() + ws = Workspace( + root=root_path, + workspace=root_path / "workspace", + canary=root_path / "canary", + tmp=root_path / "tmp", + artifacts=root_path / "artifacts", + ) + for d in (ws.workspace, ws.canary, ws.tmp, ws.artifacts): + d.mkdir(parents=True, exist_ok=True) + return ws + + +def _is_within(child: Path, parent: Path) -> bool: + try: + child.resolve().relative_to(parent.resolve()) + return True + except ValueError: + return False + + +def materialize_target(target: TargetSpec, ws: Workspace, *, source_base: Path) -> None: + """Place script/archive targets into the workspace before execution. + + ``command`` targets need nothing materialized — argv runs as-is in the + workspace cwd. ``source_base`` is the directory that relative source/archive + paths in the config are resolved against (the supervisor's app dir locally, + or the materialized config dir in MicroVM mode). + """ + if target.type == "command": + return + if target.type == "script": + src = (source_base / target.source).resolve() if not Path(target.source).is_absolute() else Path(target.source) + if not src.is_file(): + raise TargetConfigError(f"script source not found: {src}") + dest = ws.workspace / src.name + shutil.copy2(src, dest) + dest.chmod(dest.stat().st_mode | 0o100) # u+x + return + if target.type == "archive": + arc = (source_base / target.archive).resolve() if not Path(target.archive).is_absolute() else Path(target.archive) + if not arc.is_file(): + raise TargetConfigError(f"archive not found: {arc}") + _safe_extract(arc, ws.workspace) + return + raise TargetConfigError(f"Unsupported target type for materialization: {target.type}") + + +def _safe_extract(archive: Path, dest: Path) -> None: + """Extract zip/tar, refusing path-traversal entries (Zip-Slip).""" + dest = dest.resolve() + if zipfile.is_zipfile(archive): + with zipfile.ZipFile(archive) as zf: + for name in zf.namelist(): + target_path = (dest / name).resolve() + if not _is_within(target_path, dest): + raise TargetConfigError(f"archive entry escapes workspace: {name}") + zf.extractall(dest) + return + if tarfile.is_tarfile(archive): + with tarfile.open(archive) as tf: + for member in tf.getmembers(): + target_path = (dest / member.name).resolve() + if not _is_within(target_path, dest): + raise TargetConfigError(f"archive entry escapes workspace: {member.name}") + tf.extractall(dest) + return + raise TargetConfigError(f"archive is neither zip nor tar: {archive}") + + +def cleanup_workspace(ws: Workspace, *, keep_artifacts: bool = True) -> None: + """Remove the workspace, optionally preserving collected artifacts.""" + for d in (ws.workspace, ws.canary, ws.tmp): + shutil.rmtree(d, ignore_errors=True) + if not keep_artifacts: + shutil.rmtree(ws.artifacts, ignore_errors=True) diff --git a/lambda-microvm-dta/src/microvm/requirements.txt b/lambda-microvm-dta/src/microvm/requirements.txt new file mode 100644 index 000000000..8392d5414 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/requirements.txt @@ -0,0 +1 @@ +PyYAML==6.0.2 diff --git a/lambda-microvm-dta/src/microvm/rules/default.yaml b/lambda-microvm-dta/src/microvm/rules/default.yaml new file mode 100644 index 000000000..98b07f4c0 --- /dev/null +++ b/lambda-microvm-dta/src/microvm/rules/default.yaml @@ -0,0 +1,63 @@ +# Default v0.2 DTA rule pack. Conservative and explainable — no malware verdicts. +# Verdicts: clean | suspicious | policy_violation | unknown | error +rules: + - id: R001 + name: executable-created-in-workspace + when: + eventType: file.executable_created + severity: medium + verdict: suspicious + + # Only high-confidence canary reads (strace-confirmed) are a policy violation. + # canary.file_touched_suspected (low-confidence atime) is intentionally NOT + # here; it appears in evidence but does not fail the build on its own. + - id: R002 + name: canary-file-read + when: + eventTypeAny: + - canary.file_read_observed + - canary.aws_credential_path_touched + severity: high + verdict: policy_violation + + - id: R003 + name: unexpected-network-egress + when: + eventType: network.egress_policy_violation + severity: high + verdict: policy_violation + + - id: R004 + name: shell-executed-by-target + when: + eventType: syscall.execve + argvContainsAny: + - "/bin/sh" + - "/bin/bash" + - "sh" + - "bash" + severity: medium + verdict: suspicious + + - id: R005 + name: privilege-change-attempt + when: + eventTypeAny: + - syscall.privilege_change_attempt + - syscall.capability_change_attempt + severity: high + verdict: policy_violation + + - id: R006 + name: process-timeout + when: + eventType: process.timeout + severity: medium + verdict: suspicious + + - id: R007 + name: nonzero-exit + when: + eventType: process.nonzero_exit + severity: low + verdict: unknown diff --git a/lambda-microvm-dta/src/orchestrator/pyproject.toml b/lambda-microvm-dta/src/orchestrator/pyproject.toml new file mode 100644 index 000000000..2ee5b676c --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/pyproject.toml @@ -0,0 +1,16 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "microvm-dta-orchestrator" +version = "0.1.0" +description = "CI orchestrator for aws-lambda-microvm-dta-sample" +requires-python = ">=3.11" +dependencies = [] + +[project.scripts] +microvm-dta = "microvm_dta.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/__init__.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/__init__.py new file mode 100644 index 000000000..3dc1f76bc --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.0" diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/aws_cli.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/aws_cli.py new file mode 100644 index 000000000..c9ee10f16 --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/aws_cli.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional +import json +import subprocess +import time + + +class AwsCliError(RuntimeError): + pass + + +@dataclass +class AwsCli: + profile: Optional[str] = None + region: Optional[str] = None + dry_print: bool = False + + def _base(self) -> List[str]: + cmd = ["aws"] + if self.profile: + cmd.extend(["--profile", self.profile]) + if self.region: + cmd.extend(["--region", self.region]) + return cmd + + def run(self, args: List[str], *, expect_json: bool = True) -> Dict[str, Any] | str: + cmd = self._base() + args + if expect_json and "--output" not in args: + cmd.extend(["--output", "json"]) + if self.dry_print: + print(" ".join(cmd)) + return {} + proc = subprocess.run(cmd, capture_output=True, text=True, check=False) + if proc.returncode != 0: + raise AwsCliError( + "AWS CLI command failed\n" + f"command: {' '.join(cmd)}\n" + f"returncode: {proc.returncode}\n" + f"stdout: {proc.stdout}\n" + f"stderr: {proc.stderr}" + ) + if not expect_json: + return proc.stdout + stdout = proc.stdout.strip() + if not stdout: + return {} + try: + return json.loads(stdout) + except json.JSONDecodeError as exc: + raise AwsCliError(f"Expected JSON from AWS CLI but could not parse stdout: {stdout}") from exc + + def microvms(self, args: List[str], *, expect_json: bool = True) -> Dict[str, Any] | str: + return self.run(["lambda-microvms", *args], expect_json=expect_json) + + +def sleep_with_backoff(attempt: int, base: float = 1.0, cap: float = 15.0) -> None: + delay = min(cap, base * (2 ** max(0, attempt - 1))) + time.sleep(delay) diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/cli.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/cli.py new file mode 100644 index 000000000..bae64d291 --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/cli.py @@ -0,0 +1,492 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, List, Optional +import argparse +import json +import os +import sys +import time + +from .aws_cli import AwsCli, AwsCliError +from .evaluate import evaluate_report +from .http_client import request_json +from .local import run_local_analysis +from .packaging import package_microvm + +DEFAULT_INGRESS = "arn:aws:lambda:{region}:aws:network-connector:aws-network-connector:ALL_INGRESS" +DEFAULT_INTERNET_EGRESS = "arn:aws:lambda:{region}:aws:network-connector:aws-network-connector:INTERNET_EGRESS" + + +def _print_json(data: Dict[str, Any]) -> None: + print(json.dumps(data, indent=2, sort_keys=True)) + + +# --------------------------------------------------------------------------- # +# Safety guards +# --------------------------------------------------------------------------- # +def _require_aws_mode(args: argparse.Namespace) -> None: + """AWS-mode commands must be explicitly opted into. + + Requires DTA_ALLOW_AWS_MODE=true in the environment and --confirm-sandbox-account + on the command line so a real, billable MicroVM run is never a surprise. + """ + if getattr(args, "print_only", False): + return # print-only never touches AWS + if os.environ.get("DTA_ALLOW_AWS_MODE", "").lower() != "true": + raise SystemExit("AWS mode is disabled. Set DTA_ALLOW_AWS_MODE=true to enable real MicroVM operations.") + if not getattr(args, "confirm_sandbox_account", False): + raise SystemExit("Refusing to run AWS mode without --confirm-sandbox-account (use a dedicated sandbox account).") + + +# --------------------------------------------------------------------------- # +# Local commands +# --------------------------------------------------------------------------- # +def cmd_package(args: argparse.Namespace) -> int: + out = package_microvm(args.source_dir, args.output) + print(str(out)) + return 0 + + +def cmd_dry_run(args: argparse.Namespace) -> int: + report = run_local_analysis( + target_config=args.target_config, + root=args.workspace, + rules_file=args.rules_file, + correlation_id=args.correlation_id, + commit_sha=args.commit_sha, + allow_shell_strings=args.allow_shell_strings, + ) + _print_json(report["summary"]) + return 0 if report["summary"]["status"] == "passed" else 1 + + +def cmd_evaluate(args: argparse.Namespace) -> int: + report = json.loads(Path(args.report).read_text(encoding="utf-8")) + ok, reasons = evaluate_report( + report, + fail_on_severity=args.fail_on_severity, + fail_on_policy_violation=args.fail_on_policy_violation, + ) + if ok: + print("policy: passed") + return 0 + print("policy: failed") + for reason in reasons: + print(f"- {reason}") + return 1 + + +# --------------------------------------------------------------------------- # +# AWS helpers +# --------------------------------------------------------------------------- # +def _aws(args: argparse.Namespace) -> AwsCli: + return AwsCli(profile=args.profile, region=args.region, dry_print=getattr(args, "print_only", False)) + + +def _looks_like_not_found(error: AwsCliError) -> bool: + text = str(error).lower() + # "Invalid ARN format" appears when an image name is passed as an identifier + # to a not-yet-existing image; treat it as "absent → create" too. + markers = ("resourcenotfound", "notfound", "does not exist", "not found", "invalid arn format") + return any(marker in text for marker in markers) + + +def _region_of(args: argparse.Namespace) -> str: + return args.region or os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") or "us-east-1" + + +def _account_id(aws: AwsCli) -> Optional[str]: + try: + resp = aws.run(["sts", "get-caller-identity"]) + if isinstance(resp, dict): + return str(resp.get("Account")) or None + except AwsCliError: + return None + return None + + +def _image_arn(aws: AwsCli, args: argparse.Namespace) -> Optional[str]: + """Build a MicroVM image ARN from name + region + account for get/update. + + The control-plane get/update operations require a full ARN as the + --image-identifier; create takes a bare --name. + """ + if args.image_name.startswith("arn:"): + return args.image_name + account = _account_id(aws) + if not account: + return None + return f"arn:aws:lambda:{_region_of(args)}:{account}:microvm-image:{args.image_name}" + + +def cmd_build_image(args: argparse.Namespace) -> int: + aws = _aws(args) + image_arn = _image_arn(aws, args) + image_exists = False + if image_arn: + try: + aws.microvms(["get-microvm-image", "--image-identifier", image_arn]) + image_exists = True + except AwsCliError as exc: + if not _looks_like_not_found(exc): + raise + image_exists = False + common = [ + "--code-artifact", f"uri={args.artifact_uri}", + "--base-image-arn", args.base_image_arn, + "--build-role-arn", args.build_role_arn, + ] + if args.additional_os_capabilities: + common.extend(["--additional-os-capabilities", json.dumps(args.additional_os_capabilities.split(","))]) + if args.description: + common.extend(["--description", args.description]) + if image_exists: + resp = aws.microvms(["update-microvm-image", "--image-identifier", image_arn, *common]) + else: + resp = aws.microvms(["create-microvm-image", "--name", args.image_name, *common]) + _print_json(resp if isinstance(resp, dict) else {"output": resp}) + return 0 + + +def cmd_wait_image(args: argparse.Namespace) -> int: + aws = _aws(args) + deadline = time.monotonic() + args.timeout_seconds + last: Dict[str, Any] = {} + while time.monotonic() < deadline: + resp = aws.microvms(["get-microvm-image", "--image-identifier", args.image_identifier]) + if not isinstance(resp, dict): + raise RuntimeError("Unexpected non-JSON response") + last = resp + state = str(resp.get("state")) + # The GA control plane reports image readiness via `state` + # (CREATING -> CREATED, UPDATING -> UPDATED). A versionState field is + # optional; when present it must also be terminal-success, but its + # absence on a CREATED/UPDATED/ACTIVE image means ready. + raw_vs = resp.get("versionState") + if raw_vs is None: + raw_vs = resp.get("latestVersionState") + version_state = "" if raw_vs is None else str(raw_vs) + print(f"image state={state} versionState={version_state or 'n/a'}", file=sys.stderr) + terminal_success = {"CREATED", "UPDATED", "ACTIVE"} + if state in terminal_success and version_state in {"", "SUCCESSFUL", "ACTIVE"}: + _print_json(resp) + return 0 + if state in {"CREATION_FAILED", "UPDATE_FAILED", "DELETED", "DELETION_FAILED"} or version_state == "FAILED": + _print_json(resp) + return 1 + time.sleep(args.poll_seconds) + print("Timed out waiting for image readiness", file=sys.stderr) + _print_json(last) + return 1 + + +def _wait_microvm_running(aws: AwsCli, microvm_id: str, timeout_seconds: int, poll_seconds: int) -> Dict[str, Any]: + deadline = time.monotonic() + timeout_seconds + last: Dict[str, Any] = {} + while time.monotonic() < deadline: + resp = aws.microvms(["get-microvm", "--microvm-identifier", microvm_id]) + if not isinstance(resp, dict): + raise RuntimeError("Unexpected non-JSON response") + last = resp + state = str(resp.get("state")) + print(f"microvm state={state}", file=sys.stderr) + if state == "RUNNING": + return resp + if state in {"FAILED", "TERMINATED"}: + raise RuntimeError(f"MicroVM entered terminal state {state}: {resp}") + time.sleep(poll_seconds) + raise TimeoutError(f"Timed out waiting for MicroVM to RUNNING. Last response: {last}") + + +def _extract_token(auth_resp: Dict[str, Any]) -> str: + token_obj = auth_resp.get("authToken") + if isinstance(token_obj, dict) and "X-aws-proxy-auth" in token_obj: + return str(token_obj["X-aws-proxy-auth"]) + if isinstance(token_obj, str): + return token_obj + raise RuntimeError( + "Cannot extract auth token from create-microvm-auth-token response " + f"(top-level keys: {sorted(auth_resp.keys())})" + ) + + +def _build_run_hook_payload(args: argparse.Namespace) -> Optional[str]: + payload: Dict[str, Any] = {} + if args.correlation_id: + payload["correlationId"] = args.correlation_id + if args.commit_sha: + payload["commitSha"] = args.commit_sha + if getattr(args, "target_config", None): + payload["targetConfig"] = json.loads(_target_config_as_json(args.target_config)) + return json.dumps(payload) if payload else None + + +def _target_config_as_json(path: str) -> str: + """Load a target config (YAML or JSON) into a JSON string for the run hook.""" + text = Path(path).read_text(encoding="utf-8") + if path.lower().endswith(".json"): + return text + import yaml # type: ignore + + return json.dumps(yaml.safe_load(text)) + + +def cmd_run(args: argparse.Namespace) -> int: + """Start a MicroVM and leave it running (lifecycle only). + + Prints microvmId/endpoint for a subsequent start-analysis/fetch-results. + """ + _require_aws_mode(args) + aws = _aws(args) + region = _region_of(args) + ingress = args.ingress_network_connector or DEFAULT_INGRESS.format(region=region) + egress = args.egress_network_connector or DEFAULT_INTERNET_EGRESS.format(region=region) + if args.profile_mode == "production" and not args.egress_network_connector: + raise SystemExit("production profile requires an explicit --egress-network-connector (VPC egress)") + run_args = [ + "run-microvm", + "--image-identifier", args.image_identifier, + "--ingress-network-connectors", ingress, + "--egress-network-connectors", egress, + "--maximum-duration-in-seconds", str(args.max_duration_seconds), + "--idle-policy", json.dumps({ + "autoResumeEnabled": True, + "maxIdleDurationSeconds": args.idle_seconds, + "suspendedDurationSeconds": args.suspended_seconds, + }), + ] + if args.image_version: + run_args.extend(["--image-version", args.image_version]) + if args.execution_role_arn: + run_args.extend(["--execution-role-arn", args.execution_role_arn]) + payload = _build_run_hook_payload(args) + if payload: + run_args.extend(["--run-hook-payload", payload]) + resp = aws.microvms(run_args) + if aws.dry_print: + aws.microvms(["get-microvm", "--microvm-identifier", ""]) + print("print-only: skipped live run") + return 0 + if not isinstance(resp, dict): + raise RuntimeError("Unexpected non-JSON response from run-microvm") + microvm_id = str(resp["microvmId"]) + endpoint = str(resp["endpoint"]) + _wait_microvm_running(aws, microvm_id, args.wait_timeout_seconds, args.poll_seconds) + _print_json({"microvmId": microvm_id, "endpoint": endpoint}) + return 0 + + +def _endpoint_session(aws: AwsCli, microvm_id: str, port: int, token_minutes: int): + info = aws.microvms(["get-microvm", "--microvm-identifier", microvm_id]) + endpoint = str(info["endpoint"]) + auth = aws.microvms([ + "create-microvm-auth-token", + "--microvm-identifier", microvm_id, + "--expiration-in-minutes", str(token_minutes), + "--allowed-ports", json.dumps([{"port": port}]), + ]) + token = _extract_token(auth) + return f"https://{endpoint}", token + + +def cmd_start_analysis(args: argparse.Namespace) -> int: + _require_aws_mode(args) + aws = _aws(args) + base_url, token = _endpoint_session(aws, args.microvm_identifier, args.port, args.token_expiration_minutes) + body: Dict[str, Any] = {"correlationId": args.correlation_id, "commitSha": args.commit_sha} + if getattr(args, "target_config", None): + body["targetConfig"] = json.loads(_target_config_as_json(args.target_config)) + resp = request_json("POST", f"{base_url}/analysis/start", token=token, port=args.port, payload=body, retries=3) + _print_json(resp) + return 0 + + +def cmd_fetch_results(args: argparse.Namespace) -> int: + _require_aws_mode(args) + aws = _aws(args) + base_url, token = _endpoint_session(aws, args.microvm_identifier, args.port, args.token_expiration_minutes) + out_dir = Path(args.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + report = request_json("GET", f"{base_url}/analysis/report", token=token, port=args.port, retries=5) + + # events.jsonl and named artifacts are best-effort. + for name, path in (("events.jsonl", "/analysis/events"),): + try: + raw = request_json("GET", f"{base_url}{path}", token=token, port=args.port, retries=3, raw=True) + (out_dir / name).write_bytes(raw) + except Exception as exc: # noqa: BLE001 + print(f"WARNING: could not fetch {name}: {exc}", file=sys.stderr) + + # Optional VPC Flow Logs correlation (post-processing metadata, never fatal). + if getattr(args, "flow_log_group", None): + from .flow_logs import fetch_flow_logs, attach_flow_logs + target = report.get("target", {}) + result = fetch_flow_logs( + aws, + log_group=args.flow_log_group, + start_iso=str(target.get("startedAt", "")), + end_iso=str(target.get("endedAt", "")), + eni_id=getattr(args, "flow_log_eni", None), + ) + attach_flow_logs(report, result) + print(f"flow logs: available={result.available} records={result.recordCount}") + + (out_dir / "report.json").write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8") + + ok, reasons = evaluate_report(report, fail_on_severity=args.fail_on_severity, fail_on_policy_violation=args.fail_on_policy_violation) + print(f"report written to {out_dir/'report.json'}") + if ok: + print("policy: passed") + return 0 + print("policy: failed") + for reason in reasons: + print(f"- {reason}") + return 1 + + +def cmd_terminate(args: argparse.Namespace) -> int: + _require_aws_mode(args) + aws = _aws(args) + resp = aws.microvms(["terminate-microvm", "--microvm-identifier", args.microvm_identifier]) + _print_json(resp if isinstance(resp, dict) else {"output": resp}) + return 0 + + +def cmd_cleanup_stale(args: argparse.Namespace) -> int: + """Terminate MicroVMs left running (safety net for failed CI runs).""" + _require_aws_mode(args) + aws = _aws(args) + resp = aws.microvms(["list-microvms"]) + items = resp.get("items", resp.get("microvms", [])) if isinstance(resp, dict) else [] + terminated: List[str] = [] + for item in items: + state = str(item.get("state", "")).upper() + mid = item.get("microvmId") or item.get("microVmId") + if mid and state in {"RUNNING", "SUSPENDED", "PENDING"}: + try: + aws.microvms(["terminate-microvm", "--microvm-identifier", str(mid)]) + terminated.append(str(mid)) + except AwsCliError as exc: + print(f"WARNING: failed to terminate {mid}: {exc}", file=sys.stderr) + _print_json({"terminated": terminated, "count": len(terminated)}) + return 0 + + +# --------------------------------------------------------------------------- # +# Argument parser +# --------------------------------------------------------------------------- # +def _add_aws_safety_flags(p: argparse.ArgumentParser) -> None: + p.add_argument("--confirm-sandbox-account", action="store_true", help="Required to run real AWS operations") + p.add_argument("--print-only", action="store_true", help="Print the planned AWS CLI calls without executing") + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(prog="microvm-dta") + parser.add_argument("--profile", default=None, help="AWS profile for AWS commands") + parser.add_argument("--region", default=os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION"), help="AWS region") + sub = parser.add_subparsers(dest="command", required=True) + + p = sub.add_parser("package", help="Package microvm/ into a zip artifact") + p.add_argument("--source-dir", default="microvm") + p.add_argument("--output", default="out/microvm-dta.zip") + p.set_defaults(func=cmd_package) + + p = sub.add_parser("dry-run", help="Run a target through the supervisor locally (no AWS)") + p.add_argument("--target-config", default="examples/targets/benign-command.yaml") + p.add_argument("--workspace", default="out/analysis") + p.add_argument("--rules-file", default=None) + p.add_argument("--correlation-id", default=os.environ.get("GITHUB_RUN_ID")) + p.add_argument("--commit-sha", default=os.environ.get("GITHUB_SHA")) + p.add_argument("--allow-shell-strings", action="store_true", help="(unsafe) allow shell metacharacters in argv") + p.set_defaults(func=cmd_dry_run) + + p = sub.add_parser("evaluate", help="Evaluate a report against CI policy") + p.add_argument("--report", required=True) + p.add_argument("--fail-on-severity", default="high") + p.add_argument("--fail-on-policy-violation", action=argparse.BooleanOptionalAction, default=True) + p.set_defaults(func=cmd_evaluate) + + p = sub.add_parser("build-image", help="Create or update a Lambda MicroVM image") + p.add_argument("--image-name", required=True) + p.add_argument("--artifact-uri", required=True) + p.add_argument("--base-image-arn", required=True) + p.add_argument("--build-role-arn", required=True) + p.add_argument("--additional-os-capabilities", default=None, help="Comma-separated, e.g. ALL") + p.add_argument("--description", default="aws-lambda-microvm-dta-sample") + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_build_image) + + p = sub.add_parser("wait-image", help="Wait for MicroVM image readiness") + p.add_argument("--image-identifier", required=True) + p.add_argument("--timeout-seconds", type=int, default=1800) + p.add_argument("--poll-seconds", type=int, default=15) + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_wait_image) + + p = sub.add_parser("run", help="Start a MicroVM (lifecycle only)") + p.add_argument("--image-identifier", required=True) + p.add_argument("--image-version", default=None) + p.add_argument("--execution-role-arn", default=None) + p.add_argument("--target-config", default=None, help="Embed target config in the run hook payload") + p.add_argument("--profile-mode", choices=["public", "production"], default="public") + p.add_argument("--max-duration-seconds", type=int, default=900) + p.add_argument("--idle-seconds", type=int, default=600) + p.add_argument("--suspended-seconds", type=int, default=300) + p.add_argument("--wait-timeout-seconds", type=int, default=300) + p.add_argument("--poll-seconds", type=int, default=5) + p.add_argument("--ingress-network-connector", default=None) + p.add_argument("--egress-network-connector", default=None) + p.add_argument("--correlation-id", default=os.environ.get("GITHUB_RUN_ID")) + p.add_argument("--commit-sha", default=os.environ.get("GITHUB_SHA")) + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_run) + + p = sub.add_parser("start-analysis", help="Call /analysis/start on a running MicroVM") + p.add_argument("--microvm-identifier", required=True) + p.add_argument("--target-config", default=None) + p.add_argument("--port", type=int, default=8080) + p.add_argument("--token-expiration-minutes", type=int, default=15) + p.add_argument("--correlation-id", default=os.environ.get("GITHUB_RUN_ID")) + p.add_argument("--commit-sha", default=os.environ.get("GITHUB_SHA")) + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_start_analysis) + + p = sub.add_parser("fetch-results", help="Download report/events from a MicroVM and evaluate") + p.add_argument("--microvm-identifier", required=True) + p.add_argument("--output-dir", default="out") + p.add_argument("--port", type=int, default=8080) + p.add_argument("--token-expiration-minutes", type=int, default=15) + p.add_argument("--fail-on-severity", default="high") + p.add_argument("--fail-on-policy-violation", action=argparse.BooleanOptionalAction, default=True) + p.add_argument("--flow-log-group", default=None, help="CloudWatch log group for optional VPC Flow Logs correlation") + p.add_argument("--flow-log-eni", default=None, help="Restrict flow-log correlation to this ENI id") + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_fetch_results) + + p = sub.add_parser("terminate", help="Terminate a MicroVM") + p.add_argument("--microvm-identifier", required=True) + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_terminate) + + p = sub.add_parser("cleanup-stale", help="Terminate any non-terminated MicroVMs (safety net)") + _add_aws_safety_flags(p) + p.set_defaults(func=cmd_cleanup_stale) + return parser + + +def main(argv: Optional[List[str]] = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + try: + return int(args.func(args)) + except SystemExit: + raise + except Exception as exc: # noqa: BLE001 + print(f"ERROR: {exc}", file=sys.stderr) + return 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/evaluate.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/evaluate.py new file mode 100644 index 000000000..02314078c --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/evaluate.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Tuple + +SEVERITY_ORDER = { + "none": 0, + "informational": 0, + "low": 1, + "medium": 2, + "high": 3, + "critical": 4, +} + + +def severity_gte(left: str, right: str) -> bool: + return SEVERITY_ORDER.get(left, -1) >= SEVERITY_ORDER.get(right, -1) + + +def evaluate_report( + report: Dict[str, Any], + *, + fail_on_severity: str = "high", + fail_on_policy_violation: bool = True, +) -> Tuple[bool, List[str]]: + """Re-apply CI policy to a v0.2 report. Returns (ok, reasons). + + The supervisor already computed ``summary.status``; this lets CI re-evaluate + independently (e.g. with a stricter threshold) and surface human-readable + reasons. An ``error`` status always fails. + """ + reasons: List[str] = [] + summary = report.get("summary") or {} + + status = str(summary.get("status", "failed")) + if status == "error": + reasons.append("analysis status is 'error'") + return (False, reasons) + + verdict = str(summary.get("verdict", "unknown")) + max_severity = str(summary.get("maxSeverity", "none")) + + if severity_gte(max_severity, fail_on_severity): + reasons.append(f"max severity {max_severity!r} is >= threshold {fail_on_severity!r}") + + if fail_on_policy_violation and verdict == "policy_violation": + reasons.append("verdict is 'policy_violation'") + + if status == "failed": + # Honor the supervisor's own decision (e.g. fail-closed on collector error) + # even if the threshold check above didn't trip. + for reason in (report.get("policy") or {}).get("reasons", []): + if reason not in reasons: + reasons.append(str(reason)) + if not reasons: + reasons.append("report summary status is 'failed'") + + return (len(reasons) == 0, reasons) diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/flow_logs.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/flow_logs.py new file mode 100644 index 000000000..9246c11c2 --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/flow_logs.py @@ -0,0 +1,121 @@ +"""Optional VPC Flow Logs importer (AWS-side network-flow metadata). + +This is post-processing evidence, not a primary detector. It queries a +CloudWatch Logs group for VPC Flow Log records within the analysis time window +and attaches a metadata-only summary to a v0.2 report under +``report['flowLogs']``. Flow Logs carry 5-tuple + bytes/packets metadata only — +no payload and no DNS hostname attribution. + +It shells out to the AWS CLI (like the rest of the orchestrator) and degrades +gracefully: any failure leaves the report untouched and returns an empty result, +so local mode is never affected. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional +import json + +from .aws_cli import AwsCli, AwsCliError + + +@dataclass +class FlowLogResult: + available: bool + recordCount: int = 0 + flows: List[Dict[str, Any]] = field(default_factory=list) + note: str = "" + + def to_dict(self) -> Dict[str, Any]: + return { + "available": self.available, + "recordCount": self.recordCount, + "flows": self.flows, + "note": self.note, + "limitation": "metadata only (5-tuple + bytes/packets); no payload; no DNS hostname attribution", + } + + +def _to_epoch_ms(iso_ts: str) -> Optional[int]: + if not iso_ts: + return None + try: + cleaned = iso_ts.replace("Z", "+00:00") + dt = datetime.fromisoformat(cleaned) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return int(dt.timestamp() * 1000) + except ValueError: + return None + + +def fetch_flow_logs( + aws: AwsCli, + *, + log_group: str, + start_iso: str, + end_iso: str, + eni_id: Optional[str] = None, + limit: int = 200, +) -> FlowLogResult: + """Best-effort fetch of flow-log records in [start, end]. Never raises.""" + start_ms = _to_epoch_ms(start_iso) + end_ms = _to_epoch_ms(end_iso) + if start_ms is None or end_ms is None: + return FlowLogResult(available=False, note="missing/invalid analysis time window") + + args = [ + "logs", "filter-log-events", + "--log-group-name", log_group, + "--start-time", str(start_ms), + "--end-time", str(end_ms + 60_000), # flow logs lag; widen the tail + "--limit", str(limit), + ] + if eni_id: + args.extend(["--filter-pattern", eni_id]) + try: + resp = aws.run(args) + except AwsCliError as exc: + return FlowLogResult(available=False, note=f"flow log query failed: {exc}") + if not isinstance(resp, dict): + return FlowLogResult(available=False, note="unexpected flow log response") + + flows: List[Dict[str, Any]] = [] + for event in resp.get("events", []): + parsed = _parse_flow_record(str(event.get("message", ""))) + if parsed: + flows.append(parsed) + return FlowLogResult(available=True, recordCount=len(flows), flows=flows, + note="flow log metadata correlated by time window" + (f" and ENI {eni_id}" if eni_id else "")) + + +def _parse_flow_record(message: str) -> Optional[Dict[str, Any]]: + """Parse the default VPC Flow Logs v2 format. + + version account-id interface-id srcaddr dstaddr srcport dstport protocol + packets bytes start end action log-status + """ + cols = message.split() + if len(cols) < 14: + return None + try: + return { + "interfaceId": cols[2], + "srcAddr": cols[3], + "dstAddr": cols[4], + "srcPort": int(cols[5]) if cols[5].isdigit() else None, + "dstPort": int(cols[6]) if cols[6].isdigit() else None, + "protocol": cols[7], + "packets": int(cols[8]) if cols[8].isdigit() else None, + "bytes": int(cols[9]) if cols[9].isdigit() else None, + "action": cols[12], + } + except (ValueError, IndexError): + return None + + +def attach_flow_logs(report: Dict[str, Any], result: FlowLogResult) -> Dict[str, Any]: + """Attach flow-log metadata to a report (returns the same dict).""" + report["flowLogs"] = result.to_dict() + return report diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/http_client.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/http_client.py new file mode 100644 index 000000000..769538004 --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/http_client.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +from typing import Any, Dict, Optional +from urllib.error import HTTPError, URLError +from urllib.request import Request, urlopen +import json +import time + + +class EndpointError(RuntimeError): + pass + + +# Idempotent methods may be safely retried on a 5xx response. A non-idempotent +# POST (e.g. /analysis/start, which re-runs the whole scenario suite server-side) +# must NOT be retried on 5xx, or a single failure multiplies the work. +_IDEMPOTENT_METHODS = {"GET", "HEAD", "PUT", "DELETE", "OPTIONS"} +_RETRIABLE_STATUS = {429, 500, 502, 503, 504} + + +def request_json( + method: str, + url: str, + *, + token: Optional[str] = None, + port: int = 8080, + payload: Optional[Dict[str, Any]] = None, + timeout: float = 20.0, + retries: int = 3, + raw: bool = False, +): + """Make an authenticated request to a MicroVM endpoint. + + Returns the parsed JSON dict by default, or raw ``bytes`` when ``raw=True`` + (used for ``events.jsonl`` and other non-JSON artifacts). + """ + body = None if payload is None else json.dumps(payload).encode("utf-8") + accept = "application/octet-stream" if raw else "application/json" + headers = {"Accept": accept, "X-aws-proxy-port": str(port)} + if body is not None: + headers["Content-Type"] = "application/json" + if token: + headers["X-aws-proxy-auth"] = token + # 429 (throttling) is always safe to retry; other 5xx only for idempotent methods. + retriable_status = {429} if method.upper() not in _IDEMPOTENT_METHODS else _RETRIABLE_STATUS + last_error: Optional[BaseException] = None + for attempt in range(1, retries + 1): + try: + req = Request(url, data=body, headers=headers, method=method) + with urlopen(req, timeout=timeout) as resp: # nosec B310 - endpoint is Lambda MicroVM URL from AWS + raw_bytes = resp.read() + if raw: + return raw_bytes + decoded = raw_bytes.decode("utf-8") + return json.loads(decoded) if decoded else {} + except HTTPError as exc: + raw = exc.read().decode("utf-8", errors="replace") + if exc.code in retriable_status and attempt < retries: + last_error = exc + time.sleep(min(10.0, 2 ** attempt)) + continue + raise EndpointError(f"HTTP {exc.code} from {url}: {raw}") from exc + except (URLError, TimeoutError, OSError) as exc: + last_error = exc + if attempt < retries: + time.sleep(min(10.0, 2 ** attempt)) + continue + raise EndpointError(f"Request to {url} failed after {retries} attempts: {exc}") from exc + raise EndpointError(f"Request to {url} failed: {last_error}") diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/local.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/local.py new file mode 100644 index 000000000..dbdbe472b --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/local.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, Optional +import sys + + +def _repo_root() -> Path: + # Source-tree layout: /orchestrator/src/microvm_dta/local.py → parents[3]. + candidate = Path(__file__).resolve().parents[3] + if (candidate / "microvm").exists(): + return candidate + return Path.cwd() + + +def _load_microvm_modules(repo_root: Path) -> None: + microvm_dir = repo_root / "microvm" + if str(microvm_dir) not in sys.path: + sys.path.insert(0, str(microvm_dir)) + + +def run_local_analysis( + *, + target_config: str | Path, + root: str | Path, + rules_file: Optional[str | Path] = None, + correlation_id: Optional[str] = None, + commit_sha: Optional[str] = None, + allow_shell_strings: bool = False, +) -> Dict[str, Any]: + """Run a v0.2 DTA analysis locally (no AWS) and return the report dict. + + The MicroVM ``app`` package implements the supervisor; we put ``microvm/`` on + the path and call it directly so local and in-MicroVM behavior share code. + """ + repo_root = _repo_root() + _load_microvm_modules(repo_root) + from app.target_config import load_target_config # type: ignore + from app.supervisor import run_analysis # type: ignore + + rules = rules_file or (repo_root / "microvm" / "rules" / "default.yaml") + config = load_target_config(target_config, allow_shell_strings=allow_shell_strings) + result = run_analysis( + config, + root=root, + rules_file=rules, + source_base=Path(target_config).resolve().parent, + correlation_id=correlation_id, + commit_sha=commit_sha, + mode="local", + ) + return result.report diff --git a/lambda-microvm-dta/src/orchestrator/src/microvm_dta/packaging.py b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/packaging.py new file mode 100644 index 000000000..381327a67 --- /dev/null +++ b/lambda-microvm-dta/src/orchestrator/src/microvm_dta/packaging.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Iterable +import zipfile + +EXCLUDE_PARTS = {"__pycache__", ".pytest_cache", ".mypy_cache"} +EXCLUDE_NAMES = {".DS_Store"} +EXCLUDE_SUFFIXES = {".pyc", ".pyo"} + + +def should_include(path: Path) -> bool: + if any(part in EXCLUDE_PARTS for part in path.parts): + return False + if path.name in EXCLUDE_NAMES: + return False + if path.suffix in EXCLUDE_SUFFIXES: + return False + return True + + +def iter_files(source_dir: Path) -> Iterable[Path]: + for path in source_dir.rglob("*"): + if path.is_file() and should_include(path.relative_to(source_dir)): + yield path + + +def package_microvm(source_dir: str | Path, output: str | Path) -> Path: + source = Path(source_dir).resolve() + out = Path(output).resolve() + if not source.exists(): + raise FileNotFoundError(f"Source directory does not exist: {source}") + if not (source / "Dockerfile").exists(): + raise FileNotFoundError(f"Source directory must contain Dockerfile: {source}") + out.parent.mkdir(parents=True, exist_ok=True) + with zipfile.ZipFile(out, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for file in iter_files(source): + zf.write(file, file.relative_to(source).as_posix()) + return out diff --git a/lambda-microvm-dta/variables.tf b/lambda-microvm-dta/variables.tf new file mode 100644 index 000000000..f4ae57648 --- /dev/null +++ b/lambda-microvm-dta/variables.tf @@ -0,0 +1,71 @@ +variable "region" { + description = "AWS region where Lambda MicroVMs is available." + type = string + default = "us-east-1" +} + +variable "project_name" { + description = "Name prefix for created resources." + type = string + default = "microvm-dta-sample" +} + +variable "artifact_bucket_name" { + description = "Optional explicit S3 bucket name. Empty lets AWS generate one." + type = string + default = "" +} + +variable "enable_github_oidc_role" { + description = "Create the optional GitHub Actions OIDC CI role (carrying the lambda:* MicroVM control-plane policy). Off by default; the core pattern only needs the build/execution roles + artifact bucket, so `terraform apply` works with no required inputs." + type = bool + default = false +} + +variable "github_org" { + description = "GitHub org or user that owns the repository (only used when enable_github_oidc_role = true)." + type = string + default = "YOUR_GITHUB_ORG" +} + +variable "github_repo" { + description = "GitHub repository name (for OIDC trust)." + type = string + default = "aws-lambda-microvm-dta-sample" +} + +variable "github_branch" { + description = "Branch allowed to assume the CI role via OIDC." + type = string + default = "main" +} + +variable "create_github_oidc_provider" { + description = "Create the GitHub Actions OIDC provider. Set false if it already exists in the account." + type = bool + default = true +} + +variable "enable_vpc_egress" { + description = "Create a VPC + restricted egress for the production-oriented network profile." + type = bool + default = false +} + +variable "vpc_cidr" { + description = "CIDR for the egress VPC (only used when enable_vpc_egress = true)." + type = string + default = "10.20.0.0/16" +} + +variable "enable_vpc_flow_logs" { + description = "Enable VPC Flow Logs to CloudWatch for network-flow evidence (requires enable_vpc_egress)." + type = bool + default = false +} + +variable "tags" { + description = "Extra tags applied to all resources." + type = map(string) + default = {} +} diff --git a/lambda-microvm-dta/versions.tf b/lambda-microvm-dta/versions.tf new file mode 100644 index 000000000..882805e25 --- /dev/null +++ b/lambda-microvm-dta/versions.tf @@ -0,0 +1,14 @@ +terraform { + required_version = ">= 1.5" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.40" + } + } +} + +provider "aws" { + region = var.region +} diff --git a/lambda-microvm-dta/vpc_egress.tf b/lambda-microvm-dta/vpc_egress.tf new file mode 100644 index 000000000..a6708cf0e --- /dev/null +++ b/lambda-microvm-dta/vpc_egress.tf @@ -0,0 +1,103 @@ +# Optional production-oriented network profile: a VPC with a private subnet whose +# egress is constrained (no internet gateway on the private subnet) and optional +# VPC Flow Logs to CloudWatch for network-flow evidence. Enable with +# enable_vpc_egress = true. This is a reference for routing MicroVM egress through +# a controlled VPC connector rather than AWS-managed INTERNET_EGRESS. + +resource "aws_vpc" "egress" { + count = var.enable_vpc_egress ? 1 : 0 + cidr_block = var.vpc_cidr + enable_dns_support = true + enable_dns_hostnames = true + tags = merge(local.common_tags, { Name = "${var.project_name}-egress" }) +} + +resource "aws_subnet" "private" { + count = var.enable_vpc_egress ? 1 : 0 + vpc_id = aws_vpc.egress[0].id + cidr_block = cidrsubnet(var.vpc_cidr, 8, 1) + availability_zone = data.aws_availability_zones.available[0].names[0] + tags = merge(local.common_tags, { Name = "${var.project_name}-private" }) +} + +data "aws_availability_zones" "available" { + count = var.enable_vpc_egress ? 1 : 0 + state = "available" +} + +# Restrictive security group: no inbound; egress only to HTTPS by default. Tighten +# the cidr_blocks / add a prefix list to reach only an allowlisted canary endpoint. +resource "aws_security_group" "egress" { + count = var.enable_vpc_egress ? 1 : 0 + name = "${var.project_name}-egress" + description = "Restricted egress for MicroVM analysis traffic" + vpc_id = aws_vpc.egress[0].id + + egress { + description = "HTTPS egress (replace 0.0.0.0/0 with an allowlisted canary CIDR/prefix list)" + from_port = 443 + to_port = 443 + protocol = "tcp" + cidr_blocks = ["0.0.0.0/0"] + } + + tags = merge(local.common_tags, { Name = "${var.project_name}-egress" }) +} + +# --- Optional VPC Flow Logs to CloudWatch (network-flow metadata evidence) --- +resource "aws_cloudwatch_log_group" "flow_logs" { + count = var.enable_vpc_egress && var.enable_vpc_flow_logs ? 1 : 0 + name = "/${var.project_name}/vpc-flow-logs" + retention_in_days = 14 + tags = local.common_tags +} + +data "aws_iam_policy_document" "flow_logs_trust" { + count = var.enable_vpc_egress && var.enable_vpc_flow_logs ? 1 : 0 + statement { + effect = "Allow" + actions = ["sts:AssumeRole"] + principals { + type = "Service" + identifiers = ["vpc-flow-logs.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "flow_logs" { + count = var.enable_vpc_egress && var.enable_vpc_flow_logs ? 1 : 0 + name = "${var.project_name}-flow-logs-role" + assume_role_policy = data.aws_iam_policy_document.flow_logs_trust[0].json + tags = local.common_tags +} + +data "aws_iam_policy_document" "flow_logs" { + count = var.enable_vpc_egress && var.enable_vpc_flow_logs ? 1 : 0 + statement { + effect = "Allow" + actions = [ + "logs:CreateLogStream", + "logs:PutLogEvents", + "logs:DescribeLogGroups", + "logs:DescribeLogStreams", + ] + resources = ["${aws_cloudwatch_log_group.flow_logs[0].arn}:*"] + } +} + +resource "aws_iam_role_policy" "flow_logs" { + count = var.enable_vpc_egress && var.enable_vpc_flow_logs ? 1 : 0 + name = "flow-logs-policy" + role = aws_iam_role.flow_logs[0].id + policy = data.aws_iam_policy_document.flow_logs[0].json +} + +resource "aws_flow_log" "vpc" { + count = var.enable_vpc_egress && var.enable_vpc_flow_logs ? 1 : 0 + iam_role_arn = aws_iam_role.flow_logs[0].arn + log_destination = aws_cloudwatch_log_group.flow_logs[0].arn + traffic_type = "ALL" + vpc_id = aws_vpc.egress[0].id + max_aggregation_interval = 60 + tags = local.common_tags +}