Skip to content

Commit

Permalink
downstream column lineage
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 5, 2022
1 parent d4f0890 commit f257af6
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)


### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
6 changes: 4 additions & 2 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
.build();
}
}
58 changes: 58 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,64 @@ Set<ColumnLineageNodeData> getLineage(
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);

/**
 * Fetches column lineage for the given dataset fields, following lineage edges in both
 * directions (upstream and downstream).
 *
 * <p>The recursive CTE {@code column_lineage_recursive} is seeded at depth 0 with the {@code
 * column_lineage} rows whose {@code output_dataset_field_uuid} is one of {@code
 * datasetFieldUuids} and whose {@code created_at} is not later than {@code createdAtUntil}. Each
 * recursive step adds {@code column_lineage} rows adjacent to an already collected row: rows
 * that produce one of its input fields (upstream direction) or rows that consume its output
 * field (downstream direction). A collected row is expanded only while its depth is lower than
 * {@code depth}.
 *
 * <p>The final select groups the collected rows by output field, transformation and timestamps,
 * and aggregates the input fields into {@code inputFields} as {@code [namespace, dataset,
 * field]} arrays. Output fields are inner-joined through {@code datasets_view}; per the inline
 * SQL comment this is what filters out hidden datasets. Input fields are left-joined, so an
 * unresolved input field does not drop the row.
 *
 * <p>NOTE(review): the {@code created_at <= :createdAtUntil} predicate appears only in the seed
 * query, not in the recursive step, and the seed matches requested fields only as outputs —
 * confirm both are intended.
 *
 * @param depth upper bound compared against the depth of a collected row before it is expanded
 * @param datasetFieldUuids uuids of the dataset fields to start from; an empty list is bound
 *     using {@code NULL_STRING}
 * @param createdAtUntil latest {@code created_at} accepted for the seed rows
 * @return one {@code ColumnLineageNodeData} per grouped result row
 */
@SqlQuery(
"""
WITH RECURSIVE
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
SELECT *, 0 as depth
FROM column_lineage
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
UNION
SELECT
adjacent_node.output_dataset_version_uuid,
adjacent_node.output_dataset_field_uuid,
adjacent_node.input_dataset_version_uuid,
adjacent_node.input_dataset_field_uuid,
adjacent_node.transformation_description,
adjacent_node.transformation_type,
adjacent_node.created_at,
adjacent_node.updated_at,
node.depth + 1 as depth
FROM column_lineage adjacent_node, column_lineage_recursive node
WHERE (
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid)
OR (adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid)
)
AND node.depth < :depth
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
""")
Set<ColumnLineageNodeData> getLineageWithDownstream(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);

@SqlQuery(
"""
WITH selected_column_lineage AS (
Expand Down
8 changes: 5 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa
this.datasetFieldDao = datasetFieldDao;
}

public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) {
public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) {
List<UUID> columnNodeUuids = getColumnNodeUuids(nodeId);

if (columnNodeUuids.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
} else if (withDownstream) {
return toLineage(getLineageWithDownstream(depth, columnNodeUuids, createdAtUntil));
} else {
return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
}

return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
}

private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/api/ColumnLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -40,7 +40,7 @@ public class ColumnLineageResourceTest {
ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class)))
when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class)))
.thenReturn(LINEAGE);

ServiceFactory serviceFactory =
Expand Down
56 changes: 54 additions & 2 deletions api/src/test/java/marquez/service/ColumnLineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() {

Lineage lineage =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
false,
Instant.now());

assertThat(lineage.getGraph()).hasSize(3);

Expand Down Expand Up @@ -156,12 +159,16 @@ public void testLineageByDatasetId() {

Lineage lineageByField =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
false,
Instant.now());

Lineage lineageByDataset =
lineageService.lineage(
NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
20,
false,
Instant.now());

// lineage of dataset and column should be equal
Expand Down Expand Up @@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() {
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")),
20,
false,
Instant.now()));

assertThrows(
Expand All @@ -204,13 +212,15 @@ public void testLineageWhenLineageEmpty() {
NodeId.of(
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))),
20,
false,
Instant.now()));

assertThat(
lineageService
.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")),
20,
false,
Instant.now())
.getGraph())
.hasSize(0);
Expand Down Expand Up @@ -268,6 +278,48 @@ public void testEnrichDatasets() {
.contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c"));
}

@Test
public void testGetLineageWithDownstream() {
  // Chain under test: dataset A --job1--> dataset B --job2--> dataset C.
  final LineageEvent.Dataset upstreamDataset = getDatasetA();
  final LineageEvent.Dataset queriedDataset = getDatasetB();
  final LineageEvent.Dataset downstreamDataset = getDatasetC();

  // job1 reads dataset A and writes dataset B.
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "job1",
      "COMPLETE",
      jobFacet,
      Arrays.asList(upstreamDataset),
      Arrays.asList(queriedDataset));

  // job2 reads dataset B and writes dataset C.
  LineageTestUtils.createLineageRow(
      openLineageDao,
      "job2",
      "COMPLETE",
      jobFacet,
      Arrays.asList(queriedDataset),
      Arrays.asList(downstreamDataset));

  // Ask for the lineage of a dataset_b column with the downstream flag switched on.
  final NodeId startingColumn = NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c"));
  final Lineage lineage = lineageService.lineage(startingColumn, 20, true, Instant.now());

  // The lineage of dataset B should also return columns of dataset A and dataset C.
  final var upstreamColumn =
      lineage.getGraph().stream()
          .filter(
              node -> node.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a"))
          .findAny();
  assertThat(upstreamColumn).isPresent();

  final var downstreamColumn =
      lineage.getGraph().stream()
          .filter(
              node -> node.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
          .findAny();
  assertThat(downstreamColumn).isPresent();
}

private Optional<Node> getNode(Lineage lineage, String datasetName, String fieldName) {
return lineage.getGraph().stream()
.filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName))
Expand Down

0 comments on commit f257af6

Please sign in to comment.