From 50adb00620da883c2cf320b6faf0c0f4c23aa188 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 3 Oct 2022 11:46:16 +0200 Subject: [PATCH] downstream column lineage Signed-off-by: Pawel Leszczynski --- .../marquez/api/ColumnLineageResource.java | 6 +- .../java/marquez/db/ColumnLineageDao.java | 58 +++++++++++++++++++ .../marquez/service/ColumnLineageService.java | 8 ++- .../api/ColumnLineageResourceTest.java | 4 +- .../service/ColumnLineageServiceTest.java | 56 +++++++++++++++++- 5 files changed, 123 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/marquez/api/ColumnLineageResource.java b/api/src/main/java/marquez/api/ColumnLineageResource.java index 057e8b35ee..3a52a8bd02 100644 --- a/api/src/main/java/marquez/api/ColumnLineageResource.java +++ b/api/src/main/java/marquez/api/ColumnLineageResource.java @@ -43,8 +43,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) { @Produces(APPLICATION_JSON) public Response getLineage( @QueryParam("nodeId") @NotNull NodeId nodeId, - @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) + @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth, + @QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream) throws ExecutionException, InterruptedException { - return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build(); + return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now())) + .build(); } } diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index b26a1213f1..3afc693fca 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -151,6 +151,64 @@ Set getLineage( @BindList(onEmpty = NULL_STRING) List datasetFieldUuids, Instant createdAtUntil); + @SqlQuery( + """ + WITH RECURSIVE + dataset_fields_view AS ( + SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid + FROM dataset_fields df + INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid + ), + column_lineage_recursive AS ( + SELECT *, 0 as depth + FROM column_lineage + WHERE output_dataset_field_uuid IN () AND created_at <= :createdAtUntil + UNION + SELECT + adjacent_node.output_dataset_version_uuid, + adjacent_node.output_dataset_field_uuid, + adjacent_node.input_dataset_version_uuid, + adjacent_node.input_dataset_field_uuid, + adjacent_node.transformation_description, + adjacent_node.transformation_type, + adjacent_node.created_at, + adjacent_node.updated_at, + node.depth + 1 as depth + FROM column_lineage adjacent_node, column_lineage_recursive node + WHERE ( + (node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) + OR (adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) + ) + AND node.depth < :depth + ) + SELECT + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + clr.transformation_description, + clr.transformation_type, + clr.created_at, + clr.updated_at + FROM column_lineage_recursive clr + INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered + LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid + GROUP BY + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + clr.transformation_description, + clr.transformation_type, + clr.created_at, + clr.updated_at + """) + Set getLineageWithDownstream( + int depth, + @BindList(onEmpty = NULL_STRING) List datasetFieldUuids, + Instant createdAtUntil); + @SqlQuery( """ WITH selected_column_lineage AS ( diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index 3aee7ee33d..5c510ed4b0 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -40,14 +40,16 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa this.datasetFieldDao = datasetFieldDao; } - public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) { + public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) { List columnNodeUuids = getColumnNodeUuids(nodeId); if (columnNodeUuids.isEmpty()) { throw new NodeIdNotFoundException("Could not find node"); + } else if (withDownstream) { + return toLineage(getLineageWithDownstream(depth, columnNodeUuids, createdAtUntil)); + } else { + return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil)); } - - return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil)); } private Lineage toLineage(Set lineageNodeData) { diff --git a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java index dd5c2ab03e..9ebf944e40 100644 --- a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java +++ b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java @@ -7,7 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,7 +40,7 @@ public class ColumnLineageResourceTest { ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"), new TypeReference<>() {}); LINEAGE = new Lineage(ImmutableSortedSet.of(testNode)); - when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class))) + when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class))) .thenReturn(LINEAGE); ServiceFactory serviceFactory = diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 5dbc2cd2c3..5d5dc53f60 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() { Lineage lineage = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), + 20, + false, + Instant.now()); assertThat(lineage.getGraph()).hasSize(3); @@ -156,12 +159,16 @@ public void testLineageByDatasetId() { Lineage lineageByField = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), + 20, + false, + Instant.now()); Lineage lineageByDataset = lineageService.lineage( NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), 20, + false, Instant.now()); // lineage of dataset and column should be equal @@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() { lineageService.lineage( NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")), 20, + false, Instant.now())); assertThrows( @@ -204,6 +212,7 @@ public void testLineageWhenLineageEmpty() { NodeId.of( new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))), 20, + false, Instant.now())); assertThat( @@ -211,6 +220,7 @@ public void testLineageWhenLineageEmpty() { .lineage( NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")), 20, + false, Instant.now()) .getGraph()) .hasSize(0); @@ -268,6 +278,48 @@ public void testEnrichDatasets() { .contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c")); } + @Test + public void testGetLineageWithDownstream() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Lineage lineage = + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), + 20, + true, + Instant.now()); + + // assert that get lineage of dataset_B should co also return dataset_A and dataset_C + assertThat( + lineage.getGraph().stream() + .filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a")) + .findAny()) + .isPresent(); + assertThat( + lineage.getGraph().stream() + .filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d")) + .findAny()) + .isPresent(); + } + private Optional getNode(Lineage lineage, String datasetName, String fieldName) { return lineage.getGraph().stream() .filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName))