Skip to content

Commit

Permalink
downstream column lineage
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 11, 2022
1 parent 6fd6416 commit b23be06
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
6 changes: 4 additions & 2 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
.build();
}
}
24 changes: 14 additions & 10 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,20 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
)
UNION
SELECT
upstream_node.output_dataset_version_uuid,
upstream_node.output_dataset_field_uuid,
upstream_node.input_dataset_version_uuid,
upstream_node.input_dataset_field_uuid,
upstream_node.transformation_description,
upstream_node.transformation_type,
upstream_node.created_at,
upstream_node.updated_at,
adjacent_node.output_dataset_version_uuid,
adjacent_node.output_dataset_field_uuid,
adjacent_node.input_dataset_version_uuid,
adjacent_node.input_dataset_field_uuid,
adjacent_node.transformation_description,
adjacent_node.transformation_type,
adjacent_node.created_at,
adjacent_node.updated_at,
node.depth + 1 as depth
FROM column_lineage upstream_node, column_lineage_recursive node
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
FROM column_lineage adjacent_node, column_lineage_recursive node
WHERE (
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
)
AND node.depth < :depth
)
SELECT
Expand Down Expand Up @@ -152,6 +155,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
boolean withDownstream,
Instant createdAtUntil);

@SqlQuery(
Expand Down
5 changes: 2 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa
this.datasetFieldDao = datasetFieldDao;
}

public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) {
public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) {
List<UUID> columnNodeUuids = getColumnNodeUuids(nodeId);
if (columnNodeUuids.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
}

return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
return toLineage(getLineage(depth, columnNodeUuids, withDownstream, createdAtUntil));
}

private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/api/ColumnLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -40,7 +40,7 @@ public class ColumnLineageResourceTest {
ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class)))
when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class)))
.thenReturn(LINEAGE);

ServiceFactory serviceFactory =
Expand Down
28 changes: 18 additions & 10 deletions api/src/test/java/marquez/db/ColumnLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ void testGetLineage() {
UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0);
UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get();
Set<ColumnLineageNodeData> lineage =
dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now());
dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now());

assertEquals(2, lineage.size());

Expand Down Expand Up @@ -326,7 +326,8 @@ void testGetLineageWhenNoLineageForColumn() {
UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get();

// assert lineage is empty
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())).isEmpty();
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now()))
.isEmpty();
}

/**
Expand Down Expand Up @@ -392,11 +393,12 @@ void testGetLineageWithLimitedDepth() {
UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get();

// make sure dataset are constructed properly
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(3);

// verify graph size is 2 when max depth is 1
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), Instant.now())).hasSize(2);
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(2);
}

@Test
Expand Down Expand Up @@ -462,9 +464,9 @@ void testGetLineageWhenCycleExists() {
UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get();

// column lineages for col_a and col_e should be of size 3
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now()))
.hasSize(3);
assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now()))
.hasSize(3);
}

Expand Down Expand Up @@ -524,7 +526,7 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() {

// assert input fields for col_d contain col_a and col_c
List<String> inputFields =
dao.getLineage(20, Collections.singletonList(field_col_c), Instant.now()).stream()
dao.getLineage(20, Collections.singletonList(field_col_c), false, Instant.now()).stream()
.filter(node -> node.getDataset().equals("dataset_b"))
.flatMap(node -> node.getInputFields().stream())
.map(input -> input.getField())
Expand Down Expand Up @@ -558,11 +560,17 @@ void testGetLineagePointInTime() {
// assert lineage is empty before and present after
assertThat(
dao.getLineage(
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.minusSeconds(1)))
20,
Collections.singletonList(field_col_b),
false,
columnLineageCreatedAt.minusSeconds(1)))
.isEmpty();
assertThat(
dao.getLineage(
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1)))
20,
Collections.singletonList(field_col_b),
false,
columnLineageCreatedAt.plusSeconds(1)))
.hasSize(1);
}

Expand Down Expand Up @@ -590,7 +598,7 @@ void testGetLineageWhenJobRunMultipleTimes() {
UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0);
UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get();

assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), false, Instant.now()))
.hasSize(1);
}
}
56 changes: 54 additions & 2 deletions api/src/test/java/marquez/service/ColumnLineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() {

Lineage lineage =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
false,
Instant.now());

assertThat(lineage.getGraph()).hasSize(3);

Expand Down Expand Up @@ -156,12 +159,16 @@ public void testLineageByDatasetId() {

Lineage lineageByField =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
false,
Instant.now());

Lineage lineageByDataset =
lineageService.lineage(
NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
20,
false,
Instant.now());

// lineage of dataset and column should be equal
Expand Down Expand Up @@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() {
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")),
20,
false,
Instant.now()));

assertThrows(
Expand All @@ -204,13 +212,15 @@ public void testLineageWhenLineageEmpty() {
NodeId.of(
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))),
20,
false,
Instant.now()));

assertThat(
lineageService
.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")),
20,
false,
Instant.now())
.getGraph())
.hasSize(0);
Expand Down Expand Up @@ -268,6 +278,48 @@ public void testEnrichDatasets() {
.contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c"));
}

@Test
public void testGetLineageWithDownstream() {
LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
LineageEvent.Dataset dataset_C = getDatasetC();

LineageTestUtils.createLineageRow(
openLineageDao,
"job1",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_A),
Arrays.asList(dataset_B));

LineageTestUtils.createLineageRow(
openLineageDao,
"job2",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_B),
Arrays.asList(dataset_C));

Lineage lineage =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
true,
Instant.now());

// assert that get lineage of dataset_B should co also return dataset_A and dataset_C
assertThat(
lineage.getGraph().stream()
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a"))
.findAny())
.isPresent();
assertThat(
lineage.getGraph().stream()
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
.findAny())
.isPresent();
}

@Test
public void testEnrichDatasetsHasNoDuplicates() {
LineageEvent.Dataset dataset_A = getDatasetA();
Expand Down

0 comments on commit b23be06

Please sign in to comment.