Skip to content

Commit

Permalink
include column lineage in dataset resource
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 7, 2022
1 parent 496566e commit 950e293
Show file tree
Hide file tree
Showing 20 changed files with 332 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)

### Fixed
* Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike)
Expand Down
5 changes: 4 additions & 1 deletion api/src/main/java/marquez/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import javax.validation.Valid;
Expand Down Expand Up @@ -85,10 +86,11 @@ public Response getDataset(
@PathParam("dataset") DatasetName datasetName) {
throwIfNotExists(namespaceName);

final Dataset dataset =
Dataset dataset =
datasetService
.findWithTags(namespaceName.getValue(), datasetName.getValue())
.orElseThrow(() -> new DatasetNotFoundException(datasetName));
columnLineageService.enrichWithColumnLineage(Arrays.asList(dataset));
return Response.ok(dataset).build();
}

Expand Down Expand Up @@ -147,6 +149,7 @@ public Response list(

final List<Dataset> datasets =
datasetService.findAllWithTags(namespaceName.getValue(), limit, offset);
columnLineageService.enrichWithColumnLineage(datasets);
final int totalCount = datasetService.countFor(namespaceName.getValue());
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
}
Expand Down
43 changes: 43 additions & 0 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,47 @@ Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);

@SqlQuery(
"""
WITH selected_column_lineage AS (
SELECT cl.*
FROM column_lineage cl
JOIN dataset_fields df ON df.uuid = cl.output_dataset_field_uuid
JOIN datasets_view dv ON dv.uuid = df.dataset_uuid
WHERE ARRAY[<values>]::DATASET_NAME[] && dv.dataset_symlinks
),
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
FROM selected_column_lineage c
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
""")
Set<ColumnLineageNodeData> getLineageRowsForDatasets(
@BindBeanList(
propertyNames = {"left", "right"},
value = "values")
List<Pair<String, String>> datasets);
}
27 changes: 15 additions & 12 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
@RegisterRowMapper(FieldDataMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
+ "SELECT 1 FROM dataset_fields AS df "
+ "INNER JOIN datasets_view AS d "
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
+ "WHERE df.name = :name)")
"""
SELECT EXISTS (
SELECT 1 FROM dataset_fields AS df
INNER JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
AND df.name = :name
)
""")
boolean exists(String namespaceName, String datasetName, String name);

default Dataset updateTags(
Expand Down Expand Up @@ -97,20 +100,20 @@ default Dataset updateTags(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
""")
List<UUID> findDatasetFieldsUuids(String namespace, String datasetName);
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
INNER JOIN datasets_view AS d
ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace
WHERE df.name = :name
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
AND df.name = :name
""")
Optional<UUID> findUuid(String namespace, String datasetName, String name);
Optional<UUID> findUuid(String namespaceName, String datasetName, String name);

@SqlQuery(
"SELECT f.*, "
Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import marquez.db.ColumnLineageDao;
import marquez.db.DatasetFieldDao;
import marquez.db.models.ColumnLineageNodeData;
import marquez.service.models.ColumnLineage;
import marquez.service.models.ColumnLineageInputField;
import marquez.service.models.Dataset;
import marquez.service.models.Edge;
import marquez.service.models.Lineage;
import marquez.service.models.Node;
import marquez.service.models.NodeId;
import org.apache.commons.lang3.tuple.Pair;

@Slf4j
public class ColumnLineageService extends DelegatingDaos.DelegatingColumnLineageDao {
Expand Down Expand Up @@ -124,4 +128,51 @@ List<UUID> getColumnNodeUuids(NodeId nodeId) {
}
return columnNodeUuids;
}

public void enrichWithColumnLineage(List<Dataset> datasets) {
if (datasets.isEmpty()) {
return;
}

Set<ColumnLineageNodeData> lineageRowsForDatasets =
getLineageRowsForDatasets(
datasets.stream()
.map(d -> Pair.of(d.getNamespace().getValue(), d.getName().getValue()))
.collect(Collectors.toList()));

Map<Dataset, List<ColumnLineage>> datasetLineage = new HashMap<>();
lineageRowsForDatasets.stream()
.forEach(
nodeData -> {
Dataset dataset =
datasets.stream()
.filter(d -> d.getNamespace().getValue().equals(nodeData.getNamespace()))
.filter(d -> d.getName().getValue().equals(nodeData.getDataset()))
.findAny()
.get();

if (!datasetLineage.containsKey(dataset)) {
datasetLineage.put(dataset, new LinkedList<>());
}
datasetLineage
.get(dataset)
.add(
ColumnLineage.builder()
.name(nodeData.getField())
.transformationDescription(nodeData.getTransformationDescription())
.transformationType(nodeData.getTransformationType())
.inputFields(
nodeData.getInputFields().stream()
.map(
f ->
new ColumnLineageInputField(
f.getNamespace(), f.getDataset(), f.getField()))
.collect(Collectors.toList()))
.build());
});

datasets.stream()
.filter(dataset -> datasetLineage.containsKey(dataset))
.forEach(dataset -> dataset.setColumnLineage(datasetLineage.get(dataset)));
}
}
24 changes: 24 additions & 0 deletions api/src/main/java/marquez/service/models/ColumnLineage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.util.List;
import javax.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

@EqualsAndHashCode
@ToString
@Builder
@Getter
public class ColumnLineage {
@NotNull private String name;
@NotNull private List<ColumnLineageInputField> inputFields;
@NotNull private String transformationDescription;
@NotNull private String transformationType;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

@EqualsAndHashCode
@ToString
@Getter
@AllArgsConstructor
public class ColumnLineageInputField {
@NotNull private String namespace;
@NotNull private String dataset;
@NotNull private String field;
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/service/models/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public abstract class Dataset {
@Nullable private final String lastLifecycleState;
@Nullable private final String description;
@Nullable private final UUID currentVersion;
@Getter @Setter @Nullable private List<ColumnLineage> columnLineage;
@Getter ImmutableMap<String, Object> facets;
@Getter private final boolean isDeleted;

Expand All @@ -70,6 +71,7 @@ public Dataset(
@Nullable final String lastLifecycleState,
@Nullable final String description,
@Nullable final UUID currentVersion,
@Nullable final ImmutableList<ColumnLineage> columnLineage,
@Nullable final ImmutableMap<String, Object> facets,
boolean isDeleted) {
this.id = id;
Expand All @@ -86,6 +88,7 @@ public Dataset(
this.lastLifecycleState = lastLifecycleState;
this.description = description;
this.currentVersion = currentVersion;
this.columnLineage = columnLineage;
this.facets = (facets == null) ? ImmutableMap.of() : facets;
this.isDeleted = isDeleted;
}
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/models/DbTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public DbTable(
lastLifecycleState,
description,
currentVersion,
null,
facets,
isDeleted);
}
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/models/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public Stream(
lastLifecycleState,
description,
currentVersion,
null,
facets,
isDeleted);
this.schemaLocation = schemaLocation;
Expand Down
48 changes: 48 additions & 0 deletions api/src/test/java/marquez/DatasetIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package marquez;

import static marquez.db.ColumnLineageTestUtils.getDatasetA;
import static marquez.db.ColumnLineageTestUtils.getDatasetB;
import static org.assertj.core.api.Assertions.assertThat;

import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -22,6 +24,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import marquez.client.models.ColumnLineage;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetId;
import marquez.client.models.DatasetVersion;
Expand Down Expand Up @@ -440,4 +443,49 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep
datasets = client.listDatasets(namespace);
assertThat(datasets).hasSize(1);
}

@Test
public void testApp_getDatasetContainsColumnLineage() {
LineageEvent event =
new LineageEvent(
"COMPLETE",
Instant.now().atZone(ZoneId.systemDefault()),
new LineageEvent.Run(UUID.randomUUID().toString(), null),
new LineageEvent.Job("namespace", "job_name", null),
List.of(getDatasetA()),
List.of(getDatasetB()),
"the_producer");

CompletableFuture<Integer> resp =
this.sendLineage(Utils.toJson(event))
.thenApply(HttpResponse::statusCode)
.whenComplete(
(val, error) -> {
if (error != null) {
Assertions.fail("Could not complete request");
}
});
resp.join();

// verify listDatasets contains column lineage
List<ColumnLineage> columnLineage;

columnLineage =
client.listDatasets("namespace").stream()
.filter(d -> d.getName().equals("dataset_b"))
.findAny()
.get()
.getColumnLineage();
assertThat(columnLineage).hasSize(1);
assertThat(columnLineage.get(0).getInputFields()).hasSize(2);

// verify getDataset returns non-empty column lineage
columnLineage = client.getDataset("namespace", "dataset_b").getColumnLineage();
assertThat(columnLineage).hasSize(1);
assertThat(columnLineage.get(0).getInputFields()).hasSize(2);

client.deleteJob("namespace", "job_name");
client.deleteDataset("namespace", "dataset_a");
client.deleteDataset("namespace", "dataset_b");
}
}
3 changes: 3 additions & 0 deletions api/src/test/java/marquez/db/ColumnLineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public static LineageEvent.Dataset getDatasetC() {
"namespace",
"dataset_c",
LineageEvent.DatasetFacets.builder()
.dataSource(
new LineageEvent.DatasourceDatasetFacet(
PRODUCER_URL, SCHEMA_URL, "the source", "http://thesource.com"))
.schema(
new LineageEvent.SchemaDatasetFacet(
PRODUCER_URL,
Expand Down
Loading

0 comments on commit 950e293

Please sign in to comment.