From 4ae175f9c930360ea1cd71ce4061de337f1997cd Mon Sep 17 00:00:00 2001 From: RyanHolstien Date: Mon, 11 Dec 2023 14:25:43 -0800 Subject: [PATCH] feat(patch): support fine grained lineage patches (#9408) Co-authored-by: Harshal Sheth --- .../dataset/UpstreamLineageTemplate.java | 271 ++++++++++++- .../registry/UpstreamLineageTemplateTest.java | 359 ++++++++++++++++++ .../java/com/linkedin/metadata/Constants.java | 5 + .../src/datahub/specific/dataset.py | 107 +++++- .../unit/patch/complex_dataset_patch.json | 45 ++- .../tests/unit/patch/test_patch_builder.py | 16 + .../dataset/UpstreamLineagePatchBuilder.java | 231 ++++++++++- .../java/datahub/client/patch/PatchTest.java | 24 +- 8 files changed, 1023 insertions(+), 35 deletions(-) create mode 100644 entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java diff --git a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java index 35816895669be..81a4065dedb1a 100644 --- a/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java +++ b/entity-registry/src/main/java/com/linkedin/metadata/models/registry/template/dataset/UpstreamLineageTemplate.java @@ -1,20 +1,41 @@ package com.linkedin.metadata.models.registry.template.dataset; +import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; +import static com.linkedin.metadata.Constants.*; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Streams; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.RecordTemplate; import com.linkedin.dataset.FineGrainedLineageArray; import 
com.linkedin.dataset.UpstreamArray; import com.linkedin.dataset.UpstreamLineage; -import com.linkedin.metadata.models.registry.template.ArrayMergingTemplate; +import com.linkedin.metadata.models.registry.template.CompoundKeyTemplate; import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.annotation.Nonnull; +import javax.annotation.Nullable; -public class UpstreamLineageTemplate implements ArrayMergingTemplate { +public class UpstreamLineageTemplate extends CompoundKeyTemplate { + // Fields private static final String UPSTREAMS_FIELD_NAME = "upstreams"; private static final String DATASET_FIELD_NAME = "dataset"; + private static final String FINE_GRAINED_LINEAGES_FIELD_NAME = "fineGrainedLineages"; + private static final String FINE_GRAINED_UPSTREAM_TYPE = "upstreamType"; + private static final String FINE_GRAINED_UPSTREAMS = "upstreams"; + private static final String FINE_GRAINED_DOWNSTREAM_TYPE = "downstreamType"; + private static final String FINE_GRAINED_DOWNSTREAMS = "downstreams"; + private static final String FINE_GRAINED_TRANSFORMATION_OPERATION = "transformOperation"; + private static final String FINE_GRAINED_CONFIDENCE_SCORE = "confidenceScore"; - // TODO: Fine Grained Lineages not patchable at this time, they don't have a well established key + // Template support + private static final String NONE_TRANSFORMATION_TYPE = "NONE"; + private static final Float DEFAULT_CONFIDENCE_SCORE = 1.0f; @Override public UpstreamLineage getSubtype(RecordTemplate recordTemplate) throws ClassCastException { @@ -42,14 +63,250 @@ public UpstreamLineage getDefault() { @Nonnull @Override public JsonNode transformFields(JsonNode baseNode) { - return arrayFieldToMap( - baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + JsonNode transformedNode = + arrayFieldToMap( + baseNode, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + ((ObjectNode) 
transformedNode) + .set( + FINE_GRAINED_LINEAGES_FIELD_NAME, + combineAndTransformFineGrainedLineages( + transformedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME))); + + return transformedNode; } @Nonnull @Override public JsonNode rebaseFields(JsonNode patched) { - return transformedMapToArray( - patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + JsonNode rebasedNode = + transformedMapToArray( + patched, UPSTREAMS_FIELD_NAME, Collections.singletonList(DATASET_FIELD_NAME)); + ((ObjectNode) rebasedNode) + .set( + FINE_GRAINED_LINEAGES_FIELD_NAME, + reconstructFineGrainedLineages(rebasedNode.get(FINE_GRAINED_LINEAGES_FIELD_NAME))); + return rebasedNode; + } + + /** + * Combines fine grained lineage array into a map using upstream and downstream types as keys, + * defaulting when not present. Due to this construction, patches will look like: path: + * /fineGrainedLineages/TRANSFORMATION_OPERATION/(upstreamType || downstreamType)/TYPE/FIELD_URN, + * op: ADD/REMOVE, value: float (confidenceScore) Due to the way FineGrainedLineage was designed + * it doesn't necessarily have a consistent key we can reference, so this specialized method + * mimics the arrayFieldToMap of the super class with the specialization that it does not put the + * full value of the aspect at the end of the key, just the particular array. This prevents + * unintended overwrites through improper MCP construction that is technically allowed by the + * schema when combining under fields that form the natural key. 
+ * + * @param fineGrainedLineages the fine grained lineage array node + * @return the modified {@link JsonNode} with array fields transformed to maps + */ + private JsonNode combineAndTransformFineGrainedLineages(@Nullable JsonNode fineGrainedLineages) { + ObjectNode mapNode = instance.objectNode(); + if (!(fineGrainedLineages instanceof ArrayNode) || fineGrainedLineages.isEmpty()) { + return mapNode; + } + JsonNode lineageCopy = fineGrainedLineages.deepCopy(); + + lineageCopy + .elements() + .forEachRemaining( + node -> { + JsonNode nodeClone = node.deepCopy(); + String transformationOperation = + nodeClone.has(FINE_GRAINED_TRANSFORMATION_OPERATION) + ? nodeClone.get(FINE_GRAINED_TRANSFORMATION_OPERATION).asText() + : NONE_TRANSFORMATION_TYPE; + + if (!mapNode.has(transformationOperation)) { + mapNode.set(transformationOperation, instance.objectNode()); + } + ObjectNode transformationOperationNode = + (ObjectNode) mapNode.get(transformationOperation); + + Float confidenceScore = + nodeClone.has(FINE_GRAINED_CONFIDENCE_SCORE) + ? nodeClone.get(FINE_GRAINED_CONFIDENCE_SCORE).floatValue() + : DEFAULT_CONFIDENCE_SCORE; + + String upstreamType = + nodeClone.has(FINE_GRAINED_UPSTREAM_TYPE) + ? nodeClone.get(FINE_GRAINED_UPSTREAM_TYPE).asText() + : null; + String downstreamType = + nodeClone.has(FINE_GRAINED_DOWNSTREAM_TYPE) + ? nodeClone.get(FINE_GRAINED_DOWNSTREAM_TYPE).asText() + : null; + ArrayNode upstreams = + nodeClone.has(FINE_GRAINED_UPSTREAMS) + ? (ArrayNode) nodeClone.get(FINE_GRAINED_UPSTREAMS) + : null; + ArrayNode downstreams = + nodeClone.has(FINE_GRAINED_DOWNSTREAMS) + ? (ArrayNode) nodeClone.get(FINE_GRAINED_DOWNSTREAMS) + : null; + + // Handle upstreams + if (upstreamType == null) { + // Determine default type + Urn upstreamUrn = + upstreams != null ? 
UrnUtils.getUrn(upstreams.get(0).asText()) : null; + if (upstreamUrn != null + && SCHEMA_FIELD_ENTITY_NAME.equals(upstreamUrn.getEntityType())) { + upstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; + } else { + upstreamType = FINE_GRAINED_LINEAGE_DATASET_TYPE; + } + } + if (!transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE)) { + transformationOperationNode.set(FINE_GRAINED_UPSTREAM_TYPE, instance.objectNode()); + } + ObjectNode upstreamTypeNode = + (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE); + if (!upstreamTypeNode.has(upstreamType)) { + upstreamTypeNode.set(upstreamType, instance.objectNode()); + } + if (upstreams != null) { + addUrnsToSubType(upstreamTypeNode, upstreams, upstreamType, confidenceScore); + } + + // Handle downstreams + if (downstreamType == null) { + // Determine default type + if (downstreams != null && downstreams.size() > 1) { + downstreamType = FINE_GRAINED_LINEAGE_FIELD_SET_TYPE; + } else { + downstreamType = FINE_GRAINED_LINEAGE_FIELD_TYPE; + } + } + if (!transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE)) { + transformationOperationNode.set( + FINE_GRAINED_DOWNSTREAM_TYPE, instance.objectNode()); + } + ObjectNode downstreamTypeNode = + (ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE); + if (!downstreamTypeNode.has(downstreamType)) { + downstreamTypeNode.set(downstreamType, instance.objectNode()); + } + if (downstreams != null) { + addUrnsToSubType(downstreamTypeNode, downstreams, downstreamType, confidenceScore); + } + }); + return mapNode; + } + + private void addUrnsToSubType( + JsonNode superType, ArrayNode urnsList, String subType, Float confidenceScore) { + ObjectNode upstreamSubTypeNode = (ObjectNode) superType.get(subType); + // Will overwrite repeat urns with different confidence scores with the most recently seen + upstreamSubTypeNode.setAll( + Streams.stream(urnsList.elements()) + .map(JsonNode::asText) + .distinct() + .collect(Collectors.toMap(urn -> 
urn, urn -> instance.numberNode(confidenceScore)))); + } + + /** + * Takes the transformed fine grained lineages map from pre-processing and reconstructs an array + * of FineGrainedLineages Avoids producing side effects by copying nodes, use resulting node and + * not the original + * + * @param transformedFineGrainedLineages the transformed fine grained lineage map + * @return the modified {@link JsonNode} formatted consistent with the original schema + */ + private ArrayNode reconstructFineGrainedLineages(JsonNode transformedFineGrainedLineages) { + if (transformedFineGrainedLineages instanceof ArrayNode) { + // We already have an ArrayNode, no need to transform. This happens during `replace` + // operations + return (ArrayNode) transformedFineGrainedLineages; + } + ObjectNode mapNode = (ObjectNode) transformedFineGrainedLineages; + ArrayNode arrayNode = instance.arrayNode(); + + mapNode + .fieldNames() + .forEachRemaining( + transformationOperation -> { + final ObjectNode transformationOperationNode = + (ObjectNode) mapNode.get(transformationOperation); + final ObjectNode upstreamType = + transformationOperationNode.has(FINE_GRAINED_UPSTREAM_TYPE) + ? (ObjectNode) transformationOperationNode.get(FINE_GRAINED_UPSTREAM_TYPE) + : instance.objectNode(); + final ObjectNode downstreamType = + transformationOperationNode.has(FINE_GRAINED_DOWNSTREAM_TYPE) + ? 
(ObjectNode) transformationOperationNode.get(FINE_GRAINED_DOWNSTREAM_TYPE) + : instance.objectNode(); + + // Handle upstreams + if (!upstreamType.isEmpty()) { + populateTypeNode( + upstreamType, + transformationOperation, + FINE_GRAINED_UPSTREAM_TYPE, + FINE_GRAINED_UPSTREAMS, + FINE_GRAINED_DOWNSTREAM_TYPE, + arrayNode); + } + + // Handle downstreams + if (!downstreamType.isEmpty()) { + populateTypeNode( + downstreamType, + transformationOperation, + FINE_GRAINED_DOWNSTREAM_TYPE, + FINE_GRAINED_DOWNSTREAMS, + FINE_GRAINED_UPSTREAM_TYPE, + arrayNode); + } + }); + + return arrayNode; + } + + private void populateTypeNode( + JsonNode typeNode, + String transformationOperation, + String typeName, + String arrayTypeName, + String defaultTypeName, + ArrayNode arrayNode) { + typeNode + .fieldNames() + .forEachRemaining( + subTypeName -> { + ObjectNode subType = (ObjectNode) typeNode.get(subTypeName); + if (!subType.isEmpty()) { + ObjectNode fineGrainedLineage = instance.objectNode(); + AtomicReference minimumConfidenceScore = new AtomicReference<>(1.0f); + + fineGrainedLineage.put(typeName, subTypeName); + fineGrainedLineage.put( + FINE_GRAINED_TRANSFORMATION_OPERATION, transformationOperation); + // Array to actually be filled out + fineGrainedLineage.set(arrayTypeName, instance.arrayNode()); + // Added to pass model validation, because we have no way of appropriately pairing + // upstreams and downstreams + // within fine grained lineages consistently due to being able to have multiple + // downstream types paired with a single + // transform operation, we just set a default type because it's a required property + fineGrainedLineage.put(defaultTypeName, FINE_GRAINED_LINEAGE_FIELD_SET_TYPE); + subType + .fieldNames() + .forEachRemaining( + subTypeKey -> { + ((ArrayNode) fineGrainedLineage.get(arrayTypeName)).add(subTypeKey); + Float scoreValue = subType.get(subTypeKey).floatValue(); + if (scoreValue <= minimumConfidenceScore.get()) { + 
minimumConfidenceScore.set(scoreValue); + fineGrainedLineage.set( + FINE_GRAINED_CONFIDENCE_SCORE, + instance.numberNode(minimumConfidenceScore.get())); + } + }); + arrayNode.add(fineGrainedLineage); + } + }); } } diff --git a/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java new file mode 100644 index 0000000000000..07982a87be56c --- /dev/null +++ b/entity-registry/src/test/java/com/linkedin/metadata/models/registry/UpstreamLineageTemplateTest.java @@ -0,0 +1,359 @@ +package com.linkedin.metadata.models.registry; + +import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*; + +import com.fasterxml.jackson.databind.node.NumericNode; +import com.github.fge.jackson.jsonpointer.JsonPointer; +import com.github.fge.jsonpatch.AddOperation; +import com.github.fge.jsonpatch.JsonPatch; +import com.github.fge.jsonpatch.JsonPatchOperation; +import com.github.fge.jsonpatch.RemoveOperation; +import com.linkedin.common.UrnArray; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.DataMap; +import com.linkedin.dataset.FineGrainedLineage; +import com.linkedin.dataset.FineGrainedLineageDownstreamType; +import com.linkedin.dataset.FineGrainedLineageUpstreamType; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.metadata.models.registry.template.dataset.UpstreamLineageTemplate; +import java.util.ArrayList; +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class UpstreamLineageTemplateTest { + @Test + public void testPatchUpstream() throws Exception { + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + List patchOperations = new ArrayList<>(); + NumericNode upstreamConfidenceScore = 
instance.numberNode(1.0f); + JsonPatchOperation operation = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + upstreamConfidenceScore); + patchOperations.add(operation); + JsonPatch jsonPatch = new JsonPatch(patchOperations); + + // Initial population test + UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + fineGrainedLineage.setUpstreams(urns); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); + + // Test non-overwrite upstreams and correct confidence score + JsonPatchOperation operation2 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + upstreamConfidenceScore); + NumericNode upstreamConfidenceScore2 = instance.numberNode(0.1f); + JsonPatchOperation operation3 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + upstreamConfidenceScore2); + List patchOperations2 = new ArrayList<>(); + patchOperations2.add(operation2); + patchOperations2.add(operation3); + JsonPatch 
jsonPatch2 = new JsonPatch(patchOperations2); + UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 0.1); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + UrnArray urns2 = new UrnArray(); + Urn urn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + urns2.add(urn1); + urns2.add(urn2); + fineGrainedLineage2.setUpstreams(urns2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2); + + // Check different upstream types + JsonPatchOperation operation4 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"), + upstreamConfidenceScore); + List patchOperations3 = new ArrayList<>(); + patchOperations3.add(operation4); + JsonPatch jsonPatch3 = new JsonPatch(patchOperations3); + UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap3 = new DataMap(); + dataMap3.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); + UrnArray urns3 = new UrnArray(); + Urn urn3 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"); + urns3.add(urn3); + fineGrainedLineage3.setUpstreams(urns3); + fineGrainedLineage3.setTransformOperation("CREATE"); + fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); + 
fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + // Splits into two for different types + Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3); + + // Check different transform types + JsonPatchOperation operation5 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"), + upstreamConfidenceScore); + List patchOperations4 = new ArrayList<>(); + patchOperations4.add(operation5); + JsonPatch jsonPatch4 = new JsonPatch(patchOperations4); + UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap4 = new DataMap(); + dataMap4.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); + UrnArray urns4 = new UrnArray(); + Urn urn4 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"); + urns4.add(urn4); + fineGrainedLineage4.setUpstreams(urns4); + fineGrainedLineage4.setTransformOperation("TRANSFORM"); + fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.DATASET); + fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + // New entry in array because of new transformation type + Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4); + + // Remove + JsonPatchOperation removeOperation = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)")); + JsonPatchOperation removeOperation2 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); + JsonPatchOperation 
removeOperation3 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)")); + JsonPatchOperation removeOperation4 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)")); + + List removeOperations = new ArrayList<>(); + removeOperations.add(removeOperation); + removeOperations.add(removeOperation2); + removeOperations.add(removeOperation3); + removeOperations.add(removeOperation4); + JsonPatch removePatch = new JsonPatch(removeOperations); + UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); + Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult); + } + + @Test + public void testPatchDownstream() throws Exception { + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + List patchOperations = new ArrayList<>(); + NumericNode downstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + downstreamConfidenceScore); + patchOperations.add(operation); + JsonPatch jsonPatch = new JsonPatch(patchOperations); + + // Initial population test + UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + fineGrainedLineage.setDownstreams(urns); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + Assert.assertEquals(result.getFineGrainedLineages().get(0), fineGrainedLineage); + + // Test non-overwrite downstreams and correct confidence score + JsonPatchOperation operation2 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + downstreamConfidenceScore); + NumericNode downstreamConfidenceScore2 = instance.numberNode(0.1f); + JsonPatchOperation operation3 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"), + downstreamConfidenceScore2); + List patchOperations2 = new ArrayList<>(); + patchOperations2.add(operation2); + patchOperations2.add(operation3); + JsonPatch jsonPatch2 = new JsonPatch(patchOperations2); + UpstreamLineage result2 = upstreamLineageTemplate.applyPatch(result, jsonPatch2); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 0.1); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + UrnArray urns2 = new UrnArray(); + Urn urn2 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)"); + urns2.add(urn1); + urns2.add(urn2); + fineGrainedLineage2.setDownstreams(urns2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + 
fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + Assert.assertEquals(result2.getFineGrainedLineages().get(0), fineGrainedLineage2); + + // Check different downstream types + JsonPatchOperation operation4 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"), + downstreamConfidenceScore); + List patchOperations3 = new ArrayList<>(); + patchOperations3.add(operation4); + JsonPatch jsonPatch3 = new JsonPatch(patchOperations3); + UpstreamLineage result3 = upstreamLineageTemplate.applyPatch(result2, jsonPatch3); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap3 = new DataMap(); + dataMap3.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage3 = new FineGrainedLineage(dataMap3); + UrnArray urns3 = new UrnArray(); + Urn urn3 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)"); + urns3.add(urn3); + fineGrainedLineage3.setDownstreams(urns3); + fineGrainedLineage3.setTransformOperation("CREATE"); + fineGrainedLineage3.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage3.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + // Splits into two for different types + Assert.assertEquals(result3.getFineGrainedLineages().get(1), fineGrainedLineage3); + + // Check different transform types + JsonPatchOperation operation5 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"), + downstreamConfidenceScore); + List patchOperations4 = new ArrayList<>(); + patchOperations4.add(operation5); + JsonPatch jsonPatch4 = new JsonPatch(patchOperations4); + UpstreamLineage result4 = upstreamLineageTemplate.applyPatch(result3, jsonPatch4); + // Hack because Jackson parses values to doubles instead of floats + DataMap 
dataMap4 = new DataMap(); + dataMap4.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage4 = new FineGrainedLineage(dataMap4); + UrnArray urns4 = new UrnArray(); + Urn urn4 = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)"); + urns4.add(urn4); + fineGrainedLineage4.setDownstreams(urns4); + fineGrainedLineage4.setTransformOperation("TRANSFORM"); + fineGrainedLineage4.setDownstreamType(FineGrainedLineageDownstreamType.FIELD); + fineGrainedLineage4.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + // New entry in array because of new transformation type + Assert.assertEquals(result4.getFineGrainedLineages().get(2), fineGrainedLineage4); + + // Remove + JsonPatchOperation removeOperation = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)")); + JsonPatchOperation removeOperation2 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c2)")); + JsonPatchOperation removeOperation3 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD)")); + JsonPatchOperation removeOperation4 = + new RemoveOperation( + new JsonPointer( + "/fineGrainedLineages/TRANSFORM/downstreamType/FIELD/urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_2,PROD)")); + + List removeOperations = new ArrayList<>(); + removeOperations.add(removeOperation); + removeOperations.add(removeOperation2); + removeOperations.add(removeOperation3); + removeOperations.add(removeOperation4); + JsonPatch removePatch = new JsonPatch(removeOperations); + UpstreamLineage finalResult = upstreamLineageTemplate.applyPatch(result4, removePatch); + 
Assert.assertEquals(upstreamLineageTemplate.getDefault(), finalResult); + } + + @Test + public void testUpAndDown() throws Exception { + UpstreamLineageTemplate upstreamLineageTemplate = new UpstreamLineageTemplate(); + UpstreamLineage upstreamLineage = upstreamLineageTemplate.getDefault(); + List patchOperations = new ArrayList<>(); + NumericNode downstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/downstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + downstreamConfidenceScore); + patchOperations.add(operation); + NumericNode upstreamConfidenceScore = instance.numberNode(1.0f); + JsonPatchOperation operation2 = + new AddOperation( + new JsonPointer( + "/fineGrainedLineages/CREATE/upstreamType/FIELD_SET/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"), + upstreamConfidenceScore); + patchOperations.add(operation2); + JsonPatch jsonPatch = new JsonPatch(patchOperations); + + // Initial population test + UpstreamLineage result = upstreamLineageTemplate.applyPatch(upstreamLineage, jsonPatch); + // Hack because Jackson parses values to doubles instead of floats + DataMap dataMap = new DataMap(); + dataMap.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage = new FineGrainedLineage(dataMap); + UrnArray urns = new UrnArray(); + Urn urn1 = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:bigquery,upstream_table_1,PROD),c1)"); + urns.add(urn1); + fineGrainedLineage.setTransformOperation("CREATE"); + fineGrainedLineage.setUpstreams(urns); + fineGrainedLineage.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage.setDownstreams(urns); + + // Hack because Jackson parses values to doubles instead of floats + 
DataMap dataMap2 = new DataMap(); + dataMap2.put("confidenceScore", 1.0); + FineGrainedLineage fineGrainedLineage2 = new FineGrainedLineage(dataMap2); + fineGrainedLineage2.setTransformOperation("CREATE"); + fineGrainedLineage2.setUpstreamType(FineGrainedLineageUpstreamType.FIELD_SET); + fineGrainedLineage2.setDownstreamType(FineGrainedLineageDownstreamType.FIELD_SET); + fineGrainedLineage2.setDownstreams(urns); + + Assert.assertEquals(result.getFineGrainedLineages().get(1), fineGrainedLineage2); + } +} diff --git a/li-utils/src/main/java/com/linkedin/metadata/Constants.java b/li-utils/src/main/java/com/linkedin/metadata/Constants.java index f5a3c9c12ff70..3d9b533dc8f72 100644 --- a/li-utils/src/main/java/com/linkedin/metadata/Constants.java +++ b/li-utils/src/main/java/com/linkedin/metadata/Constants.java @@ -125,6 +125,11 @@ public class Constants { public static final String VIEW_PROPERTIES_ASPECT_NAME = "viewProperties"; public static final String DATASET_PROFILE_ASPECT_NAME = "datasetProfile"; + // Aspect support + public static final String FINE_GRAINED_LINEAGE_DATASET_TYPE = "DATASET"; + public static final String FINE_GRAINED_LINEAGE_FIELD_SET_TYPE = "FIELD_SET"; + public static final String FINE_GRAINED_LINEAGE_FIELD_TYPE = "FIELD"; + // Chart public static final String CHART_KEY_ASPECT_NAME = "chartKey"; public static final String CHART_INFO_ASPECT_NAME = "chartInfo"; diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index fcfe049fb15cf..294a80572669b 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -1,4 +1,4 @@ -from typing import Dict, Generic, List, Optional, TypeVar, Union +from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union from urllib.parse import quote from datahub.emitter.mcp_patch_builder import MetadataPatchProposal @@ -6,6 +6,9 @@ DatasetPropertiesClass as DatasetProperties, 
EditableDatasetPropertiesClass as EditableDatasetProperties, EditableSchemaMetadataClass as EditableSchemaMetadata, + FineGrainedLineageClass as FineGrainedLineage, + FineGrainedLineageDownstreamTypeClass as FineGrainedLineageDownstreamType, + FineGrainedLineageUpstreamTypeClass as FineGrainedLineageUpstreamType, GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, @@ -144,6 +147,108 @@ def set_upstream_lineages(self, upstreams: List[Upstream]) -> "DatasetPatchBuild ) return self + def add_fine_grained_upstream_lineage( + self, fine_grained_lineage: FineGrainedLineage + ) -> "DatasetPatchBuilder": + ( + transform_op, + upstream_type, + downstream_type, + ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage) + for upstream_urn in fine_grained_lineage.upstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "add", + path=DatasetPatchBuilder.quote_fine_grained_upstream_path( + transform_op, upstream_type, upstream_urn + ), + value=fine_grained_lineage.confidenceScore, + ) + for downstream_urn in fine_grained_lineage.downstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "add", + path=DatasetPatchBuilder.quote_fine_grained_downstream_path( + transform_op, downstream_type, downstream_urn + ), + value=fine_grained_lineage.confidenceScore, + ) + return self + + @staticmethod + def get_fine_grained_key( + fine_grained_lineage: FineGrainedLineage, + ) -> Tuple[str, str, str]: + transform_op = fine_grained_lineage.transformOperation or "NONE" + upstream_type = ( + fine_grained_lineage.upstreamType + if isinstance(fine_grained_lineage.upstreamType, str) + else FineGrainedLineageUpstreamType.FIELD_SET + ) + downstream_type = ( + fine_grained_lineage.downstreamType + if isinstance(fine_grained_lineage.downstreamType, str) + else FineGrainedLineageDownstreamType.FIELD_SET + ) + return transform_op, upstream_type, downstream_type + + @staticmethod + def 
quote_fine_grained_downstream_path( + transform_op: str, downstream_type: str, downstream_urn: str + ) -> str: + return ( + f"/fineGrainedLineages/{quote(transform_op, safe='')}/downstreamType/" + f"{quote(downstream_type, safe='')}/{quote(downstream_urn, safe='')}" + ) + + @staticmethod + def quote_fine_grained_upstream_path( + transform_op: str, upstream_type: str, upstream_urn: str + ) -> str: + return ( + f"/fineGrainedLineages/{quote(transform_op, safe='')}/upstreamType/" + f"{quote(upstream_type, safe='')}/{quote(upstream_urn, safe='')}" + ) + + def remove_fine_grained_upstream_lineage( + self, fine_grained_lineage: FineGrainedLineage + ) -> "DatasetPatchBuilder": + ( + transform_op, + upstream_type, + downstream_type, + ) = DatasetPatchBuilder.get_fine_grained_key(fine_grained_lineage) + for upstream_urn in fine_grained_lineage.upstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "remove", + path=DatasetPatchBuilder.quote_fine_grained_upstream_path( + transform_op, upstream_type, upstream_urn + ), + value={}, + ) + for downstream_urn in fine_grained_lineage.downstreams or []: + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "remove", + path=DatasetPatchBuilder.quote_fine_grained_downstream_path( + transform_op, downstream_type, downstream_urn + ), + value={}, + ) + return self + + def set_fine_grained_upstream_lineages( + self, fine_grained_lineages: List[FineGrainedLineage] + ) -> "DatasetPatchBuilder": + self._add_patch( + UpstreamLineage.ASPECT_NAME, + "add", + path="/fineGrainedLineages", + value=fine_grained_lineages, + ) + return self + def add_tag(self, tag: Tag) -> "DatasetPatchBuilder": self._add_patch( GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag diff --git a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json index d5dfe125942fb..ed5a7723ac2bf 100644 --- a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json +++ 
b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json @@ -42,26 +42,31 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", - "changeType": "PATCH", - "aspectName": "upstreamLineage", - "aspect": { - "json": [ - { - "op": "add", - "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", - "value": { - "auditStamp": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", - "type": "TRANSFORMED" - } - } - ] - } + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "upstreamLineage", + "aspect": { + "json": [ + { + "op": "add", + "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", + "type": "TRANSFORMED" + } + }, + { + "op": "add", + "path": "/fineGrainedLineages/TRANSFORM/upstreamType/DATASET/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", + "value": 1.0 + } + ] + } }, { "entityType": "dataset", diff --git a/metadata-ingestion/tests/unit/patch/test_patch_builder.py b/metadata-ingestion/tests/unit/patch/test_patch_builder.py index 0701b3d696895..f05c4978f8644 100644 --- a/metadata-ingestion/tests/unit/patch/test_patch_builder.py +++ b/metadata-ingestion/tests/unit/patch/test_patch_builder.py @@ -7,6 +7,9 @@ from datahub.ingestion.sink.file import write_metadata_file from datahub.metadata.schema_classes import ( DatasetLineageTypeClass, + FineGrainedLineageClass, + FineGrainedLineageDownstreamTypeClass, + FineGrainedLineageUpstreamTypeClass, GenericAspectClass, 
MetadataChangeProposalClass, TagAssociationClass, @@ -53,6 +56,19 @@ def test_complex_dataset_patch( type=DatasetLineageTypeClass.TRANSFORMED, ) ) + .add_fine_grained_upstream_lineage( + fine_grained_lineage=FineGrainedLineageClass( + upstreamType=FineGrainedLineageUpstreamTypeClass.DATASET, + upstreams=[ + make_dataset_urn( + platform="hive", name="fct_users_created_upstream", env="PROD" + ) + ], + downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD_SET, + transformOperation="TRANSFORM", + confidenceScore=1.0, + ) + ) ) patcher.for_field("field1").add_tag(TagAssociationClass(tag=make_tag_urn("tag1"))) diff --git a/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java b/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java index 6ded8a25b4e22..9db2ebc522e09 100644 --- a/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java +++ b/metadata-integration/java/datahub-client/src/main/java/datahub/client/patch/dataset/UpstreamLineagePatchBuilder.java @@ -5,10 +5,14 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.linkedin.common.urn.DatasetUrn; +import com.linkedin.common.urn.Urn; import com.linkedin.dataset.DatasetLineageType; +import com.linkedin.dataset.FineGrainedLineageDownstreamType; +import com.linkedin.dataset.FineGrainedLineageUpstreamType; import datahub.client.patch.AbstractMultiFieldPatchBuilder; import datahub.client.patch.PatchOperationType; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.ToString; import org.apache.commons.lang3.tuple.ImmutableTriple; @@ -16,7 +20,8 @@ public class UpstreamLineagePatchBuilder extends AbstractMultiFieldPatchBuilder { - private static final String PATH_START = "/upstreams/"; + private static final String UPSTREAMS_PATH_START = "/upstreams/"; + private static final String 
FINE_GRAINED_PATH_START = "/fineGrainedLineages/"; private static final String DATASET_KEY = "dataset"; private static final String AUDIT_STAMP_KEY = "auditStamp"; private static final String TIME_KEY = "time"; @@ -34,13 +39,233 @@ public UpstreamLineagePatchBuilder addUpstream( .set(AUDIT_STAMP_KEY, auditStamp); pathValues.add( - ImmutableTriple.of(PatchOperationType.ADD.getValue(), PATH_START + datasetUrn, value)); + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), UPSTREAMS_PATH_START + datasetUrn, value)); return this; } public UpstreamLineagePatchBuilder removeUpstream(@Nonnull DatasetUrn datasetUrn) { pathValues.add( - ImmutableTriple.of(PatchOperationType.REMOVE.getValue(), PATH_START + datasetUrn, null)); + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), UPSTREAMS_PATH_START + datasetUrn, null)); + return this; + } + + /** + * Method for adding an upstream FineGrained Dataset + * + * @param datasetUrn dataset to be set as upstream + * @param confidenceScore optional, confidence score for the lineage edge. 
Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @return this builder + */ + public UpstreamLineagePatchBuilder addFineGrainedUpstreamDataset( + @Nonnull DatasetUrn datasetUrn, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + "DATASET" + + "/" + + datasetUrn, + instance.numberNode(finalConfidenceScore))); + return this; + } + + /** + * Adds a field as a fine grained upstream + * + * @param schemaFieldUrn a schema field to be marked as upstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the upstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder addFineGrainedUpstreamField( + @Nonnull Urn schemaFieldUrn, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageUpstreamType type) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + 
instance.numberNode(finalConfidenceScore))); + + return this; + } + + /** + * Adds a field as a fine grained downstream + * + * @param schemaFieldUrn a schema field to be marked as downstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param confidenceScore optional, confidence score for the lineage edge. Defaults to 1.0 for + * full confidence + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the downstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder addFineGrainedDownstreamField( + @Nonnull Urn schemaFieldUrn, + @Nullable Float confidenceScore, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageDownstreamType type) { + Float finalConfidenceScore = getConfidenceScoreOrDefault(confidenceScore); + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageDownstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.ADD.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "downstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + instance.numberNode(finalConfidenceScore))); + return this; + } + + private Float getConfidenceScoreOrDefault(@Nullable Float confidenceScore) { + float finalConfidenceScore; + if (confidenceScore != null && confidenceScore > 0 && confidenceScore <= 1.0f) { + finalConfidenceScore = confidenceScore; + } else { + finalConfidenceScore = 1.0f; + } + + return finalConfidenceScore; + } + + /** + * Removes a field as a fine grained upstream + * + * @param schemaFieldUrn a schema field to be marked as upstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param transformationOperation string operation type that describes the transformation 
+ * operation happening in the lineage edge + * @param type the upstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder removeFineGrainedUpstreamField( + @Nonnull Urn schemaFieldUrn, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageUpstreamType type) { + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + finalType = FineGrainedLineageUpstreamType.FIELD_SET.toString(); + } else { + finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + null)); + + return this; + } + + public UpstreamLineagePatchBuilder removeFineGrainedUpstreamDataset( + @Nonnull DatasetUrn datasetUrn, @Nonnull String transformationOperation) { + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "upstreamType" + + "/" + + "DATASET" + + "/" + + datasetUrn, + null)); + return this; + } + + /** + * Removes a field as a fine grained downstream + * + * @param schemaFieldUrn a schema field to be marked as downstream, format: + * urn:li:schemaField(DATASET_URN, COLUMN NAME) + * @param transformationOperation string operation type that describes the transformation + * operation happening in the lineage edge + * @param type the downstream lineage type, either Field or Field Set + * @return this builder + */ + public UpstreamLineagePatchBuilder removeFineGrainedDownstreamField( + @Nonnull Urn schemaFieldUrn, + @Nonnull String transformationOperation, + @Nullable FineGrainedLineageDownstreamType type) { + String finalType; + if (type == null) { + // Default to set of fields if not explicitly a single field + 
finalType = type.toString(); + } + + pathValues.add( + ImmutableTriple.of( + PatchOperationType.REMOVE.getValue(), + FINE_GRAINED_PATH_START + + transformationOperation + + "/" + + "downstreamType" + + "/" + + finalType + + "/" + + schemaFieldUrn, + null)); return this; } diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java index 1d387acb0ce12..563742990f546 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/patch/PatchTest.java @@ -14,6 +14,7 @@ import com.linkedin.common.urn.DatasetUrn; import com.linkedin.common.urn.GlossaryTermUrn; import com.linkedin.common.urn.TagUrn; +import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.dataset.DatasetLineageType; import com.linkedin.metadata.graph.LineageDirection; @@ -49,15 +50,21 @@ public class PatchTest { public void testLocalUpstream() { RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build()); try { + DatasetUrn upstreamUrn = + DatasetUrn.createFromString( + "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"); + Urn schemaFieldUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)"); MetadataChangeProposal upstreamPatch = new UpstreamLineagePatchBuilder() .urn( UrnUtils.getUrn( "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)")) - .addUpstream( - DatasetUrn.createFromString( - "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"), - DatasetLineageType.TRANSFORMED) + .addUpstream(upstreamUrn, DatasetLineageType.TRANSFORMED) + .addFineGrainedUpstreamDataset(upstreamUrn, null, "TRANSFORM") + .addFineGrainedUpstreamField(schemaFieldUrn, null, "TRANSFORM", null) + 
.addFineGrainedDownstreamField(schemaFieldUrn, null, "TRANSFORM", null) .build(); Future response = restEmitter.emit(upstreamPatch); @@ -73,6 +80,12 @@ public void testLocalUpstream() { public void testLocalUpstreamRemove() { RestEmitter restEmitter = new RestEmitter(RestEmitterConfig.builder().build()); try { + DatasetUrn upstreamUrn = + DatasetUrn.createFromString( + "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"); + Urn schemaFieldUrn = + UrnUtils.getUrn( + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD), foo)"); MetadataChangeProposal upstreamPatch = new UpstreamLineagePatchBuilder() .urn( @@ -81,6 +94,9 @@ public void testLocalUpstreamRemove() { .removeUpstream( DatasetUrn.createFromString( "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)")) + .removeFineGrainedUpstreamDataset(upstreamUrn, "TRANSFORM") + .removeFineGrainedUpstreamField(schemaFieldUrn, "TRANSFORM", null) + .removeFineGrainedDownstreamField(schemaFieldUrn, "TRANSFORM", null) .build(); Future response = restEmitter.emit(upstreamPatch);