Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,13 @@ void simpleSteps_producesUnifiedTraceInXRay() throws Exception {

// Find the trace that contains our durable spans
var durableTrace = allTraces.stream()
.filter(trace ->
trace.segments().stream().anyMatch(seg -> segmentContains(seg, "durable.step:create-greeting")))
.filter(trace -> trace.segments().stream().anyMatch(seg -> segmentContains(seg, "create-greeting")))
.findFirst()
.orElse(null);

assertNotNull(
durableTrace,
"Expected to find a trace with durable.step:create-greeting segment. " + "Found " + traces.size()
"Expected to find a trace with create-greeting segment. " + "Found " + traces.size()
+ " traces in the time window. Segment names: "
+ allTraces.stream()
.flatMap(t -> t.segments().stream())
Expand All @@ -165,12 +164,10 @@ void simpleSteps_producesUnifiedTraceInXRay() throws Exception {

// Verify expected span names appear in the trace
assertTrue(
allSegmentText.contains("durable.invocation"),
"Expected durable.invocation span in trace. Segments: " + summarizeSegments(segmentDocuments));
assertTrue(
allSegmentText.contains("durable.step:create-greeting"),
"Expected durable.step:create-greeting span in trace");
assertTrue(allSegmentText.contains("durable.step:transform"), "Expected durable.step:transform span in trace");
allSegmentText.contains("invocation"),
"Expected invocation span in trace. Segments: " + summarizeSegments(segmentDocuments));
assertTrue(allSegmentText.contains("create-greeting"), "Expected create-greeting span in trace");
assertTrue(allSegmentText.contains("transform"), "Expected transform span in trace");

// Verify all segments share the same trace ID (single unified trace)
var uniqueTraceIds =
Expand Down Expand Up @@ -218,14 +215,13 @@ void waitAndResume_producesUnifiedTraceAcrossInvocations() throws Exception {

// Find the trace containing our durable spans
var durableTrace = allTraces.stream()
.filter(trace ->
trace.segments().stream().anyMatch(seg -> segmentContains(seg, "durable.step:before-wait")))
.filter(trace -> trace.segments().stream().anyMatch(seg -> segmentContains(seg, "before-wait")))
.findFirst()
.orElse(null);

assertNotNull(
durableTrace,
"Expected to find a trace with durable.step:before-wait segment. " + "Found " + traces.size()
"Expected to find a trace with before-wait segment. " + "Found " + traces.size()
+ " traces in the time window.");

// 4. Verify multi-invocation trace structure
Expand All @@ -234,14 +230,12 @@ void waitAndResume_producesUnifiedTraceAcrossInvocations() throws Exception {
var allSegmentText = String.join("\n", segmentDocuments);

// Verify spans from BOTH invocations appear in the same trace
assertTrue(
allSegmentText.contains("durable.step:before-wait"), "Expected before-wait span from first invocation");
assertTrue(
allSegmentText.contains("durable.step:after-wait"), "Expected after-wait span from second invocation");
assertTrue(allSegmentText.contains("durable.wait:pause"), "Expected wait:pause span in trace");
assertTrue(allSegmentText.contains("before-wait"), "Expected before-wait span from first invocation");
assertTrue(allSegmentText.contains("after-wait"), "Expected after-wait span from second invocation");
assertTrue(allSegmentText.contains("pause"), "Expected wait:pause span in trace");

// Verify multiple invocation spans (one per Lambda invocation)
var invocationCount = countOccurrences(allSegmentText, "durable.invocation");
var invocationCount = countOccurrences(allSegmentText, "invocation");
assertTrue(
invocationCount >= 2,
"Expected at least 2 invocation spans (multi-invocation), got " + invocationCount);
Expand All @@ -264,7 +258,9 @@ private List<TraceSummary> queryTracesWithRetry(Instant startTime, Instant endTi
throws InterruptedException {
// Query by durable.invocation service — our spans are in a separate trace from Lambda's
// built-in X-Ray segment (durable backend propagates its own trace root)
var filterExpression = "service(\"durable.invocation\")";
// Filter by the Lambda function's service name — each function has a unique one.
// This avoids picking up traces from other durable functions that share service.name="invocation".
var filterExpression = "service(\"" + functionName + functionNameSuffix + "\")";
for (int attempt = 0; attempt < XRAY_QUERY_RETRIES; attempt++) {
var response = xrayClient.getTraceSummaries(GetTraceSummariesRequest.builder()
.startTime(startTime)
Expand Down
6 changes: 0 additions & 6 deletions examples/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,6 @@ Resources:
- !Sub
- arn:aws:lambda:${AWS::Region}:901920570463:layer:aws-otel-java-agent-${AdotArch}-ver-1-32-0:6
- AdotArch: amd64
Environment:
Variables:
AWS_LAMBDA_EXEC_WRAPPER: /opt/otel-handler

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come we don't need this anymore?


OtelXRayWaitExampleFunction:
Type: AWS::Serverless::Function
Expand All @@ -389,9 +386,6 @@ Resources:
- !Sub
- arn:aws:lambda:${AWS::Region}:901920570463:layer:aws-otel-java-agent-${AdotArch}-ver-1-32-0:6
- AdotArch: amd64
Environment:
Variables:
AWS_LAMBDA_EXEC_WRAPPER: /opt/otel-handler

RetryInvokeExampleFunction:
Type: AWS::Serverless::Function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
import static software.amazon.lambda.durable.otel.SpanAttributes.*;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.time.Instant;
Expand Down Expand Up @@ -128,6 +131,12 @@ public OtelPlugin(SdkTracerProviderBuilder tracerProviderBuilder, ContextExtract
public OtelPlugin(
SdkTracerProviderBuilder tracerProviderBuilder, ContextExtractor contextExtractor, boolean enableMdc) {
this.idGenerator = new DeterministicIdGenerator();

// Set service.name to "invocation" — X-Ray uses this as the display name for SERVER spans,
// creating a separate service node in the trace map labeled "invocation".
var resource = Resource.create(Attributes.of(AttributeKey.stringKey("service.name"), "invocation"));
tracerProviderBuilder.setResource(resource);

this.tracerProvider = tracerProviderBuilder.setIdGenerator(idGenerator).build();
this.tracer = tracerProvider.get(INSTRUMENTATION_NAME);
this.contextExtractor = contextExtractor;
Expand All @@ -152,10 +161,17 @@ public void onInvocationStart(InvocationInfo info) {
}
// If no extracted context, idGenerator falls back to ARN-derived trace ID

// Determine parent context for the invocation span
// Determine parent context for the invocation span.
// When the X-Ray header has a Parent field, create the invocation span as a
// child of that segment. This connects our OTLP-exported spans to the Lambda
// service's X-Ray segments.
Context parentContext;
if (extractedContext != null && extractedContext.parentSpanId() != null) {
// Create a remote parent span context from the X-Ray Parent field
var activeSpan = Span.fromContext(Context.current());
if (activeSpan.getSpanContext().isValid()) {
// An auto-instrumenter (e.g., ADOT agent) is active — parent to its span
parentContext = Context.current();
} else if (extractedContext != null && extractedContext.parentSpanId() != null) {
// No active instrumenter, but X-Ray header has parent — create remote parent
var parentSpanContext = SpanContext.createFromRemoteParent(
extractedContext.traceId(),
extractedContext.parentSpanId(),
Expand All @@ -166,8 +182,10 @@ public void onInvocationStart(InvocationInfo info) {
parentContext = Context.root();
}

// Create invocation span as child of Lambda's X-Ray segment (via Parent field)
var spanBuilder = tracer.spanBuilder("durable.invocation")
// Create a SERVER span to establish a separate X-Ray service node.
// X-Ray uses service.name for the segment display name.
var spanBuilder = tracer.spanBuilder("invocation")
.setSpanKind(SpanKind.SERVER)
.setParent(parentContext)
.setAttribute(DURABLE_EXECUTION_ARN, info.durableExecutionArn())
.setAttribute(DURABLE_FIRST_INVOCATION, info.isFirstInvocation());
Expand Down Expand Up @@ -227,8 +245,6 @@ public void onInvocationEnd(InvocationEndInfo info) {
public void onOperationStart(OperationInfo info) {
if (info.id() == null) return;

idGenerator.setNextSpanOperationId(info.id());

var parentContext = resolveParentContext(info.parentId());

var spanBuilder = tracer.spanBuilder(spanName(info.type(), info.subType(), info.name()))
Expand All @@ -237,6 +253,19 @@ public void onOperationStart(OperationInfo info) {
.setAttribute(DURABLE_OPERATION_ID, info.id())
.setAttribute(DURABLE_OPERATION_TYPE, info.type());

if (info.isContinuation()) {
// Operation was already started in a prior invocation — use a random span ID
// and add a Link to the deterministic span from the original invocation for correlation.
var deterministicSpanId = idGenerator.generateSpanIdForOperation(info.id());
var traceId = idGenerator.generateTraceId();
var linkedSpanContext =
SpanContext.create(traceId, deterministicSpanId, TraceFlags.getSampled(), TraceState.getDefault());
spanBuilder.addLink(linkedSpanContext);
} else {
// First execution — use deterministic span ID so continuations can link back
idGenerator.setNextSpanOperationId(info.id());
}

if (info.name() != null) {
spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name());
}
Expand Down Expand Up @@ -400,17 +429,17 @@ private Context resolveParentContext(String parentId) {

private static String spanName(String type, String subType, String name) {
if (name != null) {
return "durable." + (subType != null ? subType.toLowerCase() : type.toLowerCase()) + ":" + name;
return name;
}
return "durable." + (subType != null ? subType.toLowerCase() : type.toLowerCase());
return subType != null ? subType.toLowerCase() : type.toLowerCase();
}

private static String attemptSpanName(String type, String subType, String name, Integer attempt) {
var base = spanName(type, subType, name);
if (attempt != null) {
return base + " [attempt " + attempt + "]";
return base + " attempt " + attempt;
}
return base + " [fn]";
return base;
}

private static String attemptKey(String operationId, Integer attempt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@ public class XRayContextExtractor implements ContextExtractor {

private static final Logger logger = LoggerFactory.getLogger(XRayContextExtractor.class);
private static final String XRAY_ENV_VAR = "_X_AMZN_TRACE_ID";
private static final String XRAY_SYSTEM_PROPERTY = "com.amazonaws.xray.traceHeader";
private static final Pattern HEX_32 = Pattern.compile("[0-9a-f]{32}");
private static final Pattern HEX_16 = Pattern.compile("[0-9a-f]{16}");

@Override
public ExtractedContext extract() {
var traceHeader = System.getenv(XRAY_ENV_VAR);
// Try system property first — Lambda runtime updates this per invocation
// and it avoids JVM environment variable caching issues.
var traceHeader = System.getProperty(XRAY_SYSTEM_PROPERTY);
if (traceHeader == null || traceHeader.isEmpty()) {
logger.debug("No X-Ray trace header found in environment");
// Fallback to environment variable
traceHeader = System.getenv(XRAY_ENV_VAR);
}
if (traceHeader == null || traceHeader.isEmpty()) {
logger.debug("No X-Ray trace header found in environment or system properties");
return null;
}

Expand Down
Loading
Loading