diff --git a/examples/pom.xml b/examples/pom.xml index 37f39bbbb..5848e5341 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -31,6 +31,25 @@ ${project.version} + + + software.amazon.lambda.durable + aws-durable-execution-sdk-java-otel + ${project.version} + + + + + io.opentelemetry + opentelemetry-sdk + 1.62.0 + + + io.opentelemetry + opentelemetry-exporter-logging + 1.62.0 + + com.amazonaws diff --git a/examples/src/main/java/software/amazon/lambda/durable/examples/general/OtelExample.java b/examples/src/main/java/software/amazon/lambda/durable/examples/general/OtelExample.java new file mode 100644 index 000000000..3567501c1 --- /dev/null +++ b/examples/src/main/java/software/amazon/lambda/durable/examples/general/OtelExample.java @@ -0,0 +1,68 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.general; + +import io.opentelemetry.exporter.logging.LoggingSpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import software.amazon.lambda.durable.DurableConfig; +import software.amazon.lambda.durable.DurableContext; +import software.amazon.lambda.durable.DurableHandler; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.otel.DeterministicIdGenerator; +import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; + +/** + * Example demonstrating OpenTelemetry instrumentation with the Durable Execution SDK. + * + *

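<p>This handler configures the OTel plugin with: + * + * <ul> + * <li>a {@link DeterministicIdGenerator} shared by the tracer provider and the plugin + * <li>a {@code SimpleSpanProcessor} that exports spans through {@code LoggingSpanExporter} + * <li>the plugin's default settings (X-Ray context extraction, 100% sampling, MDC enabled) + * </ul>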

+ * + *

In production, replace {@code LoggingSpanExporter} with {@code OtlpGrpcSpanExporter} to send spans to an OTLP + * collector (X-Ray, Datadog, etc.). + * + *

<p>Expected trace structure (each attempt span is a child of its operation span): + * + * <pre>

+ * durable.invocation
+ * ├── durable.step:create-greeting (operation)
+ * │   └── durable.step:create-greeting [attempt 1]
+ * └── durable.step:transform (operation)
+ *     └── durable.step:transform [attempt 1]
+ * </pre>
+ */ +public class OtelExample extends DurableHandler<GreetingRequest, String> { + + @Override + protected DurableConfig createConfiguration() { + var idGenerator = new DeterministicIdGenerator(); + var tracerProvider = SdkTracerProvider.builder() + .setIdGenerator(idGenerator) + .addSpanProcessor(SimpleSpanProcessor.create(LoggingSpanExporter.create())) + .build(); + var otelPlugin = new OpenTelemetryDurablePlugin(tracerProvider, idGenerator); + + return DurableConfig.builder().withPlugins(otelPlugin).build(); + } + + @Override + public String handleRequest(GreetingRequest input, DurableContext context) { + // Log with MDC — trace_id and span_id will be in the JSON output + context.getLogger().info("Starting OTel example for {}", input.getName()); + + var greeting = context.step("create-greeting", String.class, stepCtx -> { + context.getLogger().info("Inside step — this log has trace context in MDC"); + return "Hello, " + input.getName(); + }); + + var result = context.step("transform", String.class, stepCtx -> greeting.toUpperCase() + "!"); + + context.getLogger().info("OTel example complete: {}", result); + return result; + } +} diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java index a32295b40..fa4f0eb44 100644 --- a/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/CloudBasedIntegrationTest.java @@ -847,4 +847,17 @@ void testPluginExample() { assertNotNull(runner.getOperation("create-greeting")); assertNotNull(runner.getOperation("transform")); } + + @Test + void testOtelExample() { + var runner = + CloudDurableTestRunner.create(arn("otel-example"), GreetingRequest.class, String.class, lambdaClient); + var result = runner.run(new GreetingRequest("World")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); +
assertEquals("HELLO, WORLD!", result.getResult()); + + assertNotNull(runner.getOperation("create-greeting")); + assertNotNull(runner.getOperation("transform")); + } } diff --git a/examples/src/test/java/software/amazon/lambda/durable/examples/general/OtelExampleTest.java b/examples/src/test/java/software/amazon/lambda/durable/examples/general/OtelExampleTest.java new file mode 100644 index 000000000..629fa3b85 --- /dev/null +++ b/examples/src/test/java/software/amazon/lambda/durable/examples/general/OtelExampleTest.java @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.examples.general; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.examples.types.GreetingRequest; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +class OtelExampleTest { + + @Test + void testOtelExample_executesSuccessfully() { + var handler = new OtelExample(); + var runner = LocalDurableTestRunner.create(GreetingRequest.class, handler); + + var result = runner.run(new GreetingRequest("World")); + + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + assertEquals("HELLO, WORLD!", result.getResult(String.class)); + + assertNotNull(result.getOperation("create-greeting")); + assertNotNull(result.getOperation("transform")); + } +} diff --git a/examples/template.yaml b/examples/template.yaml index 2f5735c86..10a27c46a 100644 --- a/examples/template.yaml +++ b/examples/template.yaml @@ -344,6 +344,17 @@ Resources: Handler: "software.amazon.lambda.durable.examples.general.PluginExample" Role: !Ref RoleArn + OtelExampleFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: !Join + - '-' + - - 'otel-example' + - !Ref 
JavaVersion + - runtime + Handler: "software.amazon.lambda.durable.examples.general.OtelExample" + Role: !Ref RoleArn + RetryInvokeExampleFunction: Type: AWS::Serverless::Function Properties: @@ -546,6 +557,10 @@ Outputs: Description: Plugin Example Function ARN Value: !GetAtt PluginExampleFunction.Arn + OtelExampleFunction: + Description: OTel Example Function ARN + Value: !GetAtt OtelExampleFunction.Arn + RetryInvokeExampleFunction: Description: Retry Invoke Example Function ARN Value: !GetAtt RetryInvokeExampleFunction.Arn diff --git a/otel-plugin/pom.xml b/otel-plugin/pom.xml new file mode 100644 index 000000000..7018b250a --- /dev/null +++ b/otel-plugin/pom.xml @@ -0,0 +1,105 @@ + + + 4.0.0 + + + software.amazon.lambda.durable + aws-durable-execution-sdk-java-parent + 1.1.1-SNAPSHOT + + + aws-durable-execution-sdk-java-otel + AWS Lambda Durable Execution SDK - OpenTelemetry Plugin + OpenTelemetry instrumentation plugin for AWS Lambda Durable Execution SDK + + + 1.62.0 + + + + + + software.amazon.lambda.durable + aws-durable-execution-sdk-java + ${project.version} + + + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + provided + + + + + io.opentelemetry + opentelemetry-context + ${opentelemetry.version} + + + + + io.opentelemetry.contrib + opentelemetry-aws-xray-propagator + 1.56.0-alpha + + + + + org.slf4j + slf4j-api + + + + + org.junit.jupiter + junit-jupiter + test + + + io.opentelemetry + opentelemetry-sdk-testing + ${opentelemetry.version} + test + + + org.mockito + mockito-core + test + + + ch.qos.logback + logback-classic + 1.5.18 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-surefire-plugin + + + com.diffplug.spotless + spotless-maven-plugin + + + + diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/ContextExtractor.java 
b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/ContextExtractor.java new file mode 100644 index 000000000..518569c3e --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/ContextExtractor.java @@ -0,0 +1,27 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import io.opentelemetry.context.Context; + +/** + * Extracts OTel trace context from the Lambda runtime environment. + * + *

Implementations read trace context from various sources (X-Ray trace header, W3C traceparent, etc.) and return an + * OTel {@link Context} that can be used as the parent for invocation spans. + * + *

<p>Called once per invocation in {@code onInvocationStart} to establish the parent trace context of the {@code durable.invocation} span. + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ +@Deprecated +@FunctionalInterface +public interface ContextExtractor { + + /** + * Extracts trace context from the runtime environment. + * + * @return the extracted OTel context, or {@link Context#root()} if no context is available + */ + Context extract(); +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/DeterministicIdGenerator.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/DeterministicIdGenerator.java new file mode 100644 index 000000000..71dff119f --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/DeterministicIdGenerator.java @@ -0,0 +1,119 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import io.opentelemetry.sdk.trace.IdGenerator; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Generates deterministic trace and span IDs for durable execution observability. + * + *

All invocations of the same execution share a single trace ID (derived from the execution ARN). Operations get + * stable span IDs derived from the execution ARN + operation ID, ensuring the same operation produces the same span + * across invocations. + * + *

<p>When no pending operation ID is set, falls back to random generation (standard OTel behavior). + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ +@Deprecated +public class DeterministicIdGenerator implements IdGenerator { + + private static final IdGenerator RANDOM = IdGenerator.random(); + + private final AtomicReference<String> executionTraceId = new AtomicReference<>(null); + private final ThreadLocal<String> pendingSpanOperationId = new ThreadLocal<>(); + private final AtomicReference<String> executionArn = new AtomicReference<>(null); + + /** + * Sets the execution ARN used for generating deterministic IDs. + * + * @param arn the durable execution ARN; {@code null} clears the cached trace ID so trace IDs fall back to random + */ + public void setExecutionArn(String arn) { + this.executionArn.set(arn); + this.executionTraceId.set(arn != null ? generateTraceIdFromArn(arn) : null); + } + + /** + * Queues the next span to use a deterministic ID derived from the given operation ID. + * + * @param operationId the operation ID to derive the span ID from + */ + public void setNextSpanOperationId(String operationId) { + this.pendingSpanOperationId.set(operationId); + } + + /** + * Generates a deterministic span ID for a given operation ID without consuming the ThreadLocal state. + * + *

<p>Used for creating non-recording placeholder spans when a parent operation's span context is needed but hasn't + * been exported yet. + * + * @param operationId the operation ID to derive the span ID from + * @return a deterministic 16-char hex span ID + */ + public String generateSpanIdForOperation(String operationId) { + return generateSpanIdFromOperation(operationId); + } + + @Override + public String generateTraceId() { + var cached = executionTraceId.get(); + if (cached != null) { + return cached; + } + return RANDOM.generateTraceId(); + } + + @Override + public String generateSpanId() { + var operationId = pendingSpanOperationId.get(); + if (operationId != null) { + pendingSpanOperationId.remove(); + return generateSpanIdFromOperation(operationId); + } + return RANDOM.generateSpanId(); + } + + /** + * Generates a deterministic trace ID from an execution ARN. + * + *

<p>Uses SHA-256 hash truncated to 32 hex chars (128 bits) for the trace ID. + */ + private String generateTraceIdFromArn(String arn) { + var hash = sha256(arn); + // Trace ID is 32 hex chars (16 bytes) + return hash.substring(0, 32); + } + + /** + * Generates a deterministic span ID from the execution ARN + operation ID. + * + *

<p>Uses SHA-256 hash truncated to 16 hex chars (64 bits) for the span ID. + */ + private String generateSpanIdFromOperation(String operationId) { + var arn = executionArn.get(); + var input = arn != null ? arn + ":" + operationId : operationId; + var hash = sha256(input); + // Span ID is 16 hex chars (8 bytes) + return hash.substring(0, 16); + } + + private static String sha256(String input) { + try { + var digest = MessageDigest.getInstance("SHA-256"); + var hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); + var hex = new StringBuilder(64); + for (byte b : hash) { + hex.append(String.format("%02x", b)); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 not available", e); + } + } +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/MdcSpanEnricher.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/MdcSpanEnricher.java new file mode 100644 index 000000000..1d90fe37d --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/MdcSpanEnricher.java @@ -0,0 +1,59 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import io.opentelemetry.api.trace.Span; +import org.slf4j.MDC; + +/** + * Injects OTel trace/span IDs into SLF4J MDC for log-trace correlation. + * + *

When used with structured logging (Log4j2 JSON, Logback JSON), these MDC fields appear in every log line, enabling + * tools like CloudWatch Logs Insights and Datadog to correlate logs with traces. + * + *

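<p>MDC keys injected: + * + * <ul> + * <li>{@code trace_id} — trace ID of the current span + * <li>{@code span_id} — span ID of the current span + * <li>{@code durable.execution.arn} — the durable execution ARN, when one is supplied + * </ul>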

+ * + *

<p>Usage: Call {@link #inject(String)} in {@code onUserFunctionStart} (after span is active) and {@link #clear()} in + * {@code onUserFunctionEnd}. Or use the convenience plugin {@link OpenTelemetryDurablePlugin} which handles this + * automatically when MDC enrichment is enabled. + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ +@Deprecated +public final class MdcSpanEnricher { + + public static final String MDC_TRACE_ID = "trace_id"; + public static final String MDC_SPAN_ID = "span_id"; + public static final String MDC_EXECUTION_ARN = "durable.execution.arn"; + + private MdcSpanEnricher() {} + + /** + * Injects the current span's trace ID and span ID into MDC; both are skipped when the current span context is invalid. + * + * @param executionArn the durable execution ARN (may be null, in which case the ARN key is not written) + */ + public static void inject(String executionArn) { + var span = Span.current(); + if (span.getSpanContext().isValid()) { + MDC.put(MDC_TRACE_ID, span.getSpanContext().getTraceId()); + MDC.put(MDC_SPAN_ID, span.getSpanContext().getSpanId()); + } + if (executionArn != null) { + MDC.put(MDC_EXECUTION_ARN, executionArn); + } + } + + /** Removes the injected MDC fields. */ + public static void clear() { + MDC.remove(MDC_TRACE_ID); + MDC.remove(MDC_SPAN_ID); + MDC.remove(MDC_EXECUTION_ARN); + } +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java new file mode 100644 index 000000000..0e9e93ade --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePlugin.java @@ -0,0 +1,371 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import static software.amazon.lambda.durable.otel.SpanAttributes.*; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.trace.TracerProvider; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.time.Instant; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.lambda.durable.plugin.DurableExecutionPlugin; +import software.amazon.lambda.durable.plugin.InvocationEndInfo; +import software.amazon.lambda.durable.plugin.InvocationInfo; +import software.amazon.lambda.durable.plugin.InvocationStatus; +import software.amazon.lambda.durable.plugin.OperationEndInfo; +import software.amazon.lambda.durable.plugin.OperationInfo; +import software.amazon.lambda.durable.plugin.UserFunctionEndInfo; +import software.amazon.lambda.durable.plugin.UserFunctionStartInfo; + +/** + * OpenTelemetry plugin for the AWS Lambda Durable Execution SDK. + * + *

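<p>Creates spans at three levels: + * + * <ul> + * <li>invocation — one {@code durable.invocation} span per Lambda invocation + * <li>operation — one span per durable operation, started in {@code onOperationStart} + * <li>attempt — one span per user-function attempt, parented to its operation span + * </ul>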

+ * + *

Uses deterministic span/trace IDs so all invocations of the same execution share a single trace. + * + *

<p>Thread-safe: uses {@link ConcurrentHashMap} for span/scope storage since the SDK runs user code on multiple + * threads. + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ +@Deprecated +public class OpenTelemetryDurablePlugin implements DurableExecutionPlugin { + + private static final Logger logger = LoggerFactory.getLogger(OpenTelemetryDurablePlugin.class); + private static final String INSTRUMENTATION_NAME = "aws-durable-execution-sdk-java"; + + private final TracerProvider tracerProvider; + private final Tracer tracer; + private final DeterministicIdGenerator idGenerator; + private final ContextExtractor contextExtractor; + private final double samplingRate; + private final boolean enableMdc; + + // Per-invocation state + private volatile Span invocationSpan; + private volatile String executionArn; + private volatile boolean sampled = true; + + // Thread-safe storage for operation spans (keyed by operationId) — open spans that need ending + private final ConcurrentHashMap<String, Span> operationSpans = new ConcurrentHashMap<>(); + + // Thread-safe storage for attempt spans/scopes (keyed by operationId + "-" + attempt) + private final ConcurrentHashMap<String, Span> attemptSpans = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, Scope> attemptScopes = new ConcurrentHashMap<>(); + + // Store operation span contexts for parent resolution (keyed by operationId) + private final ConcurrentHashMap<String, SpanContext> operationContexts = new ConcurrentHashMap<>(); + + /** + * Creates an OTel plugin with default settings: X-Ray context extraction, 100% sampling, MDC enabled.
+ * + * @param tracerProvider the OTel tracer provider (should use {@link DeterministicIdGenerator}) + * @param idGenerator the deterministic ID generator (same instance configured in the tracer provider) + */ + public OpenTelemetryDurablePlugin(TracerProvider tracerProvider, DeterministicIdGenerator idGenerator) { + this(tracerProvider, idGenerator, new XRayContextExtractor(), 1.0, true); + } + + /** + * Creates an OTel plugin with a custom context extractor and sampling rate, MDC enabled. + * + * @param tracerProvider the OTel tracer provider + * @param idGenerator the deterministic ID generator + * @param contextExtractor extracts parent trace context from the Lambda environment + * @param samplingRate value between 0.0 and 1.0 — fraction of executions to trace + */ + public OpenTelemetryDurablePlugin( + TracerProvider tracerProvider, + DeterministicIdGenerator idGenerator, + ContextExtractor contextExtractor, + double samplingRate) { + this(tracerProvider, idGenerator, contextExtractor, samplingRate, true); + } + + /** + * Creates an OTel plugin with full configuration. + * + * @param tracerProvider the OTel tracer provider + * @param idGenerator the deterministic ID generator + * @param contextExtractor extracts parent trace context from the Lambda environment + * @param samplingRate value between 0.0 and 1.0 — fraction of executions to trace + * @param enableMdc if true, injects trace_id/span_id into SLF4J MDC for log correlation + */ + public OpenTelemetryDurablePlugin( + TracerProvider tracerProvider, + DeterministicIdGenerator idGenerator, + ContextExtractor contextExtractor, + double samplingRate, + boolean enableMdc) { + this.tracerProvider = tracerProvider; + this.idGenerator = idGenerator; + this.tracer = tracerProvider.get(INSTRUMENTATION_NAME); + this.contextExtractor = contextExtractor; + this.samplingRate = samplingRate; + this.enableMdc = enableMdc; + } + + /** + * Creates an OTel plugin using a pre-configured {@link SdkTracerProvider}.
+ * + * @param sdkTracerProvider the SDK tracer provider + * @param idGenerator the deterministic ID generator + */ + public OpenTelemetryDurablePlugin(SdkTracerProvider sdkTracerProvider, DeterministicIdGenerator idGenerator) { + this((TracerProvider) sdkTracerProvider, idGenerator); + } + + // ─── Invocation hooks ──────────────────────────────────────────────── + + @Override + public void onInvocationStart(InvocationInfo info) { + this.executionArn = info.executionArn(); + idGenerator.setExecutionArn(info.executionArn()); + + // Determine sampling (consistent across all invocations of same execution) + this.sampled = SamplingUtil.shouldSampleExecution(info.executionArn(), samplingRate); + if (!sampled) return; + + // Extract parent context from Lambda environment (X-Ray, W3C, etc.) + var extractedParentContext = contextExtractor.extract(); + + // Create invocation span as child of extracted context + var spanBuilder = tracer.spanBuilder("durable.invocation") + .setParent(extractedParentContext) + .setAttribute(DURABLE_EXECUTION_ARN, info.executionArn()) + .setAttribute(DURABLE_FIRST_INVOCATION, info.isFirstInvocation()); + + if (info.requestId() != null) { + spanBuilder.setAttribute(AttributeKey.stringKey("faas.invocation_id"), info.requestId()); + } + + invocationSpan = spanBuilder.startSpan(); + } + + @Override + public void onInvocationEnd(InvocationEndInfo info) { + if (!sampled || invocationSpan == null) return; + + // End any operation spans that are still open (operations that didn't complete in this invocation) + for (var entry : operationSpans.entrySet()) { + var span = entry.getValue(); + span.setAttribute(AttributeKey.stringKey("durable.operation.status"), "PENDING"); + span.end(); + } + operationSpans.clear(); + operationContexts.clear(); + + // End any attempt spans that are still open (e.g., SuspendExecutionException skipped onUserFunctionEnd) + for (var entry : attemptScopes.entrySet()) { + entry.getValue().close(); + } + attemptScopes.clear(); +
for (var entry : attemptSpans.entrySet()) { + entry.getValue().end(); + } + attemptSpans.clear(); + + // End invocation span + invocationSpan.setAttribute( + DURABLE_INVOCATION_STATUS, info.invocationStatus().name()); + + if (info.invocationStatus() == InvocationStatus.FAILED && info.executionError() != null) { + invocationSpan.setStatus(StatusCode.ERROR, info.executionError().getMessage()); + invocationSpan.recordException(info.executionError()); + } + + invocationSpan.end(); + invocationSpan = null; + + // Flush spans before Lambda freezes + if (tracerProvider instanceof SdkTracerProvider sdkProvider) { + var flushResult = sdkProvider.forceFlush().join(5, java.util.concurrent.TimeUnit.SECONDS); + if (!flushResult.isSuccess()) { + logger.warn("OTel span flush failed or timed out — some spans may be lost"); + } + } + } + + // ─── Operation hooks ───────────────────────────────────────────────── + + @Override + public void onOperationStart(OperationInfo info) { + if (!sampled || info.id() == null) return; + + var parentContext = resolveParentContext(info.parentId()); + + var spanBuilder = tracer.spanBuilder(spanName(info.type(), info.subType(), info.name())) + .setParent(parentContext) + .setAttribute(DURABLE_EXECUTION_ARN, executionArn) + .setAttribute(DURABLE_OPERATION_ID, info.id()) + .setAttribute(DURABLE_OPERATION_TYPE, info.type()); + + if (info.name() != null) { + spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name()); + } + if (info.subType() != null) { + spanBuilder.setAttribute(DURABLE_OPERATION_SUBTYPE, info.subType()); + } + if (info.parentId() != null) { + spanBuilder.setAttribute(DURABLE_OPERATION_PARENT_ID, info.parentId()); + } + + // Queue the deterministic span ID only now: nothing between here and startSpan can throw and strand it + idGenerator.setNextSpanOperationId(info.id()); + var span = spanBuilder.startSpan(); + + // Store the open span — will be ended in onOperationEnd or onInvocationEnd + operationSpans.put(info.id(), span); + operationContexts.put(info.id(), span.getSpanContext()); + } + + @Override + public void
onOperationEnd(OperationEndInfo info) { + if (!sampled || info.id() == null) return; + + // End the operation span that was started in onOperationStart + var span = operationSpans.remove(info.id()); + if (span == null) return; + + if (info.error() != null) { + span.setStatus(StatusCode.ERROR, info.error().getMessage()); + span.recordException(info.error()); + } + + span.end(); + } + + // ─── User function hooks ───────────────────────────────────────────── + + @Override + public void onUserFunctionStart(UserFunctionStartInfo info) { + if (!sampled) return; + var key = attemptKey(info.id(), info.attempt()); + + // Use the operation span as parent for the attempt span + var parentContext = resolveParentContext(info.id()); + + var spanBuilder = tracer.spanBuilder(attemptSpanName(info.type(), info.subType(), info.name(), info.attempt())) + .setParent(parentContext) + .setStartTimestamp(info.startTimestamp() != null ? info.startTimestamp() : Instant.now()); + + spanBuilder.setAttribute(DURABLE_EXECUTION_ARN, executionArn); + spanBuilder.setAttribute(DURABLE_OPERATION_ID, info.id()); + + if (info.type() != null) { + spanBuilder.setAttribute(DURABLE_OPERATION_TYPE, info.type()); + } + if (info.name() != null) { + spanBuilder.setAttribute(DURABLE_OPERATION_NAME, info.name()); + } + if (info.attempt() != null) { + spanBuilder.setAttribute(DURABLE_ATTEMPT_NUMBER, info.attempt().longValue()); + } + + var span = spanBuilder.startSpan(); + attemptSpans.put(key, span); + + // Make span current on this thread so auto-instrumented calls become children + var scope = span.makeCurrent(); + attemptScopes.put(key, scope); + + // Inject trace context into MDC for log-trace correlation + if (enableMdc) { + MdcSpanEnricher.inject(executionArn); + } + } + + @Override + public void onUserFunctionEnd(UserFunctionEndInfo info) { + if (!sampled) return; + var key = attemptKey(info.id(), info.attempt()); + + // Close scope first (must happen on same thread as makeCurrent) + var scope =
attemptScopes.remove(key); + if (scope != null) { + scope.close(); + } + + // Clear MDC before the early return below, so a missing attempt span cannot leave stale IDs on this thread + if (enableMdc) { + MdcSpanEnricher.clear(); + } + + var span = attemptSpans.remove(key); + if (span == null) return; + + // Set outcome + var outcome = info.succeeded() ? "succeeded" : (info.error() != null ? "failed" : "unknown"); + span.setAttribute(DURABLE_ATTEMPT_OUTCOME, outcome); + + if (!info.succeeded() && info.error() != null) { + span.setStatus(StatusCode.ERROR, info.error().getMessage()); + span.recordException(info.error()); + } + + if (info.endTimestamp() != null) { + span.end(info.endTimestamp()); + } else { + span.end(); + } + } + + // ─── Helpers ───────────────────────────────────────────────────────── + + private Context resolveParentContext(String parentId) { + if (parentId != null) { + var parentSpanContext = operationContexts.get(parentId); + if (parentSpanContext != null) { + return Context.current().with(Span.wrap(parentSpanContext)); + } + // Parent operation from a prior invocation — create non-recording placeholder + var deterministicParentSpanId = idGenerator.generateSpanIdForOperation(parentId); + var traceId = idGenerator.generateTraceId(); + var placeholderContext = SpanContext.create( + traceId, deterministicParentSpanId, TraceFlags.getSampled(), TraceState.getDefault()); + return Context.current().with(Span.wrap(placeholderContext)); + } + // Fall back to invocation span as parent; read the volatile field once so it cannot turn null after the check + var rootSpan = invocationSpan; + if (rootSpan != null) { + return Context.current().with(rootSpan); + } + return Context.current(); + } + +
private static String spanName(String type, String subType, String name) { + // Locale.ROOT keeps names locale-independent; "unknown" avoids an NPE when type and subType are both null + var kind = subType != null ? subType : (type != null ? type : "unknown"); + return "durable." + kind.toLowerCase(java.util.Locale.ROOT) + (name != null ? ":" + name : ""); + } + + private static String attemptSpanName(String type, String subType, String name, Integer attempt) { + var base = spanName(type, subType, name); + if (attempt != null) { + return base + " [attempt " + attempt + "]"; + } + return base + " [fn]"; + } + + private static String attemptKey(String operationId, Integer attempt) { + return operationId + "-" + (attempt != null ? attempt : "ctx"); + } +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/SamplingUtil.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/SamplingUtil.java new file mode 100644 index 000000000..65d3e8b0b --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/SamplingUtil.java @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import java.nio.charset.StandardCharsets; + +/** + * Deterministic sampling utility for durable executions. + * + *

<p>Uses FNV-1a hash of the execution ARN to decide whether to sample. This ensures consistent sampling across all + * invocations of the same execution — if you sample the first invocation, you sample all subsequent ones. + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ +@Deprecated +public final class SamplingUtil { + + private SamplingUtil() {} + + // FNV-1a 32-bit constants + private static final int FNV_OFFSET_BASIS = 0x811c9dc5; + private static final int FNV_PRIME = 0x01000193; + + /** + * Determines whether an execution should be sampled based on its ARN. + * + *

<p>Uses FNV-1a hash to distribute executions uniformly. The same ARN always produces the same result for a given + * sampling rate. + * + * @param executionArn the durable execution ARN; a null or empty ARN is sampled only when the rate is at least 1.0 + * @param samplingRate value between 0.0 (sample nothing) and 1.0 (sample everything); values outside that range behave like the nearest bound, and NaN samples nothing + * @return true if this execution should be sampled + */ + public static boolean shouldSampleExecution(String executionArn, double samplingRate) { + if (samplingRate >= 1.0) return true; + if (samplingRate <= 0.0) return false; + if (executionArn == null || executionArn.isEmpty()) return false; + + var hash = fnv1a32(executionArn); + // Treat the 32-bit hash as unsigned and scale it into [0.0, 1.0) + var normalized = (hash & 0xFFFFFFFFL) / (double) 0x100000000L; + return normalized < samplingRate; + } + + /** FNV-1a 32-bit hash of the input's UTF-8 bytes. */ + static int fnv1a32(String input) { + var bytes = input.getBytes(StandardCharsets.UTF_8); + int hash = FNV_OFFSET_BASIS; + for (byte b : bytes) { + hash ^= (b & 0xFF); + hash *= FNV_PRIME; + } + return hash; + } +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/SpanAttributes.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/SpanAttributes.java new file mode 100644 index 000000000..3246c39bd --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/SpanAttributes.java @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import io.opentelemetry.api.common.AttributeKey; + +/** + * OTel span attribute keys for durable execution spans. + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases.
+ */ +@Deprecated +final class SpanAttributes { + + private SpanAttributes() {} + + static final AttributeKey DURABLE_EXECUTION_ARN = AttributeKey.stringKey("durable.execution.arn"); + static final AttributeKey DURABLE_OPERATION_ID = AttributeKey.stringKey("durable.operation.id"); + static final AttributeKey DURABLE_OPERATION_TYPE = AttributeKey.stringKey("durable.operation.type"); + static final AttributeKey DURABLE_OPERATION_NAME = AttributeKey.stringKey("durable.operation.name"); + static final AttributeKey DURABLE_OPERATION_SUBTYPE = AttributeKey.stringKey("durable.operation.subtype"); + static final AttributeKey DURABLE_OPERATION_PARENT_ID = + AttributeKey.stringKey("durable.operation.parent_id"); + static final AttributeKey DURABLE_ATTEMPT_NUMBER = AttributeKey.longKey("durable.attempt.number"); + static final AttributeKey DURABLE_ATTEMPT_OUTCOME = AttributeKey.stringKey("durable.attempt.outcome"); + static final AttributeKey DURABLE_INVOCATION_STATUS = AttributeKey.stringKey("durable.invocation.status"); + static final AttributeKey DURABLE_FIRST_INVOCATION = AttributeKey.booleanKey("durable.invocation.first"); +} diff --git a/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/XRayContextExtractor.java b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/XRayContextExtractor.java new file mode 100644 index 000000000..4222fc72c --- /dev/null +++ b/otel-plugin/src/main/java/software/amazon/lambda/durable/otel/XRayContextExtractor.java @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator; +import java.util.Collections; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extracts OTel trace context from the AWS X-Ray {@code _X_AMZN_TRACE_ID} environment variable using the official + * OpenTelemetry AWS X-Ray Propagator. + * + *

Lambda runtime sets this environment variable with the X-Ray trace header for each invocation. This extractor + * reads it and uses the official {@link AwsXrayPropagator} to parse it into an OTel-compatible {@link Context}. + * + * @deprecated This is a preview API that is experimental and may be changed or removed in future releases. + */ +@Deprecated +public class XRayContextExtractor implements ContextExtractor { + + private static final Logger logger = LoggerFactory.getLogger(XRayContextExtractor.class); + private static final String XRAY_ENV_VAR = "_X_AMZN_TRACE_ID"; + private static final String XRAY_HEADER_KEY = "X-Amzn-Trace-Id"; + private static final AwsXrayPropagator PROPAGATOR = AwsXrayPropagator.getInstance(); + + /** A TextMapGetter that reads the X-Ray trace header from the Lambda environment variable. */ + private static final TextMapGetter ENV_GETTER = new TextMapGetter<>() { + @Override + public Iterable keys(String carrier) { + return Collections.singletonList(XRAY_HEADER_KEY); + } + + @Override + public String get(String carrier, String key) { + if (XRAY_HEADER_KEY.equalsIgnoreCase(key)) { + return carrier; + } + return null; + } + }; + + @Override + public Context extract() { + var traceHeader = System.getenv(XRAY_ENV_VAR); + if (traceHeader == null || traceHeader.isEmpty()) { + logger.debug("No X-Ray trace header found in environment"); + return Context.root(); + } + + return PROPAGATOR.extract(Context.root(), traceHeader, ENV_GETTER); + } +} diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/DeterministicIdGeneratorTest.java b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/DeterministicIdGeneratorTest.java new file mode 100644 index 000000000..6ca638a06 --- /dev/null +++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/DeterministicIdGeneratorTest.java @@ -0,0 +1,139 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class DeterministicIdGeneratorTest { + + private DeterministicIdGenerator generator; + + @BeforeEach + void setUp() { + generator = new DeterministicIdGenerator(); + } + + @Test + void generateTraceId_withoutArn_returnsRandom() { + var id1 = generator.generateTraceId(); + var id2 = generator.generateTraceId(); + + assertNotNull(id1); + assertEquals(32, id1.length()); + // Random IDs should differ (extremely unlikely to collide) + assertNotEquals(id1, id2); + } + + @Test + void generateTraceId_withArn_returnsDeterministic() { + generator.setExecutionArn("arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1"); + + var id1 = generator.generateTraceId(); + var id2 = generator.generateTraceId(); + + assertEquals(32, id1.length()); + assertEquals(id1, id2, "Same ARN should always produce same trace ID"); + } + + @Test + void generateTraceId_differentArns_produceDifferentIds() { + generator.setExecutionArn("arn:exec1"); + var id1 = generator.generateTraceId(); + + generator.setExecutionArn("arn:exec2"); + var id2 = generator.generateTraceId(); + + assertNotEquals(id1, id2); + } + + @Test + void generateSpanId_withoutOperationId_returnsRandom() { + var id1 = generator.generateSpanId(); + var id2 = generator.generateSpanId(); + + assertEquals(16, id1.length()); + assertNotEquals(id1, id2); + } + + @Test + void generateSpanId_withOperationId_returnsDeterministic() { + generator.setExecutionArn("arn:exec1"); + generator.setNextSpanOperationId("op-hash-1"); + var id1 = generator.generateSpanId(); + + generator.setNextSpanOperationId("op-hash-1"); + var id2 = generator.generateSpanId(); + + assertEquals(16, id1.length()); + assertEquals(id1, id2, "Same operation ID should produce same span ID"); + } + + @Test + void 
generateSpanId_differentOperationIds_produceDifferentIds() { + generator.setExecutionArn("arn:exec1"); + + generator.setNextSpanOperationId("op-1"); + var id1 = generator.generateSpanId(); + + generator.setNextSpanOperationId("op-2"); + var id2 = generator.generateSpanId(); + + assertNotEquals(id1, id2); + } + + @Test + void generateSpanId_consumesPendingId() { + generator.setNextSpanOperationId("op-1"); + var deterministic = generator.generateSpanId(); + + // Second call should be random (pending was consumed) + var random = generator.generateSpanId(); + assertNotEquals(deterministic, random); + } + + @Test + void generateSpanIdForOperation_doesNotConsumePending() { + generator.setExecutionArn("arn:exec1"); + generator.setNextSpanOperationId("op-1"); + + // This should NOT consume the pending + var forOperation = generator.generateSpanIdForOperation("op-2"); + + // The pending should still be consumed by generateSpanId + var fromPending = generator.generateSpanId(); + + assertEquals(16, forOperation.length()); + assertEquals(16, fromPending.length()); + assertNotEquals(forOperation, fromPending); + } + + @Test + void generateSpanIdForOperation_isDeterministic() { + generator.setExecutionArn("arn:exec1"); + + var id1 = generator.generateSpanIdForOperation("op-1"); + var id2 = generator.generateSpanIdForOperation("op-1"); + + assertEquals(id1, id2); + } + + @Test + void traceId_isValidHex() { + generator.setExecutionArn("arn:exec1"); + var traceId = generator.generateTraceId(); + + assertTrue(traceId.matches("[0-9a-f]{32}"), "Trace ID should be 32 hex chars: " + traceId); + } + + @Test + void spanId_isValidHex() { + generator.setExecutionArn("arn:exec1"); + generator.setNextSpanOperationId("op-1"); + var spanId = generator.generateSpanId(); + + assertTrue(spanId.matches("[0-9a-f]{16}"), "Span ID should be 16 hex chars: " + spanId); + } +} diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/MdcSpanEnricherTest.java 
b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/MdcSpanEnricherTest.java new file mode 100644 index 000000000..164104a89 --- /dev/null +++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/MdcSpanEnricherTest.java @@ -0,0 +1,85 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import static org.junit.jupiter.api.Assertions.*; + +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.time.Instant; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.slf4j.MDC; +import software.amazon.lambda.durable.plugin.*; + +class MdcSpanEnricherTest { + + @AfterEach + void cleanup() { + MDC.clear(); + } + + @Test + void clear_removesAllMdcKeys() { + MDC.put(MdcSpanEnricher.MDC_TRACE_ID, "abc123"); + MDC.put(MdcSpanEnricher.MDC_SPAN_ID, "def456"); + MDC.put(MdcSpanEnricher.MDC_EXECUTION_ARN, "arn:test"); + + MdcSpanEnricher.clear(); + + assertNull(MDC.get(MdcSpanEnricher.MDC_TRACE_ID)); + assertNull(MDC.get(MdcSpanEnricher.MDC_SPAN_ID)); + assertNull(MDC.get(MdcSpanEnricher.MDC_EXECUTION_ARN)); + } + + @Test + void inject_withNoActiveSpan_doesNotSetTraceIds() { + MdcSpanEnricher.inject("arn:test"); + + // No active span → no trace/span IDs, but ARN is still set + assertNull(MDC.get(MdcSpanEnricher.MDC_TRACE_ID)); + assertNull(MDC.get(MdcSpanEnricher.MDC_SPAN_ID)); + assertEquals("arn:test", MDC.get(MdcSpanEnricher.MDC_EXECUTION_ARN)); + } + + @Test + void inject_withNullArn_doesNotSetArn() { + MdcSpanEnricher.inject(null); + + assertNull(MDC.get(MdcSpanEnricher.MDC_EXECUTION_ARN)); + } + + @Test + void plugin_withMdcEnabled_setsArnInMdc() { + // Test MDC through the full plugin lifecycle (where makeCurrent is called on same thread) + var spanExporter = 
InMemorySpanExporter.create(); + var idGenerator = new DeterministicIdGenerator(); + var tracerProvider = SdkTracerProvider.builder() + .setIdGenerator(idGenerator) + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + var plugin = new OpenTelemetryDurablePlugin( + tracerProvider, idGenerator, () -> io.opentelemetry.context.Context.root(), 1.0, true); + + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec-mdc-test", true)); + + // Simulate onUserFunctionStart on this thread (same as production — hooks fire on user code thread) + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "step", "STEP", "Step", null, Instant.now(), false, 1)); + + // MDC should have execution ARN after onUserFunctionStart + assertEquals("arn:exec-mdc-test", MDC.get(MdcSpanEnricher.MDC_EXECUTION_ARN)); + + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", "step", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null)); + + // MDC should be cleared after onUserFunctionEnd + assertNull(MDC.get(MdcSpanEnricher.MDC_EXECUTION_ARN)); + assertNull(MDC.get(MdcSpanEnricher.MDC_TRACE_ID)); + + plugin.onInvocationEnd( + new InvocationEndInfo("req-1", "arn:exec-mdc-test", true, InvocationStatus.SUCCEEDED, null)); + } +} diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java new file mode 100644 index 000000000..30bd29aa5 --- /dev/null +++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/OpenTelemetryDurablePluginTest.java @@ -0,0 +1,242 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import static org.junit.jupiter.api.Assertions.*; + +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.time.Instant; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.plugin.*; + +class OpenTelemetryDurablePluginTest { + + private InMemorySpanExporter spanExporter; + private OpenTelemetryDurablePlugin plugin; + private DeterministicIdGenerator idGenerator; + + @BeforeEach + void setUp() { + spanExporter = InMemorySpanExporter.create(); + idGenerator = new DeterministicIdGenerator(); + + var tracerProvider = SdkTracerProvider.builder() + .setIdGenerator(idGenerator) + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + plugin = new OpenTelemetryDurablePlugin( + tracerProvider, idGenerator, () -> io.opentelemetry.context.Context.root(), 1.0, false); + } + + @Test + void invocationStart_and_end_createsSpan() { + plugin.onInvocationStart(new InvocationInfo( + "req-123", "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1", true)); + plugin.onInvocationEnd(new InvocationEndInfo( + "req-123", + "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1", + true, + InvocationStatus.SUCCEEDED, + null)); + + var spans = spanExporter.getFinishedSpanItems(); + assertEquals(1, spans.size()); + + var span = spans.get(0); + assertEquals("durable.invocation", span.getName()); + assertEquals(StatusCode.UNSET, span.getStatus().getStatusCode()); + } + + @Test + void invocationEnd_withFailure_setsErrorStatus() { + plugin.onInvocationStart(new InvocationInfo("req-123", "arn:exec1", true)); + plugin.onInvocationEnd(new InvocationEndInfo( + "req-123", "arn:exec1", true, InvocationStatus.FAILED, new 
RuntimeException("boom"))); + + var spans = spanExporter.getFinishedSpanItems(); + assertEquals(1, spans.size()); + assertEquals(StatusCode.ERROR, spans.get(0).getStatus().getStatusCode()); + } + + @Test + void operationStart_createsSpan_operationEnd_endsIt() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + var start = Instant.parse("2026-06-01T10:00:00Z"); + var end = Instant.parse("2026-06-01T10:00:05Z"); + + // Operation span created at start + plugin.onOperationStart(new OperationInfo("op-hash-1", "my-step", "STEP", "Step", null, start, null)); + + // Operation span ended at completion + plugin.onOperationEnd(new OperationEndInfo("op-hash-1", "my-step", "STEP", "Step", null, start, end, null)); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null)); + + var spans = spanExporter.getFinishedSpanItems(); + assertEquals(2, spans.size()); + + var operationSpan = spans.stream() + .filter(s -> s.getName().contains("step")) + .findFirst() + .orElseThrow(); + assertEquals("durable.step:my-step", operationSpan.getName()); + } + + @Test + void userFunctionStart_and_end_createsAttemptSpan() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "compute", "STEP", "Step", null, Instant.now(), false, 1)); + + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", "compute", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null)); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null)); + + var spans = spanExporter.getFinishedSpanItems(); + assertEquals(2, spans.size()); + + var attemptSpan = spans.stream() + .filter(s -> s.getName().contains("attempt")) + .findFirst() + .orElseThrow(); + assertTrue(attemptSpan.getName().contains("compute")); + assertTrue(attemptSpan.getName().contains("attempt 1")); + } + + @Test + 
void userFunctionEnd_withFailure_setsErrorOnAttemptSpan() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "failing", "STEP", "Step", null, Instant.now(), false, 1)); + + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", + "failing", + "STEP", + "Step", + null, + Instant.now(), + Instant.now(), + false, + 1, + false, + new RuntimeException("step failed"))); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.FAILED, null)); + + var attemptSpan = spanExporter.getFinishedSpanItems().stream() + .filter(s -> s.getName().contains("attempt")) + .findFirst() + .orElseThrow(); + assertEquals(StatusCode.ERROR, attemptSpan.getStatus().getStatusCode()); + } + + @Test + void fullLifecycle_producesCorrectSpanHierarchy() { + var arn = "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1"; + plugin.onInvocationStart(new InvocationInfo("req-1", arn, true)); + + // Step 1: operation starts, user function runs, operation completes + plugin.onOperationStart(new OperationInfo("op-1", "step-a", "STEP", "Step", null, Instant.now(), null)); + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "step-a", "STEP", "Step", null, Instant.now(), false, 1)); + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", "step-a", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null)); + plugin.onOperationEnd( + new OperationEndInfo("op-1", "step-a", "STEP", "Step", null, Instant.now(), Instant.now(), null)); + + // Step 2: operation starts, user function runs, operation completes + plugin.onOperationStart(new OperationInfo("op-2", "step-b", "STEP", "Step", null, Instant.now(), null)); + plugin.onUserFunctionStart( + new UserFunctionStartInfo("op-2", "step-b", "STEP", "Step", null, Instant.now(), false, 1)); + plugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-2", "step-b", "STEP", "Step", null, 
Instant.now(), Instant.now(), false, 1, true, null)); + plugin.onOperationEnd( + new OperationEndInfo("op-2", "step-b", "STEP", "Step", null, Instant.now(), Instant.now(), null)); + + plugin.onInvocationEnd(new InvocationEndInfo("req-1", arn, true, InvocationStatus.SUCCEEDED, null)); + + var spans = spanExporter.getFinishedSpanItems(); + // 2 attempt spans + 2 operation spans + 1 invocation span = 5 + assertEquals(5, spans.size()); + + // All spans should share the same trace ID + var traceId = spans.get(0).getTraceId(); + assertTrue(spans.stream().allMatch(s -> s.getTraceId().equals(traceId))); + } + + @Test + void deterministicIds_sameExecutionProducesSameTraceId() { + var arn = "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1"; + + plugin.onInvocationStart(new InvocationInfo("req-1", arn, true)); + plugin.onInvocationEnd(new InvocationEndInfo("req-1", arn, true, InvocationStatus.PENDING, null)); + + var firstTraceId = spanExporter.getFinishedSpanItems().get(0).getTraceId(); + spanExporter.reset(); + + // Second invocation of same execution + plugin.onInvocationStart(new InvocationInfo("req-2", arn, false)); + plugin.onInvocationEnd(new InvocationEndInfo("req-2", arn, false, InvocationStatus.SUCCEEDED, null)); + + var secondTraceId = spanExporter.getFinishedSpanItems().get(0).getTraceId(); + + assertEquals(firstTraceId, secondTraceId, "Same execution ARN should produce same trace ID"); + } + + @Test + void operationNotCompleted_spanEndedAtInvocationEnd() { + plugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + + // Operation starts but never completes (e.g., wait operation, invocation suspends) + plugin.onOperationStart(new OperationInfo("op-1", "my-wait", "WAIT", "Wait", null, Instant.now(), null)); + + // Invocation ends without onOperationEnd being called + plugin.onInvocationEnd(new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.PENDING, null)); + + var spans = spanExporter.getFinishedSpanItems(); + // 
Should have: operation span (ended at invocation end) + invocation span + assertEquals(2, spans.size()); + + var operationSpan = spans.stream() + .filter(s -> s.getName().contains("wait")) + .findFirst() + .orElseThrow(); + assertEquals("durable.wait:my-wait", operationSpan.getName()); + } + + @Test + void sampling_disabledExecution_producesNoSpans() { + spanExporter = InMemorySpanExporter.create(); + var sampledPlugin = new OpenTelemetryDurablePlugin( + SdkTracerProvider.builder() + .setIdGenerator(idGenerator) + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(), + idGenerator, + () -> io.opentelemetry.context.Context.root(), + 0.0, // 0% sampling — nothing should be traced + false); + + sampledPlugin.onInvocationStart(new InvocationInfo("req-1", "arn:exec1", true)); + sampledPlugin.onUserFunctionStart( + new UserFunctionStartInfo("op-1", "step", "STEP", "Step", null, Instant.now(), false, 1)); + sampledPlugin.onUserFunctionEnd(new UserFunctionEndInfo( + "op-1", "step", "STEP", "Step", null, Instant.now(), Instant.now(), false, 1, true, null)); + sampledPlugin.onOperationEnd( + new OperationEndInfo("op-1", "step", "STEP", "Step", null, Instant.now(), Instant.now(), null)); + sampledPlugin.onInvocationEnd( + new InvocationEndInfo("req-1", "arn:exec1", true, InvocationStatus.SUCCEEDED, null)); + + assertTrue(spanExporter.getFinishedSpanItems().isEmpty(), "No spans should be exported with 0% sampling"); + } +} diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/SamplingUtilTest.java b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/SamplingUtilTest.java new file mode 100644 index 000000000..ed7817321 --- /dev/null +++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/SamplingUtilTest.java @@ -0,0 +1,74 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.Test; + +class SamplingUtilTest { + + @Test + void samplingRate1_alwaysSamples() { + assertTrue(SamplingUtil.shouldSampleExecution("arn:exec1", 1.0)); + assertTrue(SamplingUtil.shouldSampleExecution("arn:exec2", 1.0)); + assertTrue(SamplingUtil.shouldSampleExecution("anything", 1.0)); + } + + @Test + void samplingRate0_neverSamples() { + assertFalse(SamplingUtil.shouldSampleExecution("arn:exec1", 0.0)); + assertFalse(SamplingUtil.shouldSampleExecution("arn:exec2", 0.0)); + } + + @Test + void deterministic_sameArnSameResult() { + var arn = "arn:aws:lambda:us-east-1:123:function:test:$LATEST/durable/exec1"; + var result1 = SamplingUtil.shouldSampleExecution(arn, 0.5); + var result2 = SamplingUtil.shouldSampleExecution(arn, 0.5); + + assertEquals(result1, result2, "Same ARN should always produce same sampling decision"); + } + + @Test + void distribution_approximatesRate() { + // With enough samples, the fraction sampled should approximate the rate + int sampled = 0; + int total = 10000; + for (int i = 0; i < total; i++) { + if (SamplingUtil.shouldSampleExecution("arn:exec-" + i, 0.5)) { + sampled++; + } + } + + double actualRate = (double) sampled / total; + // Allow 5% tolerance + assertTrue(actualRate > 0.45 && actualRate < 0.55, "Sampling rate should be ~50%, got " + actualRate); + } + + @Test + void nullArn_returnsFalse() { + assertFalse(SamplingUtil.shouldSampleExecution(null, 0.5)); + } + + @Test + void emptyArn_returnsFalse() { + assertFalse(SamplingUtil.shouldSampleExecution("", 0.5)); + } + + @Test + void fnv1a32_isDeterministic() { + var hash1 = SamplingUtil.fnv1a32("test-input"); + var hash2 = SamplingUtil.fnv1a32("test-input"); + + assertEquals(hash1, hash2); + } + + @Test + void fnv1a32_differentInputsDifferentHashes() { + var hash1 = SamplingUtil.fnv1a32("input-a"); + var hash2 = 
SamplingUtil.fnv1a32("input-b"); + + assertNotEquals(hash1, hash2); + } +} diff --git a/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/XRayContextExtractorTest.java b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/XRayContextExtractorTest.java new file mode 100644 index 000000000..e869315dc --- /dev/null +++ b/otel-plugin/src/test/java/software/amazon/lambda/durable/otel/XRayContextExtractorTest.java @@ -0,0 +1,28 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable.otel; + +import static org.junit.jupiter.api.Assertions.*; + +import io.opentelemetry.context.Context; +import org.junit.jupiter.api.Test; + +class XRayContextExtractorTest { + + @Test + void extract_withoutEnvVar_returnsRoot() { + // _X_AMZN_TRACE_ID is not set in test environment + var extractor = new XRayContextExtractor(); + var context = extractor.extract(); + + // Should return root context when env var is missing + assertEquals(Context.root(), context); + } + + @Test + void extract_implementsContextExtractor() { + // Verify XRayContextExtractor implements the ContextExtractor interface + ContextExtractor extractor = new XRayContextExtractor(); + assertNotNull(extractor.extract()); + } +} diff --git a/pom.xml b/pom.xml index 9beac2201..6097b23e0 100644 --- a/pom.xml +++ b/pom.xml @@ -42,6 +42,7 @@ sdk sdk-testing sdk-integration-tests + otel-plugin examples coverage-report diff --git a/sdk-integration-tests/pom.xml b/sdk-integration-tests/pom.xml index 7794453df..d389defe0 100644 --- a/sdk-integration-tests/pom.xml +++ b/sdk-integration-tests/pom.xml @@ -36,6 +36,18 @@ ${project.version} test + + software.amazon.lambda.durable + aws-durable-execution-sdk-java-otel + ${project.version} + test + + + io.opentelemetry + opentelemetry-sdk-testing + 1.62.0 + test + org.junit.jupiter junit-jupiter diff --git 
a/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java new file mode 100644 index 000000000..66b0c74f1 --- /dev/null +++ b/sdk-integration-tests/src/test/java/software/amazon/lambda/durable/OtelPluginIntegrationTest.java @@ -0,0 +1,251 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 +package software.amazon.lambda.durable; + +import static org.junit.jupiter.api.Assertions.*; + +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.lambda.durable.config.StepConfig; +import software.amazon.lambda.durable.model.ExecutionStatus; +import software.amazon.lambda.durable.otel.DeterministicIdGenerator; +import software.amazon.lambda.durable.otel.OpenTelemetryDurablePlugin; +import software.amazon.lambda.durable.retry.RetryStrategies; +import software.amazon.lambda.durable.testing.LocalDurableTestRunner; + +/** + * Integration tests verifying the OTel plugin produces correct spans when running with the real SDK execution engine + * (LocalDurableTestRunner). 
+ */ +class OtelPluginIntegrationTest { + + private InMemorySpanExporter spanExporter; + private DeterministicIdGenerator idGenerator; + private DurableConfig otelConfig; + + @BeforeEach + void setUp() { + spanExporter = InMemorySpanExporter.create(); + idGenerator = new DeterministicIdGenerator(); + + var tracerProvider = SdkTracerProvider.builder() + .setIdGenerator(idGenerator) + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + var plugin = new OpenTelemetryDurablePlugin( + tracerProvider, idGenerator, () -> io.opentelemetry.context.Context.root(), 1.0, false); + + otelConfig = DurableConfig.builder().withPlugins(plugin).build(); + } + + @Test + void simpleStep_producesInvocationAndOperationAndAttemptSpans() { + var runner = LocalDurableTestRunner.create( + String.class, (input, ctx) -> ctx.step("greet", String.class, stepCtx -> "Hello " + input), otelConfig); + + var result = runner.runUntilComplete("World"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + // Should have: invocation + operation (backfilled) + attempt = 3 + assertTrue(spans.size() >= 3, "Expected at least 3 spans, got " + spans.size()); + + // Verify span names + assertSpanExists(spans, "durable.invocation"); + assertSpanExists(spans, "durable.step:greet"); + assertSpanExists(spans, "durable.step:greet [attempt 1]"); + + // All spans share the same trace ID + var traceId = spans.get(0).getTraceId(); + assertTrue(spans.stream().allMatch(s -> s.getTraceId().equals(traceId))); + } + + @Test + void multipleSteps_producesSpansForEach() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + ctx.step("step-a", String.class, stepCtx -> "A"); + ctx.step("step-b", String.class, stepCtx -> "B"); + ctx.step("step-c", String.class, stepCtx -> "C"); + return "done"; + }, + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, 
result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + // 1 invocation + 3 operations + 3 attempts = 7 + assertTrue(spans.size() >= 7, "Expected at least 7 spans, got " + spans.size()); + + assertSpanExists(spans, "durable.step:step-a"); + assertSpanExists(spans, "durable.step:step-b"); + assertSpanExists(spans, "durable.step:step-c"); + assertSpanExists(spans, "durable.step:step-a [attempt 1]"); + assertSpanExists(spans, "durable.step:step-b [attempt 1]"); + assertSpanExists(spans, "durable.step:step-c [attempt 1]"); + } + + @Test + void retryStep_producesMultipleAttemptSpans() { + var attempts = new AtomicInteger(0); + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.step( + "flaky", + String.class, + stepCtx -> { + if (attempts.incrementAndGet() < 3) { + throw new RuntimeException("not yet"); + } + return "success"; + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.DEFAULT) + .build()), + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + // Should have multiple attempt spans for the retry step + var attemptSpans = spans.stream() + .filter(s -> s.getName().contains("flaky") && s.getName().contains("attempt")) + .toList(); + assertTrue(attemptSpans.size() >= 3, "Should have at least 3 attempt spans, got " + attemptSpans.size()); + } + + @Test + void waitAndResume_producesSpansAcrossInvocations() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> { + ctx.step("before-wait", String.class, stepCtx -> "pre"); + ctx.wait("pause", Duration.ofMinutes(1)); + ctx.step("after-wait", String.class, stepCtx -> "post"); + return "done"; + }, + otelConfig); + + // First invocation: step + wait → suspend + var result1 = runner.run("input"); + assertEquals(ExecutionStatus.PENDING, result1.getStatus()); + + var spansAfterFirstInvocation = 
spanExporter.getFinishedSpanItems().size(); + assertTrue(spansAfterFirstInvocation > 0, "Should have spans from first invocation"); + + // Advance time and resume + runner.advanceTime(); + var result2 = runner.run("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus()); + + var allSpans = spanExporter.getFinishedSpanItems(); + + // Should have spans from both invocations + assertTrue(allSpans.size() > spansAfterFirstInvocation, "Second invocation should produce additional spans"); + + // Verify both invocations produced invocation spans + var invocationSpans = allSpans.stream() + .filter(s -> s.getName().equals("durable.invocation")) + .toList(); + assertEquals(2, invocationSpans.size(), "Should have 2 invocation spans (one per run)"); + } + + @Test + void childContext_producesNestedSpans() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.runInChildContext("child", String.class, childCtx -> { + return childCtx.step("inner", String.class, stepCtx -> "from-child"); + }), + otelConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + // Should have: invocation + child context operation + child context attempt (fn) + // + inner step operation + inner step attempt = 5 + assertTrue(spans.size() >= 5, "Should have at least 5 spans for child context, got " + spans.size()); + + assertSpanExists(spans, "durable.runinchildcontext:child"); + assertSpanExists(spans, "durable.step:inner"); + } + + @Test + void failedStep_producesErrorSpan() { + var runner = LocalDurableTestRunner.create( + String.class, + (input, ctx) -> ctx.step( + "fail", + String.class, + stepCtx -> { + throw new RuntimeException("boom"); + }, + StepConfig.builder() + .retryStrategy(RetryStrategies.Presets.NO_RETRY) + .build()), + otelConfig); + + var result = runner.run("input"); + assertEquals(ExecutionStatus.FAILED, 
result.getStatus()); + + var spans = spanExporter.getFinishedSpanItems(); + + // Invocation span should have error status + var invocationSpan = spans.stream() + .filter(s -> s.getName().equals("durable.invocation")) + .findFirst() + .orElseThrow(); + assertEquals( + io.opentelemetry.api.trace.StatusCode.ERROR, + invocationSpan.getStatus().getStatusCode(), + "Invocation span should have ERROR status"); + } + + @Test + void sampling_zeroRate_producesNoSpans() { + var sampledExporter = InMemorySpanExporter.create(); + var sampledIdGen = new DeterministicIdGenerator(); + var sampledProvider = SdkTracerProvider.builder() + .setIdGenerator(sampledIdGen) + .addSpanProcessor(SimpleSpanProcessor.create(sampledExporter)) + .build(); + + var noSamplePlugin = new OpenTelemetryDurablePlugin( + sampledProvider, sampledIdGen, () -> io.opentelemetry.context.Context.root(), 0.0, false); + + var noSampleConfig = DurableConfig.builder().withPlugins(noSamplePlugin).build(); + + var runner = LocalDurableTestRunner.create( + String.class, (input, ctx) -> ctx.step("step", String.class, stepCtx -> "result"), noSampleConfig); + + var result = runner.runUntilComplete("input"); + assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus()); + + assertTrue(sampledExporter.getFinishedSpanItems().isEmpty(), "0% sampling should produce no spans"); + } + + // ─── Helpers ───────────────────────────────────────────────────────── + + private static void assertSpanExists(List spans, String expectedName) { + assertTrue( + spans.stream().anyMatch(s -> s.getName().equals(expectedName)), + "Expected span '" + expectedName + "' not found. Got: " + + spans.stream().map(SpanData::getName).toList()); + } +}