diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 734e779d571..15e6fa5a80f 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -15,6 +15,7 @@ public String[] helperClassNames() { return new String[] { packageName + ".AbstractDatadogSparkListener", packageName + ".DatabricksParentContext", + packageName + ".OpenlineageParentContext", packageName + ".DatadogSpark212Listener", packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index e54bb1b78b0..0d80eb7553c 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -15,6 +15,7 @@ public String[] helperClassNames() { return new String[] { packageName + ".AbstractDatadogSparkListener", packageName + ".DatabricksParentContext", + packageName + ".OpenlineageParentContext", packageName + ".DatadogSpark213Listener", packageName + ".RemoveEldestHashMap", packageName + ".SparkAggregatedTaskMetrics", diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java 
b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 662c4697615..d009f3c826c 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.UUID; import org.apache.spark.ExceptionFailure; @@ -187,12 +188,37 @@ private void initApplicationSpanIfNotInitialized() { } captureApplicationParameters(builder); + captureOpenlineageContextIfPresent(builder); applicationSpan = builder.start(); setDataJobsSamplingPriority(applicationSpan); applicationSpan.setMeasured(true); } + private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) { + Optional<OpenlineageParentContext> openlineageParentContext = + OpenlineageParentContext.from(sparkConf); + + if (openlineageParentContext.isPresent()) { + OpenlineageParentContext context = openlineageParentContext.get(); + builder.asChildOf(context); + + builder.withSpanId(context.getChildRootSpanId()); + + log.debug( + "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}", + context, + context.getTraceId(), + context.getChildRootSpanId()); + + builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace()); + builder.withTag("openlineage_parent_job_name", context.getParentJobName()); + builder.withTag("openlineage_parent_run_id", context.getParentRunId()); + } else { + log.debug("Openlineage context not found"); + } + } + @Override public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { log.info( diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java
b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java new file mode 100644 index 00000000000..d4ac5775425 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java @@ -0,0 +1,157 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.api.DDSpanId; +import datadog.trace.api.DDTraceId; +import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.api.PathwayContext; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import org.apache.spark.SparkConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OpenlineageParentContext implements AgentSpan.Context { + private static final Logger log = LoggerFactory.getLogger(OpenlineageParentContext.class); + private static final Pattern UUID = + Pattern.compile( + "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"); + + private final DDTraceId traceId; + private final long spanId; + private final long childRootSpanId; + + private final String parentJobNamespace; + private final String parentJobName; + private final String parentRunId; + + public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE = + "spark.openlineage.parentJobNamespace"; + public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName"; + public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId"; + + public static Optional<OpenlineageParentContext> from(SparkConf
sparkConf) { + if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE) + || !sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAME) + || !sparkConf.contains(OPENLINEAGE_PARENT_RUN_ID)) { + return Optional.empty(); + } + + String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE); + String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME); + String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID); + + if (!UUID.matcher(parentRunId).matches()) { + return Optional.empty(); + } + + return Optional.of( + new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId)); + } + + OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) { + log.debug( + "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}", + parentJobNamespace, + parentJobName, + parentRunId); + + this.parentJobNamespace = parentJobNamespace; + this.parentJobName = parentJobName; + this.parentRunId = parentRunId; + + MessageDigest digest = null; + try { + digest = MessageDigest.getInstance("SHA-256"); + } catch (NoSuchAlgorithmException e) { + log.debug("Unable to find SHA-256 algorithm", e); + } + + if (digest != null && parentJobNamespace != null && parentRunId != null) { + traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId); + spanId = DDSpanId.ZERO; + + childRootSpanId = + computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId); + } else { + traceId = DDTraceId.ZERO; + spanId = DDSpanId.ZERO; + + childRootSpanId = DDSpanId.ZERO; + } + + log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId); + } + + private long computeChildRootSpanId( + MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { + byte[] inputBytes = + (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); + byte[] hash = digest.digest(inputBytes); + + return 
ByteBuffer.wrap(hash).getLong(); + } + + private DDTraceId computeTraceId( + MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) { + byte[] inputBytes = + (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8); + byte[] hash = digest.digest(inputBytes); + + return DDTraceId.from(ByteBuffer.wrap(hash).getLong()); + } + + @Override + public DDTraceId getTraceId() { + return traceId; + } + + @Override + public long getSpanId() { + return spanId; + } + + public long getChildRootSpanId() { + return childRootSpanId; + } + + @Override + public AgentTraceCollector getTraceCollector() { + return AgentTracer.NoopAgentTraceCollector.INSTANCE; + } + + @Override + public int getSamplingPriority() { + return PrioritySampling.USER_KEEP; + } + + @Override + public Iterable<Map.Entry<String, String>> baggageItems() { + return Collections.<String, String>emptyMap().entrySet(); + } + + @Override + public PathwayContext getPathwayContext() { + return null; + } + + public String getParentJobNamespace() { + return parentJobNamespace; + } + + public String getParentJobName() { + return parentJobName; + } + + public String getParentRunId() { + return parentRunId; + } +} diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy new file mode 100644 index 00000000000..34ec29b42b4 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/OpenlineageParentContextTest.groovy @@ -0,0 +1,87 @@ +package datadog.trace.instrumentation.spark + +import datadog.trace.api.DDSpanId +import org.apache.spark.SparkConf +import spock.lang.Specification + +class OpenlineageParentContextTest extends Specification { + def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span
SpanId if all required fields are present" () { + given: + SparkConf mockSparkConf = Mock(SparkConf) + + when: + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId + + then: + Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf) + parentContext.isPresent() + + parentContext.get().getParentJobNamespace() == "default" + parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3" + parentContext.get().getParentRunId() == expectedParentRunId + + parentContext.get().traceId.toLong() == expectedTraceId + parentContext.get().spanId == DDSpanId.ZERO + parentContext.get().childRootSpanId == expectedRootSpanId + + where: + parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId + "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL + "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL + } + + def "should create empty OpenLineageParentContext if any required field is missing" () { + given: + SparkConf mockSparkConf = Mock(SparkConf) + + when: + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent + + then: + Optional<OpenlineageParentContext> parentContext =
OpenlineageParentContext.from(mockSparkConf) + parentContext.isPresent() == expected + + where: + jobNamespacePresent | jobNamePresent | runIdPresent | expected + true | true | false | false + true | false | true | false + false | true | true | false + true | false | false | false + false | true | false | false + false | false | true | false + false | false | false | false + } + + def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () { + given: + SparkConf mockSparkConf = Mock(SparkConf) + + when: + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true + mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3" + mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId + + then: + Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf) + parentContext.isPresent() == expected + + where: + runId | expected + "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true + " " | false + "invalid-uuid" | false + "6afeb6ee-729d-37f7-b8e6f47ca694" | false + "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true + } +} +