From 39b3add4e470d5b8f81b766e96b9515c05595cfa Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Thu, 15 Sep 2022 12:50:41 +0200 Subject: [PATCH] add column lineage graph endpoint Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 1 + api/src/main/java/marquez/MarquezContext.java | 7 + .../main/java/marquez/api/BaseResource.java | 3 + .../marquez/api/ColumnLineageResource.java | 50 +++ .../marquez/common/models/DatasetFieldId.java | 27 ++ .../java/marquez/db/ColumnLineageDao.java | 62 +++ .../main/java/marquez/db/DatasetFieldDao.java | 19 + .../mappers/ColumnLineageNodeDataMapper.java | 54 +++ .../db/models/ColumnLineageNodeData.java | 23 ++ .../marquez/db/models/InputFieldNodeData.java | 18 + .../main/java/marquez/db/models/NodeData.java | 3 +- .../marquez/service/ColumnLineageService.java | 127 ++++++ .../java/marquez/service/DelegatingDaos.java | 6 + .../java/marquez/service/ServiceFactory.java | 1 + .../java/marquez/service/models/Node.java | 4 + .../java/marquez/service/models/NodeId.java | 37 +- .../java/marquez/service/models/NodeType.java | 4 +- .../migration/V50__index_dataset_fields.sql | 2 + .../test/java/marquez/api/ApiTestUtils.java | 4 + .../api/ColumnLineageResourceTest.java | 78 ++++ .../java/marquez/db/ColumnLineageDaoTest.java | 363 +++++++++++++++++- .../marquez/db/ColumnLineageTestUtils.java | 114 ++++++ .../service/ColumnLineageServiceTest.java | 227 +++++++++++ .../marquez/service/models/NodeIdTest.java | 30 ++ .../test/resources/column_lineage/node.json | 28 ++ 25 files changed, 1276 insertions(+), 16 deletions(-) create mode 100644 api/src/main/java/marquez/api/ColumnLineageResource.java create mode 100644 api/src/main/java/marquez/common/models/DatasetFieldId.java create mode 100644 api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java create mode 100644 api/src/main/java/marquez/db/models/ColumnLineageNodeData.java create mode 100644 api/src/main/java/marquez/db/models/InputFieldNodeData.java create mode 100644 
api/src/main/java/marquez/service/ColumnLineageService.java create mode 100644 api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql create mode 100644 api/src/test/java/marquez/api/ColumnLineageResourceTest.java create mode 100644 api/src/test/java/marquez/db/ColumnLineageTestUtils.java create mode 100644 api/src/test/java/marquez/service/ColumnLineageServiceTest.java create mode 100644 api/src/test/resources/column_lineage/node.json diff --git a/CHANGELOG.md b/CHANGELOG.md index ecdec36bb5..6b075dbd89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added * Implemented dataset symlink feature which allows providing multiple names for a dataset and adds edges to lineage graph based on symlinks [`#2066`](https://github.com/MarquezProject/marquez/pull/2066) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) ### Fixed * Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike) diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index b982bca539..2b2f69bac1 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -22,6 +22,7 @@ import marquez.api.TagResource; import marquez.api.exceptions.JdbiExceptionExceptionMapper; import marquez.db.BaseDao; +import marquez.db.ColumnLineageDao; import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; @@ -39,6 +40,7 @@ import marquez.db.TagDao; import 
marquez.graphql.GraphqlSchemaBuilder; import marquez.graphql.MarquezGraphqlServletBuilder; +import marquez.service.ColumnLineageService; import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; @@ -70,6 +72,7 @@ public final class MarquezContext { @Getter private final TagDao tagDao; @Getter private final OpenLineageDao openLineageDao; @Getter private final LineageDao lineageDao; + @Getter private final ColumnLineageDao columnLineageDao; @Getter private final SearchDao searchDao; @Getter private final List runTransitionListeners; @@ -81,6 +84,7 @@ public final class MarquezContext { @Getter private final RunService runService; @Getter private final OpenLineageService openLineageService; @Getter private final LineageService lineageService; + @Getter private final ColumnLineageService columnLineageService; @Getter private final NamespaceResource namespaceResource; @Getter private final SourceResource sourceResource; @Getter private final DatasetResource datasetResource; @@ -115,6 +119,7 @@ private MarquezContext( this.tagDao = jdbi.onDemand(TagDao.class); this.openLineageDao = jdbi.onDemand(OpenLineageDao.class); this.lineageDao = jdbi.onDemand(LineageDao.class); + this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class); this.searchDao = jdbi.onDemand(SearchDao.class); this.runTransitionListeners = runTransitionListeners; @@ -128,6 +133,7 @@ private MarquezContext( this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); this.lineageService = new LineageService(lineageDao, jobDao); + this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao); this.jdbiException = new JdbiExceptionExceptionMapper(); final ServiceFactory serviceFactory = ServiceFactory.builder() @@ -139,6 +145,7 @@ private MarquezContext( .openLineageService(openLineageService) .sourceService(sourceService) .lineageService(lineageService) + 
.columnLineageService(columnLineageService) .datasetFieldService(new DatasetFieldService(baseDao)) .datasetVersionService(new DatasetVersionService(baseDao)) .build(); diff --git a/api/src/main/java/marquez/api/BaseResource.java b/api/src/main/java/marquez/api/BaseResource.java index ce15d31ab3..7b116ab596 100644 --- a/api/src/main/java/marquez/api/BaseResource.java +++ b/api/src/main/java/marquez/api/BaseResource.java @@ -25,6 +25,7 @@ import marquez.common.models.NamespaceName; import marquez.common.models.RunId; import marquez.common.models.SourceName; +import marquez.service.ColumnLineageService; import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; @@ -50,6 +51,7 @@ public class BaseResource { protected DatasetVersionService datasetVersionService; protected DatasetFieldService datasetFieldService; protected LineageService lineageService; + protected ColumnLineageService columnLineageService; public BaseResource(ServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) { this.datasetVersionService = serviceFactory.getDatasetVersionService(); this.datasetFieldService = serviceFactory.getDatasetFieldService(); this.lineageService = serviceFactory.getLineageService(); + this.columnLineageService = serviceFactory.getColumnLineageService(); } void throwIfNotExists(@NonNull NamespaceName namespaceName) { diff --git a/api/src/main/java/marquez/api/ColumnLineageResource.java b/api/src/main/java/marquez/api/ColumnLineageResource.java new file mode 100644 index 0000000000..057e8b35ee --- /dev/null +++ b/api/src/main/java/marquez/api/ColumnLineageResource.java @@ -0,0 +1,50 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.api; + +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; + +import 
com.codahale.metrics.annotation.ExceptionMetered; +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import java.time.Instant; +import java.util.concurrent.ExecutionException; +import javax.validation.constraints.NotNull; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import marquez.service.ServiceFactory; +import marquez.service.models.NodeId; + +@Slf4j +@Path("/api/v1/column-lineage") +public class ColumnLineageResource extends BaseResource { + + private static final String DEFAULT_DEPTH = "20"; + + public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) { + super(serviceFactory); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Consumes(APPLICATION_JSON) + @Produces(APPLICATION_JSON) + public Response getLineage( + @QueryParam("nodeId") @NotNull NodeId nodeId, + @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) + throws ExecutionException, InterruptedException { + return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build(); + } +} diff --git a/api/src/main/java/marquez/common/models/DatasetFieldId.java b/api/src/main/java/marquez/common/models/DatasetFieldId.java new file mode 100644 index 0000000000..89aa505019 --- /dev/null +++ b/api/src/main/java/marquez/common/models/DatasetFieldId.java @@ -0,0 +1,27 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.common.models; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; + +/** ID for {@code DatasetField}. 
*/ +@EqualsAndHashCode +@AllArgsConstructor +@ToString +public class DatasetFieldId { + + @Getter private final DatasetId datasetId; + @Getter private final FieldName fieldName; + + public static DatasetFieldId of(String namespace, String datasetName, String field) { + return new DatasetFieldId( + new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)), + FieldName.of(field)); + } +} diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index 3bc9410a2d..c472937cd1 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -5,20 +5,27 @@ package marquez.db; +import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING; + import java.time.Instant; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import marquez.db.mappers.ColumnLineageNodeDataMapper; import marquez.db.mappers.ColumnLineageRowMapper; +import marquez.db.models.ColumnLineageNodeData; import marquez.db.models.ColumnLineageRow; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.customizer.BindBeanList; +import org.jdbi.v3.sqlobject.customizer.BindList; import org.jdbi.v3.sqlobject.statement.SqlQuery; import org.jdbi.v3.sqlobject.statement.SqlUpdate; @RegisterRowMapper(ColumnLineageRowMapper.class) +@RegisterRowMapper(ColumnLineageNodeDataMapper.class) public interface ColumnLineageDao extends BaseDao { default List upsertColumnLineageRow( @@ -88,4 +95,59 @@ void doUpsertColumnLineageRow( }, value = "values") List rows); + + @SqlQuery( + """ + WITH RECURSIVE + dataset_fields_view AS ( + SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid + FROM dataset_fields df + INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid + ), + 
column_lineage_recursive AS ( + SELECT *, 0 as depth + FROM column_lineage + WHERE output_dataset_field_uuid IN () AND created_at <= :createdAtUntil + UNION + SELECT + upstream_node.output_dataset_version_uuid, + upstream_node.output_dataset_field_uuid, + upstream_node.input_dataset_version_uuid, + upstream_node.input_dataset_field_uuid, + upstream_node.transformation_description, + upstream_node.transformation_type, + upstream_node.created_at, + upstream_node.updated_at, + node.depth + 1 as depth + FROM column_lineage upstream_node, column_lineage_recursive node + WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid + AND node.depth < :depth + ) + SELECT + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields, + clr.transformation_description, + clr.transformation_type, + clr.created_at, + clr.updated_at + FROM column_lineage_recursive clr + INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered + LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid + GROUP BY + output_fields.namespace_name, + output_fields.dataset_name, + output_fields.field_name, + output_fields.type, + clr.transformation_description, + clr.transformation_type, + clr.created_at, + clr.updated_at + """) + Set getLineage( + int depth, + @BindList(onEmpty = NULL_STRING) List datasetFieldUuids, + Instant createdAtUntil); } diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java index 99a2630016..a54a271604 100644 --- a/api/src/main/java/marquez/db/DatasetFieldDao.java +++ b/api/src/main/java/marquez/db/DatasetFieldDao.java @@ -93,6 +93,25 @@ default Dataset updateTags( + "WHERE dataset_uuid = :datasetUuid AND name = :name") 
Optional findUuid(UUID datasetUuid, String name); + @SqlQuery( + """ + SELECT df.uuid + FROM dataset_fields df + INNER JOIN datasets_view AS d + ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace + """) + List findDatasetFieldsUuids(String namespace, String datasetName); + + @SqlQuery( + """ + SELECT df.uuid + FROM dataset_fields df + INNER JOIN datasets_view AS d + ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespace + WHERE df.name = :name + """) + Optional findUuid(String namespace, String datasetName, String name); + @SqlQuery( "SELECT f.*, " + "ARRAY(SELECT t.name " diff --git a/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java new file mode 100644 index 0000000000..6edb1f9f41 --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java @@ -0,0 +1,54 @@ +package marquez.db.mappers; + +import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION; +import static marquez.db.Columns.TRANSFORMATION_TYPE; +import static marquez.db.Columns.stringOrThrow; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import marquez.common.Utils; +import marquez.db.Columns; +import marquez.db.models.ColumnLineageNodeData; +import marquez.db.models.InputFieldNodeData; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; +import org.postgresql.jdbc.PgArray; + +@Slf4j +public class ColumnLineageNodeDataMapper implements RowMapper { + + private static final ObjectMapper MAPPER = Utils.getMapper(); + + @Override + public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws SQLException { + return new ColumnLineageNodeData( + 
stringOrThrow(results, Columns.NAMESPACE_NAME), + stringOrThrow(results, Columns.DATASET_NAME), + stringOrThrow(results, Columns.FIELD_NAME), + stringOrThrow(results, Columns.TYPE), + stringOrThrow(results, TRANSFORMATION_DESCRIPTION), + stringOrThrow(results, TRANSFORMATION_TYPE), + toInputFields(results, "inputFields")); + } + + public static ImmutableList toInputFields(ResultSet results, String column) + throws SQLException { + if (results.getObject(column) == null) { + return ImmutableList.of(); + } + + PgArray pgArray = (PgArray) results.getObject(column); + Object[] deserializedArray = (Object[]) pgArray.getArray(); + + return ImmutableList.copyOf( + Arrays.asList(deserializedArray).stream() + .map(o -> (String[]) o) + .map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2])) + .collect(Collectors.toList())); + } +} diff --git a/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java new file mode 100644 index 0000000000..dd33156e62 --- /dev/null +++ b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java @@ -0,0 +1,23 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +public class ColumnLineageNodeData implements NodeData { + @NonNull String namespace; + @NonNull String name; + @NonNull String field; + @NonNull String dataType; + @NonNull String transformationDescription; + @NonNull String transformationType; + @NonNull List inputFields; +} diff --git a/api/src/main/java/marquez/db/models/InputFieldNodeData.java b/api/src/main/java/marquez/db/models/InputFieldNodeData.java new file mode 100644 index 0000000000..f6f728a986 --- /dev/null +++ b/api/src/main/java/marquez/db/models/InputFieldNodeData.java @@ -0,0 +1,18 @@ +/* + * Copyright 2018-2022 
contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +public class InputFieldNodeData { + @NonNull String namespace; + @NonNull String name; + @NonNull String field; +} diff --git a/api/src/main/java/marquez/db/models/NodeData.java b/api/src/main/java/marquez/db/models/NodeData.java index c2e7037b37..3a7bcc6dfc 100644 --- a/api/src/main/java/marquez/db/models/NodeData.java +++ b/api/src/main/java/marquez/db/models/NodeData.java @@ -14,6 +14,7 @@ property = "type") @JsonSubTypes({ @JsonSubTypes.Type(value = DatasetData.class, name = "DATASET"), - @JsonSubTypes.Type(value = JobData.class, name = "JOB") + @JsonSubTypes.Type(value = JobData.class, name = "JOB"), + @JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD") }) public interface NodeData {} diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java new file mode 100644 index 0000000000..32871c33ee --- /dev/null +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -0,0 +1,127 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import com.google.common.collect.ImmutableSortedSet; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetId; +import marquez.db.ColumnLineageDao; +import marquez.db.DatasetFieldDao; +import marquez.db.models.ColumnLineageNodeData; +import marquez.service.models.Edge; +import 
marquez.service.models.Lineage; +import marquez.service.models.Node; +import marquez.service.models.NodeId; + +@Slf4j +public class ColumnLineageService extends DelegatingDaos.DelegatingColumnLineageDao { + private final DatasetFieldDao datasetFieldDao; + + public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDao) { + super(dao); + this.datasetFieldDao = datasetFieldDao; + } + + public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) { + List columnNodeUuids = getColumnNodeUuids(nodeId); + + if (columnNodeUuids.isEmpty()) { + throw new NodeIdNotFoundException("Could not find node"); + } + + return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil)); + } + + private Lineage toLineage(Set lineageNodeData) { + Map graphNodes = new HashMap<>(); + Map> inEdges = new HashMap<>(); + Map> outEdges = new HashMap<>(); + + // create nodes + lineageNodeData.stream() + .forEach( + columnLineageNodeData -> { + NodeId nodeId = + NodeId.of( + DatasetFieldId.of( + columnLineageNodeData.getNamespace(), + columnLineageNodeData.getName(), + columnLineageNodeData.getField())); + graphNodes.put(nodeId, Node.datasetField().data(columnLineageNodeData).id(nodeId)); + columnLineageNodeData.getInputFields().stream() + .map( + i -> + NodeId.of(DatasetFieldId.of(i.getNamespace(), i.getName(), i.getField()))) + .forEach( + inputNodeId -> { + graphNodes.put(inputNodeId, Node.datasetField().id(inputNodeId)); + Optional.ofNullable(outEdges.get(inputNodeId)) + .ifPresentOrElse( + nodeEdges -> nodeEdges.add(nodeId), + () -> outEdges.put(inputNodeId, new LinkedList<>(List.of(nodeId)))); + Optional.ofNullable(inEdges.get(nodeId)) + .ifPresentOrElse( + nodeEdges -> nodeEdges.add(inputNodeId), + () -> inEdges.put(nodeId, new LinkedList<>(List.of(inputNodeId)))); + }); + }); + + // add edges between the nodes + inEdges.forEach( + (nodeId, nodes) -> { + graphNodes + .get(nodeId) + .inEdges( + nodes.stream() + .map(toNodeId -> new Edge(nodeId, toNodeId)) 
+ .collect(Collectors.toSet())); + }); + outEdges.forEach( + (nodeId, nodes) -> { + graphNodes + .get(nodeId) + .outEdges( + nodes.stream() + .map(toNodeId -> new Edge(nodeId, toNodeId)) + .collect(Collectors.toSet())); + }); + + // build nodes and return as lineage + return new Lineage( + ImmutableSortedSet.copyOf( + graphNodes.values().stream().map(Node.Builder::build).collect(Collectors.toSet()))); + } + + List getColumnNodeUuids(NodeId nodeId) { + List columnNodeUuids = new ArrayList<>(); + if (nodeId.isDatasetType()) { + DatasetId datasetId = nodeId.asDatasetId(); + columnNodeUuids.addAll( + datasetFieldDao.findDatasetFieldsUuids( + datasetId.getNamespace().getValue(), datasetId.getName().getValue())); + } else if (nodeId.isDatasetFieldType()) { + DatasetFieldId datasetFieldId = nodeId.asDatasetFieldId(); + datasetFieldDao + .findUuid( + datasetFieldId.getDatasetId().getNamespace().getValue(), + datasetFieldId.getDatasetId().getName().getValue(), + datasetFieldId.getFieldName().getValue()) + .ifPresent(uuid -> columnNodeUuids.add(uuid)); + } + return columnNodeUuids; + } +} diff --git a/api/src/main/java/marquez/service/DelegatingDaos.java b/api/src/main/java/marquez/service/DelegatingDaos.java index 8184200ab4..5b980156af 100644 --- a/api/src/main/java/marquez/service/DelegatingDaos.java +++ b/api/src/main/java/marquez/service/DelegatingDaos.java @@ -7,6 +7,7 @@ import lombok.RequiredArgsConstructor; import lombok.experimental.Delegate; +import marquez.db.ColumnLineageDao; import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; @@ -98,4 +99,9 @@ public static class DelegatingTagDao implements TagDao { public static class DelegatingLineageDao implements LineageDao { @Delegate private final LineageDao delegate; } + + @RequiredArgsConstructor + public static class DelegatingColumnLineageDao implements ColumnLineageDao { + @Delegate private final ColumnLineageDao delegate; + } } diff --git 
a/api/src/main/java/marquez/service/ServiceFactory.java b/api/src/main/java/marquez/service/ServiceFactory.java index 5a4b51465b..8b5365dadb 100644 --- a/api/src/main/java/marquez/service/ServiceFactory.java +++ b/api/src/main/java/marquez/service/ServiceFactory.java @@ -22,4 +22,5 @@ public class ServiceFactory { @NonNull DatasetVersionService datasetVersionService; @NonNull DatasetFieldService datasetFieldService; @NonNull LineageService lineageService; + @NonNull ColumnLineageService columnLineageService; } diff --git a/api/src/main/java/marquez/service/models/Node.java b/api/src/main/java/marquez/service/models/Node.java index 7cddf633ea..e526108b5b 100644 --- a/api/src/main/java/marquez/service/models/Node.java +++ b/api/src/main/java/marquez/service/models/Node.java @@ -52,6 +52,10 @@ public static Builder dataset() { return new Builder(NodeType.DATASET); } + public static Builder datasetField() { + return new Builder(NodeType.DATASET_FIELD); + } + public static Builder job() { return new Builder(NodeType.JOB); } diff --git a/api/src/main/java/marquez/service/models/NodeId.java b/api/src/main/java/marquez/service/models/NodeId.java index de8c82f8e7..04cbf2b6ce 100644 --- a/api/src/main/java/marquez/service/models/NodeId.java +++ b/api/src/main/java/marquez/service/models/NodeId.java @@ -20,9 +20,11 @@ import lombok.Getter; import lombok.NonNull; import lombok.ToString; +import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.DatasetVersionId; +import marquez.common.models.FieldName; import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.JobVersionId; @@ -40,11 +42,14 @@ public final class NodeId implements Comparable { public static final Joiner ID_JOINER = Joiner.on(ID_DELIM); private static final String ID_PREFX_DATASET = "dataset"; + private static final String ID_PREFX_DATASET_FIELD = "datasetField"; private 
static final String ID_PREFX_JOB = "job"; private static final String ID_PREFX_RUN = "run"; private static final Pattern ID_PATTERN = Pattern.compile( - String.format("^(%s|%s|%s):.*$", ID_PREFX_DATASET, ID_PREFX_JOB, ID_PREFX_RUN)); + String.format( + "^(%s|%s|%s|%s):.*$", + ID_PREFX_DATASET, ID_PREFX_DATASET_FIELD, ID_PREFX_JOB, ID_PREFX_RUN)); public static final String VERSION_DELIM = "#"; @@ -53,9 +58,10 @@ public final class NodeId implements Comparable { public NodeId(final String value) { checkArgument( ID_PATTERN.matcher(value).matches(), - "node ID (%s) must start with '%s', '%s', or '%s'", + "node ID (%s) must start with '%s', '%s', '%s' or '%s'", value, ID_PREFX_DATASET, + ID_PREFX_DATASET_FIELD, ID_PREFX_JOB, ID_PREFX_RUN); this.value = value; @@ -110,6 +116,15 @@ public static NodeId of(@NonNull DatasetId datasetId) { return NodeId.of(datasetId.getNamespace(), datasetId.getName()); } + public static NodeId of(@NonNull DatasetFieldId datasetFieldIdId) { + return of( + ID_JOINER.join( + ID_PREFX_DATASET_FIELD, + datasetFieldIdId.getDatasetId().getNamespace().getValue(), + datasetFieldIdId.getDatasetId().getName().getValue(), + datasetFieldIdId.getFieldName().getValue())); + } + public static NodeId of(@NonNull JobId jobId) { return NodeId.of(jobId.getNamespace(), jobId.getName()); } @@ -133,7 +148,12 @@ private static String appendVersionTo(@NonNull final String value, @Nullable fin @JsonIgnore public boolean isDatasetType() { - return value.startsWith(ID_PREFX_DATASET); + return value.startsWith(ID_PREFX_DATASET + ID_DELIM); + } + + @JsonIgnore + public boolean isDatasetFieldType() { + return value.startsWith(ID_PREFX_DATASET_FIELD); } @JsonIgnore @@ -167,7 +187,8 @@ public boolean sameTypeAs(@NonNull NodeId o) { || (this.isDatasetVersionType() && o.isDatasetVersionType()) || (this.isJobType() && o.isJobType()) || (this.isJobVersionType() && o.isJobVersionType()) - || (this.isRunType() && o.isRunType()); + || (this.isRunType() && o.isRunType()) + || 
(this.isDatasetFieldType() && o.isDatasetFieldType()); } @JsonIgnore @@ -220,6 +241,14 @@ public DatasetId asDatasetId() { return new DatasetId(NamespaceName.of(parts[1]), DatasetName.of(parts[2])); } + @JsonIgnore + public DatasetFieldId asDatasetFieldId() { + String[] parts = parts(4, ID_PREFX_DATASET); + return new DatasetFieldId( + new DatasetId(NamespaceName.of(parts[1]), DatasetName.of(parts[2])), + FieldName.of(parts[3])); + } + @JsonIgnore public JobVersionId asJobVersionId() { String[] parts = parts(3, ID_PREFX_JOB); diff --git a/api/src/main/java/marquez/service/models/NodeType.java b/api/src/main/java/marquez/service/models/NodeType.java index 4951fb5371..5299609f79 100644 --- a/api/src/main/java/marquez/service/models/NodeType.java +++ b/api/src/main/java/marquez/service/models/NodeType.java @@ -7,6 +7,8 @@ public enum NodeType { DATASET, + DATASET_FIELD, JOB, - RUN; + RUN, + FIELD; } diff --git a/api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql b/api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql new file mode 100644 index 0000000000..d753a136f8 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V50__index_dataset_fields.sql @@ -0,0 +1,2 @@ +create index dataset_fields_dataset_fields + on dataset_fields (dataset_uuid); \ No newline at end of file diff --git a/api/src/test/java/marquez/api/ApiTestUtils.java b/api/src/test/java/marquez/api/ApiTestUtils.java index f9e181cf79..96c09f0e7b 100644 --- a/api/src/test/java/marquez/api/ApiTestUtils.java +++ b/api/src/test/java/marquez/api/ApiTestUtils.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.mock; import java.util.Map; +import marquez.service.ColumnLineageService; import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; @@ -33,6 +34,9 @@ public static ServiceFactory mockServiceFactory(Map mocks) { return ServiceFactory.builder() .lineageService( (LineageService) 
mocks.getOrDefault(LineageService.class, (mock(LineageService.class)))) + .columnLineageService( + (ColumnLineageService) + mocks.getOrDefault(ColumnLineageService.class, (mock(ColumnLineageService.class)))) .openLineageService( (OpenLineageService) mocks.getOrDefault(OpenLineageService.class, (mock(OpenLineageService.class)))) diff --git a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java new file mode 100644 index 0000000000..3ffafcd7f5 --- /dev/null +++ b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.ImmutableSortedSet; +import io.dropwizard.testing.junit5.DropwizardExtensionsSupport; +import io.dropwizard.testing.junit5.ResourceExtension; +import java.time.Instant; +import java.util.Map; +import marquez.common.Utils; +import marquez.service.ColumnLineageService; +import marquez.service.ServiceFactory; +import marquez.service.models.Lineage; +import marquez.service.models.Node; +import marquez.service.models.NodeId; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(DropwizardExtensionsSupport.class) +public class ColumnLineageResourceTest { + + private static ResourceExtension UNDER_TEST; + private static Lineage LINEAGE; + + static { + ColumnLineageService lineageService = mock(ColumnLineageService.class); + + Node testNode = + Utils.fromJson( + ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"), + new 
TypeReference<>() {}); + LINEAGE = new Lineage(ImmutableSortedSet.of(testNode)); + when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class))) + .thenReturn(LINEAGE); + + ServiceFactory serviceFactory = + ApiTestUtils.mockServiceFactory(Map.of(ColumnLineageService.class, lineageService)); + + UNDER_TEST = + ResourceExtension.builder().addResource(new ColumnLineageResource(serviceFactory)).build(); + } + + @Test + public void testGetColumnLineageByDatasetField() { + final Lineage lineage = + UNDER_TEST + .target("/api/v1/column-lineage") + .queryParam("nodeId", "dataset:namespace:commonDataset:col_a") + .request() + .get() + .readEntity(Lineage.class); + + assertEquals(lineage, LINEAGE); + } + + @Test + public void testGetColumnLineageByDataset() { + final Lineage lineage = + UNDER_TEST + .target("/api/v1/column-lineage") + .queryParam("nodeId", "dataset:namespace:commonDataset") + .request() + .get() + .readEntity(Lineage.class); + + assertEquals(lineage, LINEAGE); + } +} diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java index 6a8e7e2a1c..987651b024 100644 --- a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java +++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java @@ -5,7 +5,13 @@ package marquez.db; +import static marquez.db.ColumnLineageTestUtils.getDatasetA; +import static marquez.db.ColumnLineageTestUtils.getDatasetB; +import static marquez.db.ColumnLineageTestUtils.getDatasetC; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static marquez.db.OpenLineageDao.DEFAULT_NAMESPACE_OWNER; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -13,14 +19,20 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import 
java.util.UUID; +import java.util.stream.Collectors; import marquez.common.models.DatasetType; +import marquez.db.models.ColumnLineageNodeData; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetRow; import marquez.db.models.DatasetVersionRow; import marquez.db.models.NamespaceRow; import marquez.db.models.SourceRow; +import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.Dataset; import org.apache.commons.lang3.tuple.Pair; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; @@ -32,6 +44,7 @@ @ExtendWith(MarquezJdbiExternalPostgresExtension.class) public class ColumnLineageDaoTest { + private static OpenLineageDao openLineageDao; private static ColumnLineageDao dao; private static DatasetFieldDao fieldDao; private static DatasetDao datasetDao; @@ -47,9 +60,11 @@ public class ColumnLineageDaoTest { private DatasetRow outputDatasetRow; private DatasetVersionRow inputDatasetVersionRow; private DatasetVersionRow outputDatasetVersionRow; + private LineageEvent.JobFacet jobFacet; @BeforeAll public static void setUpOnce(Jdbi jdbi) { + openLineageDao = jdbi.onDemand(OpenLineageDao.class); dao = jdbi.onDemand(ColumnLineageDao.class); fieldDao = jdbi.onDemand(DatasetFieldDao.class); datasetDao = jdbi.onDemand(DatasetDao.class); @@ -129,20 +144,13 @@ public void setup() { // insert output dataset field fieldDao.upsert( outputDatasetFieldUuid, now, "output-field", "string", "desc", outputDatasetRow.getUuid()); + + jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); } @AfterEach public void tearDown(Jdbi jdbi) { - jdbi.inTransaction( - handle -> { - handle.execute("DELETE FROM column_lineage"); - handle.execute("DELETE FROM dataset_versions"); - handle.execute("DELETE FROM dataset_fields"); - handle.execute("DELETE FROM datasets"); - handle.execute("DELETE FROM sources"); 
- handle.execute("DELETE FROM namespaces"); - return null; - }); + ColumnLineageTestUtils.tearDown(jdbi); } @Test @@ -219,4 +227,339 @@ void testUpsertOnUpdatePreventsDuplicates() { assertEquals( now.plusSeconds(1000).getEpochSecond(), rows.get(0).getUpdatedAt().getEpochSecond()); } + + // dataset_A (col_a, col_b) + // dataset_B (col_c) depends on (col_a, col_b) + // dataset_C (col_d) depends on col_c + @Test + void testGetLineage() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0); + UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); + Set lineage = + dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now()); + + assertEquals(2, lineage.size()); + + ColumnLineageNodeData dataset_b = + lineage.stream().filter(cd -> cd.getName().equals("dataset_b")).findAny().get(); + ColumnLineageNodeData dataset_c = + lineage.stream().filter(cd -> cd.getName().equals("dataset_c")).findAny().get(); + + // test dataset_c + assertThat(dataset_c.getInputFields()).hasSize(1); + assertEquals("col_d", dataset_c.getField()); + assertEquals("namespace", dataset_c.getInputFields().get(0).getNamespace()); + assertEquals("dataset_b", dataset_c.getInputFields().get(0).getName()); + assertEquals("col_c", dataset_c.getInputFields().get(0).getField()); + assertEquals("type2", dataset_c.getTransformationType()); + assertEquals("description2", dataset_c.getTransformationDescription()); + + // test dataset_b + assertThat(dataset_b.getInputFields()).hasSize(2); + 
assertEquals("col_c", dataset_b.getField()); + assertEquals( + "col_b", + dataset_b.getInputFields().stream() + .filter(f -> f.getField().equals("col_b")) + .findAny() + .get() + .getField()); + assertEquals( + "col_a", + dataset_b.getInputFields().stream() + .filter(f -> f.getField().equals("col_a")) + .findAny() + .get() + .getField()); + + assertEquals("namespace", dataset_b.getInputFields().get(0).getNamespace()); + assertEquals("dataset_a", dataset_b.getInputFields().get(0).getName()); + assertEquals("type1", dataset_b.getTransformationType()); + assertEquals("description1", dataset_b.getTransformationDescription()); + } + + @Test + void testGetLineageWhenNoLineageForColumn() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow.DatasetRecord datasetRecord_a = lineageRow.getInputs().get().get(0); + UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get(); + + // assert lineage is empty + assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())).isEmpty(); + } + + /** + * Create dataset_d built on top of dataset_c. 
Lineage of depth 1 of dataset_d should be of + * size 2 (instead of 3) + */ + @Test + void testGetLineageWithLimitedDepth() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + Dataset dataset_D = + new Dataset( + "namespace", + "dataset_d", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_e", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.ColumnLineageOutputColumn( + "col_e", + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_c", "col_d")), + "", + "")))) + .build()); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_C), + Arrays.asList(dataset_D)); + + UpdateLineageRow.DatasetRecord datasetRecord_d = lineageRow.getOutputs().get().get(0); + UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get(); + + // make sure dataset are constructed properly + assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), Instant.now())) + .hasSize(3); + + // verify graph size is 2 when max depth is 1 + assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), Instant.now())).hasSize(2); + } + + @Test + void testGetLineageWhenCycleExists() { + Dataset dataset_A = + new Dataset( + "namespace", + "dataset_a", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + 
Arrays.asList( + new LineageEvent.SchemaField("col_a", "STRING", ""), + new LineageEvent.SchemaField("col_b", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.ColumnLineageOutputColumn( + "col_a", + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_c", "col_d")), + "description3", + "type3")))) + .build()); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job3", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_C), + Arrays.asList(dataset_A)); + + UpdateLineageRow.DatasetRecord datasetRecord_a = lineageRow.getOutputs().get().get(0); + UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getInputs().get().get(0); + + UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get(); + UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); + + // column lineages for col_a and col_d should be of size 3 + assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())) + .hasSize(3); + assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now())) + .hasSize(3); + } + + /** + * Run two jobs that write to dataset_b using dataset_a and dataset_c. 
Both input fields should be + returned + */ + @Test + void testGetLineageWhenTwoJobsWriteToSameDataset() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + Dataset dataset_C = getDatasetC(); + Dataset dataset_B_another_job = + new Dataset( + "namespace", + "dataset_b", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.ColumnLineageOutputColumn( + "col_c", + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_c", "col_d")), + "description1", + "type1")))) + .build()); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_C), + Arrays.asList(dataset_B_another_job)); + + UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); + UUID field_col_c = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); + + // assert input fields for col_c contain col_a, col_b and col_d + List inputFields = + dao.getLineage(20, Collections.singletonList(field_col_c), Instant.now()).stream() + .filter(node -> node.getName().equals("dataset_b")) + .flatMap(node -> node.getInputFields().stream()) + .map(input -> input.getField()) + .collect(Collectors.toList()); + + assertThat(inputFields).hasSize(3).contains("col_a", "col_b", "col_d"); + } + + @Test + void testGetLineagePointInTime() { + Dataset dataset_A = getDatasetA(); + Dataset dataset_B = getDatasetB(); + + UpdateLineageRow lineageRow = + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + 
Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); + UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); + Instant columnLineageCreatedAt = + dao.findColumnLineageByDatasetVersionColumnAndOutputDatasetField( + datasetRecord_b.getDatasetVersionRow().getUuid(), field_col_b) + .get(0) + .getCreatedAt(); + + // assert lineage is empty before and present after + assertThat( + dao.getLineage( + 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.minusSeconds(1))) + .isEmpty(); + assertThat( + dao.getLineage( + 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1))) + .hasSize(1); + } } diff --git a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java new file mode 100644 index 0000000000..41d50c36eb --- /dev/null +++ b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java @@ -0,0 +1,114 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db; + +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; + +import java.util.Arrays; +import marquez.service.models.LineageEvent; +import org.jdbi.v3.core.Jdbi; + +public class ColumnLineageTestUtils { + + public static void tearDown(Jdbi jdbi) { + jdbi.inTransaction( + handle -> { + handle.execute("DELETE FROM column_lineage"); + handle.execute("DELETE FROM lineage_events"); + handle.execute("DELETE FROM runs_input_mapping"); + handle.execute("DELETE FROM datasets_tag_mapping"); + handle.execute("DELETE FROM dataset_versions_field_mapping"); + handle.execute("DELETE FROM dataset_versions"); + handle.execute("UPDATE runs SET start_run_state_uuid=NULL, end_run_state_uuid=NULL"); + handle.execute("DELETE FROM run_states"); + handle.execute("DELETE FROM 
runs"); + handle.execute("DELETE FROM run_args"); + handle.execute("DELETE FROM job_versions_io_mapping"); + handle.execute("DELETE FROM job_versions"); + handle.execute("DELETE FROM jobs"); + handle.execute("DELETE FROM dataset_fields_tag_mapping"); + handle.execute("DELETE FROM dataset_fields"); + handle.execute("DELETE FROM datasets"); + handle.execute("DELETE FROM sources"); + handle.execute("DELETE FROM dataset_symlinks"); + handle.execute("DELETE FROM namespaces"); + return null; + }); + } + + // dataset_A (col_a, col_b) + // dataset_B (col_c) depends on (col_a, col_b) + // dataset_C (col_d) depends on col_c + public static LineageEvent.Dataset getDatasetA() { + return new LineageEvent.Dataset( + "namespace", + "dataset_a", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.SchemaField("col_a", "STRING", ""), + new LineageEvent.SchemaField("col_b", "STRING", "")))) + .build()); + } + + // dataset_B (col_c) depends on (col_a, col_b) + public static LineageEvent.Dataset getDatasetB() { + return new LineageEvent.Dataset( + "namespace", + "dataset_b", + LineageEvent.DatasetFacets.builder() + .schema( + new LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.ColumnLineageOutputColumn( + "col_c", + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_a", "col_a"), + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_a", "col_b")), + "description1", + "type1")))) + .build()); + } + + // dataset_C (col_d) depends on col_c + public static LineageEvent.Dataset getDatasetC() { + return new LineageEvent.Dataset( + "namespace", + "dataset_c", + LineageEvent.DatasetFacets.builder() + .schema( + new 
LineageEvent.SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new LineageEvent.SchemaField("col_d", "STRING", "")))) + .columnLineage( + new LineageEvent.ColumnLineageFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList( + new LineageEvent.ColumnLineageOutputColumn( + "col_d", + Arrays.asList( + new LineageEvent.ColumnLineageInputField( + "namespace", "dataset_b", "col_c")), + "description2", + "type2")))) + .build()); + } +} diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java new file mode 100644 index 0000000000..eef37d084f --- /dev/null +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -0,0 +1,227 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import static marquez.db.ColumnLineageTestUtils.getDatasetA; +import static marquez.db.ColumnLineageTestUtils.getDatasetB; +import static marquez.db.ColumnLineageTestUtils.getDatasetC; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import marquez.common.models.DatasetFieldId; +import marquez.common.models.DatasetId; +import marquez.common.models.DatasetName; +import marquez.common.models.NamespaceName; +import marquez.db.ColumnLineageDao; +import marquez.db.ColumnLineageTestUtils; +import marquez.db.DatasetFieldDao; +import marquez.db.LineageTestUtils; +import marquez.db.OpenLineageDao; +import marquez.db.models.ColumnLineageNodeData; +import marquez.db.models.InputFieldNodeData; +import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.Lineage; +import 
marquez.service.models.LineageEvent; +import marquez.service.models.Node; +import marquez.service.models.NodeId; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(MarquezJdbiExternalPostgresExtension.class) +public class ColumnLineageServiceTest { + + private static ColumnLineageDao dao; + private static OpenLineageDao openLineageDao; + private static DatasetFieldDao fieldDao; + private static ColumnLineageService lineageService; + private static LineageEvent.JobFacet jobFacet; + + @BeforeAll + public static void setUpOnce(Jdbi jdbi) { + dao = jdbi.onDemand(ColumnLineageDao.class); + openLineageDao = jdbi.onDemand(OpenLineageDao.class); + fieldDao = jdbi.onDemand(DatasetFieldDao.class); + lineageService = new ColumnLineageService(dao, fieldDao); + jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + } + + @AfterEach + public void tearDown(Jdbi jdbi) { + ColumnLineageTestUtils.tearDown(jdbi); + } + + @Test + public void testLineageByDatasetFieldId() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Lineage lineage = + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + + assertThat(lineage.getGraph()).hasSize(3); + + // check dataset_B node + Node col_c = getNode(lineage, "dataset_b", "col_c").get(); + List inputFields = + ((ColumnLineageNodeData) col_c.getData()).getInputFields(); + assertEquals( + "description1", 
((ColumnLineageNodeData) col_c.getData()).getTransformationDescription()); + assertEquals("type1", ((ColumnLineageNodeData) col_c.getData()).getTransformationType()); + assertEquals("STRING", ((ColumnLineageNodeData) col_c.getData()).getDataType()); + assertThat(inputFields).hasSize(2); + assertEquals("dataset_a", inputFields.get(0).getName()); + + // check dataset_A node + Node col_a = getNode(lineage, "dataset_a", "col_b").get(); + assertNull((ColumnLineageNodeData) col_a.getData()); + + // verify edges + // assert dataset_B (col_c) -> dataset_A (col_a) + assertThat(col_c.getOutEdges()).isEmpty(); + assertThat( + col_c.getInEdges().stream() + .map(edge -> edge.getDestination().asDatasetFieldId()) + .filter(field -> field.getFieldName().getValue().equals("col_a")) + .filter(field -> field.getDatasetId().getName().getValue().equals("dataset_a")) + .findAny()) + .isPresent(); + + assertThat(col_a.getInEdges()).isEmpty(); + assertThat( + col_a.getOutEdges().stream() + .map(edge -> edge.getDestination().asDatasetFieldId()) + .filter(field -> field.getFieldName().getValue().equals("col_c")) + .filter(field -> field.getDatasetId().getName().getValue().equals("dataset_b")) + .findAny()) + .isPresent(); + + // verify dataset_C not present in the graph + assertThat(getNode(lineage, "dataset_c", "col_d")).isEmpty(); + } + + @Test + public void testLineageByDatasetId() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Lineage lineageByField = + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + + Lineage 
lineageByDataset = + lineageService.lineage( + NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), + 20, + Instant.now()); + + // lineage of dataset and column should be equal + assertThat(lineageByField).isEqualTo(lineageByDataset); + } + + @Test + public void testLineageWhenLineageEmpty() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + assertThrows( + NodeIdNotFoundException.class, + () -> + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")), + 20, + Instant.now())); + + assertThrows( + NodeIdNotFoundException.class, + () -> + lineageService.lineage( + NodeId.of( + new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))), + 20, + Instant.now())); + + assertThat( + lineageService + .lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")), + 20, + Instant.now()) + .getGraph()) + .hasSize(0); + } + + private Optional getNode(Lineage lineage, String datasetName, String fieldName) { + return lineage.getGraph().stream() + .filter(n -> n.getId().asDatasetFieldId().getFieldName().getValue().equals(fieldName)) + .filter( + n -> + n.getId() + .asDatasetFieldId() + .getDatasetId() + .getName() + .getValue() + .equals(datasetName)) + .findAny(); + } +} diff --git a/api/src/test/java/marquez/service/models/NodeIdTest.java b/api/src/test/java/marquez/service/models/NodeIdTest.java index 054508e0b9..f7b15b9300 100644 --- a/api/src/test/java/marquez/service/models/NodeIdTest.java +++ b/api/src/test/java/marquez/service/models/NodeIdTest.java @@ -10,8 +10,10 @@ import 
static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import marquez.common.models.DatasetFieldId; import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; +import marquez.common.models.FieldName; import marquez.common.models.JobId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; @@ -111,4 +113,32 @@ public void testDatasetWithVersion(String namespace, String dataset) { assertEquals( dataset.split(VERSION_DELIM)[1], nodeId.asDatasetVersionId().getVersion().toString()); } + + @ParameterizedTest(name = "testDatasetField-{index} {argumentsWithNames}") + @CsvSource( + value = { + "my-namespace$my-dataset$colA", + "gs://bucket$/path/to/data$colA", + "gs://bucket$/path/to/data$col_A" + }, + delimiter = '$') + public void testDatasetField(String namespace, String dataset, String field) { + NamespaceName namespaceName = NamespaceName.of(namespace); + FieldName fieldName = FieldName.of(field); + DatasetName datasetName = DatasetName.of(dataset); + DatasetId dsId = new DatasetId(namespaceName, datasetName); + DatasetFieldId dsfId = new DatasetFieldId(dsId, fieldName); + NodeId nodeId = NodeId.of(dsfId); + assertFalse(nodeId.isRunType()); + assertFalse(nodeId.isJobType()); + assertFalse(nodeId.isDatasetType()); + assertFalse(nodeId.hasVersion()); + assertTrue(nodeId.isDatasetFieldType()); + + assertEquals(dsfId, nodeId.asDatasetFieldId()); + assertEquals(nodeId, NodeId.of(nodeId.getValue())); + assertEquals(namespace, nodeId.asDatasetFieldId().getDatasetId().getNamespace().getValue()); + assertEquals(dataset, nodeId.asDatasetFieldId().getDatasetId().getName().getValue()); + assertEquals(field, nodeId.asDatasetFieldId().getFieldName().getValue()); + } } diff --git a/api/src/test/resources/column_lineage/node.json b/api/src/test/resources/column_lineage/node.json new file mode 100644 index 0000000000..88ac55f25d --- /dev/null +++ 
b/api/src/test/resources/column_lineage/node.json @@ -0,0 +1,28 @@ +{ + "id": "datasetField:namespace:commonDataset:columnA", + "type": "DATASET_FIELD", + "data": { + "type": "DATASET_FIELD", + "namespace": "namespace", + "name": "otherDataset", + "field": "columnA", + "dataType": "integer", + "transformationDescription": "identical", + "transformationType": "IDENTITY", + "inputFields": [ + { "namespace": "namespace" , "name": "otherDataset", "field": "columnB"} + ] + }, + "inEdges": [ + { + "origin": "datasetField:namespace:otherDataset:columnB", + "destination": "datasetField:namespace:commonDataset:columnA" + } + ], + "outEdges": [ + { + "origin": "datasetField:namespace:commonDataset:columnA", + "destination": "datasetField:namespace:otherDataset:columnC" + } + ] +}