Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Downstream column lineage #2159

Merged
merged 1 commit into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
6 changes: 4 additions & 2 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
@Produces(APPLICATION_JSON)
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build();
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
.build();
}
}
24 changes: 14 additions & 10 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,20 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
)
UNION
SELECT
upstream_node.output_dataset_version_uuid,
upstream_node.output_dataset_field_uuid,
upstream_node.input_dataset_version_uuid,
upstream_node.input_dataset_field_uuid,
upstream_node.transformation_description,
upstream_node.transformation_type,
upstream_node.created_at,
upstream_node.updated_at,
adjacent_node.output_dataset_version_uuid,
adjacent_node.output_dataset_field_uuid,
adjacent_node.input_dataset_version_uuid,
adjacent_node.input_dataset_field_uuid,
adjacent_node.transformation_description,
adjacent_node.transformation_type,
adjacent_node.created_at,
adjacent_node.updated_at,
node.depth + 1 as depth
FROM column_lineage upstream_node, column_lineage_recursive node
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
FROM column_lineage adjacent_node, column_lineage_recursive node
WHERE (
(node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage
OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage
)
AND node.depth < :depth
)
SELECT
Expand Down Expand Up @@ -152,6 +155,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :crea
Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
boolean withDownstream,
Instant createdAtUntil);

@SqlQuery(
Expand Down
5 changes: 2 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa
this.datasetFieldDao = datasetFieldDao;
}

public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) {
public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) {
List<UUID> columnNodeUuids = getColumnNodeUuids(nodeId);
if (columnNodeUuids.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
}

return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil));
return toLineage(getLineage(depth, columnNodeUuids, withDownstream, createdAtUntil));
}

private Lineage toLineage(Set<ColumnLineageNodeData> lineageNodeData) {
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/api/ColumnLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -40,7 +40,7 @@ public class ColumnLineageResourceTest {
ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class)))
when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class)))
.thenReturn(LINEAGE);

ServiceFactory serviceFactory =
Expand Down
28 changes: 18 additions & 10 deletions api/src/test/java/marquez/db/ColumnLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ void testGetLineage() {
UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0);
UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get();
Set<ColumnLineageNodeData> lineage =
dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now());
dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now());

assertEquals(2, lineage.size());

Expand Down Expand Up @@ -326,7 +326,8 @@ void testGetLineageWhenNoLineageForColumn() {
UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get();

// assert lineage is empty
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())).isEmpty();
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now()))
.isEmpty();
}

/**
Expand Down Expand Up @@ -392,11 +393,12 @@ void testGetLineageWithLimitedDepth() {
UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get();

// make sure dataset are constructed properly
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(3);

// verify graph size is 2 when max depth is 1
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), Instant.now())).hasSize(2);
assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), false, Instant.now()))
.hasSize(2);
}

@Test
Expand Down Expand Up @@ -462,9 +464,9 @@ void testGetLineageWhenCycleExists() {
UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get();

// column lineages for col_a and col_e should be of size 3
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now()))
.hasSize(3);
assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now()))
.hasSize(3);
}

Expand Down Expand Up @@ -524,7 +526,7 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() {

// assert input fields for col_d contain col_a and col_c
List<String> inputFields =
dao.getLineage(20, Collections.singletonList(field_col_c), Instant.now()).stream()
dao.getLineage(20, Collections.singletonList(field_col_c), false, Instant.now()).stream()
.filter(node -> node.getDataset().equals("dataset_b"))
.flatMap(node -> node.getInputFields().stream())
.map(input -> input.getField())
Expand Down Expand Up @@ -558,11 +560,17 @@ void testGetLineagePointInTime() {
// assert lineage is empty before and present after
assertThat(
dao.getLineage(
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.minusSeconds(1)))
20,
Collections.singletonList(field_col_b),
false,
columnLineageCreatedAt.minusSeconds(1)))
.isEmpty();
assertThat(
dao.getLineage(
20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1)))
20,
Collections.singletonList(field_col_b),
false,
columnLineageCreatedAt.plusSeconds(1)))
.hasSize(1);
}

Expand Down Expand Up @@ -590,7 +598,7 @@ void testGetLineageWhenJobRunMultipleTimes() {
UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0);
UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get();

assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), Instant.now()))
assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), false, Instant.now()))
.hasSize(1);
}
}
56 changes: 54 additions & 2 deletions api/src/test/java/marquez/service/ColumnLineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() {

Lineage lineage =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
false,
Instant.now());

assertThat(lineage.getGraph()).hasSize(3);

Expand Down Expand Up @@ -156,12 +159,16 @@ public void testLineageByDatasetId() {

Lineage lineageByField =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now());
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
false,
Instant.now());

Lineage lineageByDataset =
lineageService.lineage(
NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))),
20,
false,
Instant.now());

// lineage of dataset and column should be equal
Expand Down Expand Up @@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() {
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")),
20,
false,
Instant.now()));

assertThrows(
Expand All @@ -204,13 +212,15 @@ public void testLineageWhenLineageEmpty() {
NodeId.of(
new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))),
20,
false,
Instant.now()));

assertThat(
lineageService
.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")),
20,
false,
Instant.now())
.getGraph())
.hasSize(0);
Expand Down Expand Up @@ -268,6 +278,48 @@ public void testEnrichDatasets() {
.contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c"));
}

@Test
public void testGetLineageWithDownstream() {
LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
LineageEvent.Dataset dataset_C = getDatasetC();

LineageTestUtils.createLineageRow(
openLineageDao,
"job1",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_A),
Arrays.asList(dataset_B));

LineageTestUtils.createLineageRow(
openLineageDao,
"job2",
"COMPLETE",
jobFacet,
Arrays.asList(dataset_B),
Arrays.asList(dataset_C));

Lineage lineage =
lineageService.lineage(
NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")),
20,
true,
Instant.now());

// assert that get lineage of dataset_B should co also return dataset_A and dataset_C
assertThat(
lineage.getGraph().stream()
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a"))
.findAny())
.isPresent();
assertThat(
lineage.getGraph().stream()
.filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d"))
.findAny())
.isPresent();
}

@Test
public void testEnrichDatasetsHasNoDuplicates() {
LineageEvent.Dataset dataset_A = getDatasetA();
Expand Down