Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark212Listener",
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public String[] helperClassNames() {
return new String[] {
packageName + ".AbstractDatadogSparkListener",
packageName + ".DatabricksParentContext",
packageName + ".OpenlineageParentContext",
packageName + ".DatadogSpark213Listener",
packageName + ".RemoveEldestHashMap",
packageName + ".SparkAggregatedTaskMetrics",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.spark.ExceptionFailure;
Expand Down Expand Up @@ -187,12 +188,37 @@ private void initApplicationSpanIfNotInitialized() {
}

captureApplicationParameters(builder);
captureOpenlineageContextIfPresent(builder);

applicationSpan = builder.start();
setDataJobsSamplingPriority(applicationSpan);
applicationSpan.setMeasured(true);
}

private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
  // Look for an OpenLineage parent context advertised via the Spark conf; when one is
  // present, parent the application span to it, pin the pre-computed child root span id,
  // and surface the parent coordinates as tags on the span.
  Optional<OpenlineageParentContext> maybeParent = OpenlineageParentContext.from(sparkConf);

  if (!maybeParent.isPresent()) {
    log.debug("Openlineage context not found");
    return;
  }

  OpenlineageParentContext parent = maybeParent.get();

  builder.asChildOf(parent);
  // The root span id of the child trace is derived deterministically by the parent context.
  builder.withSpanId(parent.getChildRootSpanId());

  log.debug(
      "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
      parent,
      parent.getTraceId(),
      parent.getChildRootSpanId());

  builder.withTag("openlineage_parent_job_namespace", parent.getParentJobNamespace());
  builder.withTag("openlineage_parent_job_name", parent.getParentJobName());
  builder.withTag("openlineage_parent_run_id", parent.getParentRunId());
}

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
log.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package datadog.trace.instrumentation.spark;

import datadog.trace.api.DDSpanId;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Span context linking a Spark application trace to the upstream OpenLineage "parent" job that
 * launched it. The parent coordinates (job namespace, job name, run id) are read from the Spark
 * configuration; the trace id and the pre-assigned root span id of the child trace are both
 * derived from a SHA-256 hash of those coordinates, so any process observing the same parent run
 * deterministically computes the same ids.
 */
public class OpenlineageParentContext implements AgentSpan.Context {
  private static final Logger log = LoggerFactory.getLogger(OpenlineageParentContext.class);

  // Canonical 8-4-4-4-12 hexadecimal UUID; parent run ids that do not match are rejected.
  private static final Pattern UUID_REGEX =
      Pattern.compile(
          "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

  private final DDTraceId traceId;
  private final long spanId;
  private final long childRootSpanId;

  private final String parentJobNamespace;
  private final String parentJobName;
  private final String parentRunId;

  public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
      "spark.openlineage.parentJobNamespace";
  public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName";
  public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId";

  /**
   * Builds a parent context from the Spark configuration.
   *
   * @param sparkConf configuration possibly carrying the OpenLineage parent properties
   * @return a context when all three parent properties are set and the run id is a well-formed
   *     UUID; {@link Optional#empty()} otherwise
   */
  public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
    if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
        || !sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAME)
        || !sparkConf.contains(OPENLINEAGE_PARENT_RUN_ID)) {
      return Optional.empty();
    }

    String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
    String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
    String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);

    if (!UUID_REGEX.matcher(parentRunId).matches()) {
      return Optional.empty();
    }

    return Optional.of(
        new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId));
  }

  OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) {
    log.debug(
        "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}",
        parentJobNamespace,
        parentJobName,
        parentRunId);

    this.parentJobNamespace = parentJobNamespace;
    this.parentJobName = parentJobName;
    this.parentRunId = parentRunId;

    MessageDigest digest = null;
    try {
      digest = MessageDigest.getInstance("SHA-256");
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is mandated for every JRE, so this should be unreachable; fall back to ZERO ids.
      log.debug("Unable to find SHA-256 algorithm", e);
    }

    if (digest != null && parentJobNamespace != null && parentRunId != null) {
      // The child trace id and its pre-assigned root span id are intentionally derived from the
      // SAME hash of (namespace + name + runId) — the hash is computed once instead of twice.
      long hashAsLong = truncatedSha256(digest, parentJobNamespace, parentJobName, parentRunId);
      traceId = DDTraceId.from(hashAsLong);
      childRootSpanId = hashAsLong;
    } else {
      traceId = DDTraceId.ZERO;
      childRootSpanId = DDSpanId.ZERO;
    }
    // This context itself never represents a concrete span, only a trace association.
    spanId = DDSpanId.ZERO;

    log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
  }

  /**
   * Returns the first 8 bytes (big-endian) of SHA-256(namespace + name + runId) as a signed long.
   * {@link MessageDigest#digest(byte[])} resets the digest, so the instance can be reused.
   */
  private static long truncatedSha256(
      MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
    byte[] inputBytes =
        (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
    byte[] hash = digest.digest(inputBytes);

    return ByteBuffer.wrap(hash).getLong();
  }

  @Override
  public DDTraceId getTraceId() {
    return traceId;
  }

  @Override
  public long getSpanId() {
    return spanId;
  }

  /** Deterministic span id the child trace's root span should be created with. */
  public long getChildRootSpanId() {
    return childRootSpanId;
  }

  @Override
  public AgentTraceCollector getTraceCollector() {
    return AgentTracer.NoopAgentTraceCollector.INSTANCE;
  }

  @Override
  public int getSamplingPriority() {
    // Force-keep traces linked to an OpenLineage parent.
    return PrioritySampling.USER_KEEP;
  }

  @Override
  public Iterable<Map.Entry<String, String>> baggageItems() {
    // No baggage is propagated from the OpenLineage parent.
    return Collections.<String, String>emptyMap().entrySet();
  }

  @Override
  public PathwayContext getPathwayContext() {
    // NOTE(review): returns null rather than a noop — callers appear to tolerate this; confirm.
    return null;
  }

  public String getParentJobNamespace() {
    return parentJobNamespace;
  }

  public String getParentJobName() {
    return parentJobName;
  }

  public String getParentRunId() {
    return parentRunId;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package datadog.trace.instrumentation.spark

import datadog.trace.api.DDSpanId
import org.apache.spark.SparkConf
import spock.lang.Specification

/**
 * Exercises OpenlineageParentContext.from: deterministic id derivation, required-field
 * handling, and parentRunId UUID validation.
 */
class OpenlineageParentContextTest extends Specification {
  def "should create none empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present" () {
    given: "a SparkConf stubbed with all three OpenLineage parent properties"
    SparkConf mockSparkConf = Mock(SparkConf)
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId

    when: "a context is built from the configuration"
    Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)

    then: "the parent coordinates are captured and ids are derived from their SHA-256 hash"
    parentContext.isPresent()

    parentContext.get().getParentJobNamespace() == "default"
    parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3"
    parentContext.get().getParentRunId() == expectedParentRunId

    parentContext.get().traceId.toLong() == expectedTraceId
    parentContext.get().spanId == DDSpanId.ZERO
    parentContext.get().childRootSpanId == expectedRootSpanId

    where:
    parentRunId | expectedParentRunId | expectedTraceId | expectedRootSpanId
    "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL
    "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL
  }

  def "should create empty OpenLineageParentContext if any required field is missing" () {
    given: "a SparkConf where at least one required property may be absent"
    SparkConf mockSparkConf = Mock(SparkConf)
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent

    when: "a context is built from the configuration"
    Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)

    then: "a context only exists when every property is present"
    parentContext.isPresent() == expected

    where:
    jobNamespacePresent | jobNamePresent | runIdPresent | expected
    true | true | false | false
    true | false | true | false
    false | true | true | false
    true | false | false | false
    false | true | false | false
    false | false | true | false
    false | false | false | false
  }

  def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID" () {
    given: "a SparkConf with all properties present and a candidate run id"
    SparkConf mockSparkConf = Mock(SparkConf)
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId

    when: "a context is built from the configuration"
    Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)

    then: "only canonical UUIDs (case-insensitive hex) are accepted"
    parentContext.isPresent() == expected

    where:
    runId | expected
    "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true
    " " | false
    "invalid-uuid" | false
    "6afeb6ee-729d-37f7-b8e6f47ca694" | false
    "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
  }
}