Skip to content

Commit

Permalink
feat(patch): support fine grained lineage patches (datahub-project#9408)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
  • Loading branch information
2 people authored and Salman-Apptware committed Dec 15, 2023
1 parent 7724d2f commit 4ae175f
Show file tree
Hide file tree
Showing 8 changed files with 1,023 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package com.linkedin.metadata.models.registry.template.dataset;

import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*;
import static com.linkedin.metadata.Constants.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Streams;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.FineGrainedLineageArray;
import com.linkedin.dataset.UpstreamArray;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.metadata.models.registry.template.ArrayMergingTemplate;
import com.linkedin.metadata.models.registry.template.CompoundKeyTemplate;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class UpstreamLineageTemplate implements ArrayMergingTemplate<UpstreamLineage> {
public class UpstreamLineageTemplate extends CompoundKeyTemplate<UpstreamLineage> {

// Fields
private static final String UPSTREAMS_FIELD_NAME = "upstreams";
private static final String DATASET_FIELD_NAME = "dataset";
private static final String FINE_GRAINED_LINEAGES_FIELD_NAME = "fineGrainedLineages";
private static final String FINE_GRAINED_UPSTREAM_TYPE = "upstreamType";
private static final String FINE_GRAINED_UPSTREAMS = "upstreams";
private static final String FINE_GRAINED_DOWNSTREAM_TYPE = "downstreamType";
private static final String FINE_GRAINED_DOWNSTREAMS = "downstreams";
private static final String FINE_GRAINED_TRANSFORMATION_OPERATION = "transformOperation";
private static final String FINE_GRAINED_CONFIDENCE_SCORE = "confidenceScore";

// TODO: Fine Grained Lineages not patchable at this time, they don't have a well established key
// Template support
private static final String NONE_TRANSFORMATION_TYPE = "NONE";
private static final Float DEFAULT_CONFIDENCE_SCORE = 1.0f;

@Override
public UpstreamLineage getSubtype(RecordTemplate recordTemplate) throws ClassCastException {
Expand Down Expand Up @@ -42,14 +63,250 @@ public UpstreamLineage getDefault() {
@Nonnull
@Override
public JsonNode transformFields(JsonNode baseNode) {
return arrayFieldToMap(
baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
JsonNode transformedNode =
arrayFieldToMap(
baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
((ObjectNode) transformedNode)
.set(
FINE_GRAINED_LINEAGES_FIELD_NAME,
combineAndTransformFineGrainedLineages(
transformedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME)));

return transformedNode;
}

@Nonnull
@Override
public JsonNode rebaseFields(JsonNode patched) {
return transformedMapToArray(
patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
JsonNode rebasedNode =
transformedMapToArray(
patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME));
((ObjectNode) rebasedNode)
.set(
FINE_GRAINED_LINEAGES_FIELD_NAME,
reconstructFineGrainedLineages(rebasedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME)));
return rebasedNode;
}

/**
* Combines fine grained lineage array into a map using upstream and downstream types as keys,
* defaulting when not present. Due to this construction, patches will look like: path:
* /fineGrainedLineages/TRANSFORMATION_OPERATION/(upstreamType || downstreamType)/TYPE/FIELD_URN,
* op: ADD/REMOVE, value: float (confidenceScore) Due to the way FineGrainedLineage was designed
* it doesn't necessarily have a consistent key we can reference, so this specialized method
* mimics the arrayFieldToMap of the super class with the specialization that it does not put the
* full value of the aspect at the end of the key, just the particular array. This prevents
* unintended overwrites through improper MCP construction that is technically allowed by the
* schema when combining under fields that form the natural key.
*
* @param fineGrainedLineages the fine grained lineage array node
* @return the modified {@link JsonNode} with array fields transformed to maps
*/
private JsonNode combineAndTransformFineGrainedLineages(@Nullable JsonNode fineGrainedLineages) {
ObjectNode mapNode = instance.objectNode();
if (!(fineGrainedLineages instanceof ArrayNode) || fineGrainedLineages.isEmpty()) {
return mapNode;
}
JsonNode lineageCopy = fineGrainedLineages.deepCopy();

lineageCopy
.elements()
.forEachRemaining(
node -> {
JsonNode nodeClone = node.deepCopy();
String transformationOperation =
nodeClone.has(FINE_GRAINED_TRANSFORMATION_OPERATION)
? nodeClone.get(FINE_GRAINED_TRANSFORMATION_OPERATION).asText()
: NONE_TRANSFORMATION_TYPE;

if (!mapNode.has(transformationOperation)) {
mapNode.set(transformationOperation, instance.objectNode());
}
ObjectNode transformationOperationNode =
(ObjectNode) mapNode.get(transformationOperation);

Float confidenceScore =
nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE)
? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue()
: DEFAULT_CONFIDENCE_SCORE;

String upstreamType =
nodeClone.has(FINE_GRAINED_UPSTREAM_TYPE)
? nodeClone.get(FINE_GRAINED_UPSTREAM_TYPE).asText()
: null;
String downstreamType =
nodeClone.has(FINE_GRAINED_DOWNSTREAM_TYPE)
? nodeClone.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText()
: null;
ArrayNode upstreams =
nodeClone.has(FINE_GRAINED_UPSTREAMS)
? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS)
: null;
ArrayNode downstreams =
nodeClone.has(FINE_GRAINED_DOWNSTREAMS)
? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS)
: null;

// Handle upstreams
if (upstreamType == null) {
// Determine default type
Urn upstreamUrn =
upstreams != null ? UrnUtils.getUrn(upstreams.get(0).asText()) : null;
if (upstreamUrn != null
&& SCHEMA_FIELD_ENTITY_NAME.equals(upstreamUrn.getEntityType())) {
upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
} else {
upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE;
}
}
if (!transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)) {
transformationOperationNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.objectNode());
}
ObjectNode upstreamTypeNode =
(ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE);
if (!upstreamTypeNode.has(upstreamType)) {
upstreamTypeNode.set(upstreamType, instance.objectNode());
}
if (upstreams != null) {
addUrnsToSubType(upstreamTypeNode, upstreams, upstreamType, confidenceScore);
}

// Handle downstreams
if (downstreamType == null) {
// Determine default type
if (downstreams != null && downstreams.size() > 1) {
downstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE;
} else {
downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE;
}
}
if (!transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) {
transformationOperationNode.set(
FINE_GRAINED_DOWNSTREAM_TYPE, instance.objectNode());
}
ObjectNode downstreamTypeNode =
(ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE);
if (!downstreamTypeNode.has(downstreamType)) {
downstreamTypeNode.set(downstreamType, instance.objectNode());
}
if (downstreams != null) {
addUrnsToSubType(downstreamTypeNode, downstreams, downstreamType, confidenceScore);
}
});
return mapNode;
}

private void addUrnsToSubType(
JsonNode superType, ArrayNode urnsList, String subType, Float confidenceScore) {
ObjectNode upstreamSubTypeNode = (ObjectNode) superType.get(subType);
// Will overwrite repeat urns with different confidence scores with the most recently seen
upstreamSubTypeNode.setAll(
Streams.stream(urnsList.elements())
.map(JsonNode::asText)
.distinct()
.collect(Collectors.toMap(urn -> urn, urn -> instance.numberNode(confidenceScore))));
}

/**
* Takes the transformed fine grained lineages map from pre-processing and reconstructs an array
* of FineGrainedLineages Avoids producing side effects by copying nodes, use resulting node and
* not the original
*
* @param transformedFineGrainedLineages the transformed fine grained lineage map
* @return the modified {@link JsonNode} formatted consistent with the original schema
*/
private ArrayNode reconstructFineGrainedLineages(JsonNode transformedFineGrainedLineages) {
if (transformedFineGrainedLineages instanceof ArrayNode) {
// We already have an ArrayNode, no need to transform. This happens during `replace`
// operations
return (ArrayNode) transformedFineGrainedLineages;
}
ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages;
ArrayNode arrayNode = instance.arrayNode();

mapNode
.fieldNames()
.forEachRemaining(
transformationOperation -> {
final ObjectNode transformationOperationNode =
(ObjectNode) mapNode.get(transformationOperation);
final ObjectNode upstreamType =
transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)
? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE)
: instance.objectNode();
final ObjectNode downstreamType =
transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)
? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE)
: instance.objectNode();

// Handle upstreams
if (!upstreamType.isEmpty()) {
populateTypeNode(
upstreamType,
transformationOperation,
FINE_GRAINED_UPSTREAM_TYPE,
FINE_GRAINED_UPSTREAMS,
FINE_GRAINED_DOWNSTREAM_TYPE,
arrayNode);
}

// Handle downstreams
if (!downstreamType.isEmpty()) {
populateTypeNode(
downstreamType,
transformationOperation,
FINE_GRAINED_DOWNSTREAM_TYPE,
FINE_GRAINED_DOWNSTREAMS,
FINE_GRAINED_UPSTREAM_TYPE,
arrayNode);
}
});

return arrayNode;
}

private void populateTypeNode(
JsonNode typeNode,
String transformationOperation,
String typeName,
String arrayTypeName,
String defaultTypeName,
ArrayNode arrayNode) {
typeNode
.fieldNames()
.forEachRemaining(
subTypeName -> {
ObjectNode subType = (ObjectNode) typeNode.get(subTypeName);
if (!subType.isEmpty()) {
ObjectNode fineGrainedLineage = instance.objectNode();
AtomicReference<Float> minimumConfidenceScore = new AtomicReference<>(1.0f);

fineGrainedLineage.put(typeName, subTypeName);
fineGrainedLineage.put(
FINE_GRAINED_TRANSFORMATION_OPERATION, transformationOperation);
// Array to actually be filled out
fineGrainedLineage.set(arrayTypeName, instance.arrayNode());
// Added to pass model validation, because we have no way of appropriately pairing
// upstreams and downstreams
// within fine grained lineages consistently due to being able to have multiple
// downstream types paired with a single
// transform operation, we just set a default type because it's a required property
fineGrainedLineage.put(defaultTypeName, FINE_GRAINED_LINEAGE_FIELD_SET_TYPE);
subType
.fieldNames()
.forEachRemaining(
subTypeKey -> {
((ArrayNode) fineGrainedLineage.get(arrayTypeName)).add(subTypeKey);
Float scoreValue = subType.get(subTypeKey).floatValue();
if (scoreValue <= minimumConfidenceScore.get()) {
minimumConfidenceScore.set(scoreValue);
fineGrainedLineage.set(
FINE_GRAINED_CONFIDENCE_SCORE,
instance.numberNode(minimumConfidenceScore.get()));
}
});
arrayNode.add(fineGrainedLineage);
}
});
}
}
Loading

0 comments on commit 4ae175f

Please sign in to comment.