Skip to content

Commit

Permalink
add column lineage graph endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Sep 20, 2022
1 parent bf8c84e commit d47f846
Show file tree
Hide file tree
Showing 14 changed files with 690 additions and 2 deletions.
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import marquez.api.TagResource;
import marquez.api.exceptions.JdbiExceptionExceptionMapper;
import marquez.db.BaseDao;
import marquez.db.ColumnLevelLineageDao;
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
Expand All @@ -39,6 +40,7 @@
import marquez.db.TagDao;
import marquez.graphql.GraphqlSchemaBuilder;
import marquez.graphql.MarquezGraphqlServletBuilder;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand Down Expand Up @@ -70,6 +72,7 @@ public final class MarquezContext {
@Getter private final TagDao tagDao;
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final ColumnLevelLineageDao columnLevelLineageDao;
@Getter private final SearchDao searchDao;
@Getter private final List<RunTransitionListener> runTransitionListeners;

Expand All @@ -81,6 +84,7 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final ColumnLineageService columnLineageService;
@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
Expand Down Expand Up @@ -115,6 +119,7 @@ private MarquezContext(
this.tagDao = jdbi.onDemand(TagDao.class);
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.columnLevelLineageDao = jdbi.onDemand(ColumnLevelLineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.runTransitionListeners = runTransitionListeners;

Expand All @@ -128,6 +133,7 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.columnLineageService = new ColumnLineageService(lineageDao, columnLevelLineageDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand All @@ -139,6 +145,7 @@ private MarquezContext(
.openLineageService(openLineageService)
.sourceService(sourceService)
.lineageService(lineageService)
.columnLineageService(columnLineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.build();
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.SourceName;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand All @@ -50,6 +51,7 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected ColumnLineageService columnLineageService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
Expand All @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.columnLineageService = serviceFactory.getColumnLineageService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Expand Down
51 changes: 51 additions & 0 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.service.ServiceFactory;
import marquez.service.models.NodeId;

/**
 * REST resource serving column-level lineage under {@code /api/v1/column-lineage}.
 *
 * <p>All work is delegated to the {@code columnLineageService} field inherited from {@link
 * BaseResource}.
 */
@Slf4j
@Path("/api/v1/column-lineage")
public class ColumnLineageResource extends BaseResource {

  /** Value used for the {@code depth} query parameter when the caller omits it. */
  private static final String DEFAULT_DEPTH = "20";

  /**
   * Creates the resource.
   *
   * @param serviceFactory factory passed on to {@link BaseResource}; must not be {@code null}
   */
  public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) {
    super(serviceFactory);
  }

  /**
   * Handles {@code GET /api/v1/column-lineage}.
   *
   * @param nodeId value of the {@code nodeId} query parameter
   * @param depth value of the {@code depth} query parameter; {@link #DEFAULT_DEPTH} when absent
   * @return an OK response whose entity is whatever the column lineage service returned
   * @throws ExecutionException declared by this endpoint; not thrown by the visible code
   * @throws InterruptedException declared by this endpoint; not thrown by the visible code
   */
  @GET
  @Timed
  @ResponseMetered
  @ExceptionMetered
  @Consumes(APPLICATION_JSON)
  @Produces(APPLICATION_JSON)
  public Response getLineage(
      @QueryParam("nodeId") @NotNull NodeId nodeId,
      @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
      throws ExecutionException, InterruptedException {
    final var lineage = columnLineageService.lineage(nodeId, depth);
    return Response.ok(lineage).build();
  }
  // TODO: add another endpoint method (get by field or get by all fields of a dataset)
  // TODO: write tests for this endpoint
}
62 changes: 62 additions & 0 deletions api/src/main/java/marquez/db/ColumnLevelLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@

package marquez.db;

import static org.jdbi.v3.sqlobject.customizer.BindList.EmptyHandling.NULL_STRING;

import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.db.mappers.ColumnLevelLineageRowMapper;
import marquez.db.mappers.ColumnLineageNodeDataMapper;
import marquez.db.models.ColumnLevelLineageRow;
import marquez.db.models.ColumnLineageNodeData;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBeanList;
import org.jdbi.v3.sqlobject.customizer.BindList;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(ColumnLevelLineageRowMapper.class)
@RegisterRowMapper(ColumnLineageNodeDataMapper.class)
public interface ColumnLevelLineageDao extends BaseDao {

default List<ColumnLevelLineageRow> upsertColumnLevelLineageRow(
Expand Down Expand Up @@ -90,4 +97,59 @@ void doUpsertColumnLevelLineageRow(
},
value = "values")
ColumnLevelLineageRow... rows);

/**
 * Fetches column-lineage nodes found by walking {@code column_level_lineage} upstream from the
 * given dataset fields.
 *
 * <p>The recursive CTE is seeded (depth 0) with rows whose {@code output_dataset_field_uuid} is in
 * {@code datasetFieldUuids} and whose {@code created_at} is not later than {@code createdAtUntil}.
 * Each recursive step adds the rows whose output field equals the input field of an already
 * collected row, as long as that row's depth is below {@code depth}. The {@code created_at} filter
 * is applied to the seed rows only; the recursive member has no such condition.
 *
 * <p>The final select resolves field UUIDs to namespace, dataset and field names through {@code
 * dataset_fields_view}. Output fields are inner-joined and input fields are left-joined. The input
 * fields of each group are aggregated into the {@code inputFields} column as an array of {@code
 * [namespace, dataset, field]} arrays.
 *
 * @param depth upper bound compared against the depth counter of the recursive CTE
 * @param datasetFieldUuids UUIDs of the dataset fields to start from; bound with {@code
 *     EmptyHandling.NULL_STRING} (per the Jdbi documentation an empty list is then rendered as the
 *     keyword {@code null} inside the {@code IN (...)} clause)
 * @param createdAtUntil latest {@code created_at} accepted for the seed rows
 * @return one {@link ColumnLineageNodeData} per result row, collected into a set
 */
@SqlQuery(
"""
WITH RECURSIVE
dataset_fields_view AS (
SELECT d.namespace_name as namespace_name, d.name as dataset_name, df.name as field_name, df.type, df.uuid
FROM dataset_fields df
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
),
column_lineage_recursive AS (
SELECT *, 0 as depth
FROM column_level_lineage
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>) AND created_at <= :createdAtUntil
UNION
SELECT
upstream_node.output_dataset_version_uuid,
upstream_node.output_dataset_field_uuid,
upstream_node.input_dataset_version_uuid,
upstream_node.input_dataset_field_uuid,
upstream_node.transformation_description,
upstream_node.transformation_type,
upstream_node.created_at,
upstream_node.updated_at,
node.depth + 1 as depth
FROM column_level_lineage upstream_node, column_lineage_recursive node
WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid
AND node.depth < :depth
)
SELECT
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
""")
Set<ColumnLineageNodeData> getLineage(
int depth,
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
Instant createdAtUntil);
}
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ private List<ColumnLevelLineageRow> upsertColumnLineage(
if (outputField.isEmpty()) {
Logger log = LoggerFactory.getLogger(OpenLineageDao.class);
log.error(
"Cannot produce column lineage for missing output field in output dataset: %s",
"Cannot produce column lineage for missing output field in output dataset: {}",
outputColumn.getName());
return Stream.empty();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import marquez.db.Columns;
import marquez.db.models.ColumnLineageNodeData;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;
import org.postgresql.jdbc.PgArray;

/**
 * Maps one result row of the column-lineage query onto a {@link ColumnLineageNodeData}.
 *
 * <p>Scalar columns are read with {@code stringOrThrow}; the aggregated {@code inputFields} array
 * column is converted by {@link #toInputFields(ResultSet, String)}.
 */
@Slf4j
public class ColumnLineageNodeDataMapper implements RowMapper<ColumnLineageNodeData> {

  private static final ObjectMapper MAPPER = Utils.getMapper();

  /** Minimum number of elements per input field entry: namespace, dataset and field name. */
  private static final int INPUT_FIELD_SIZE = 3;

  /**
   * Builds a {@link ColumnLineageNodeData} from the current row.
   *
   * @param results result set positioned on the row to map
   * @param ctx statement context (unused)
   * @return the mapped node data
   * @throws SQLException if a column cannot be read
   */
  @Override
  public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws SQLException {
    return new ColumnLineageNodeData(
        stringOrThrow(results, Columns.NAMESPACE_NAME),
        stringOrThrow(results, Columns.DATASET_NAME),
        stringOrThrow(results, Columns.FIELD_NAME),
        stringOrThrow(results, Columns.TYPE),
        stringOrThrow(results, TRANSFORMATION_DESCRIPTION),
        stringOrThrow(results, TRANSFORMATION_TYPE),
        toInputFields(results, "inputFields"));
  }

  /**
   * Converts an array-of-arrays column into a list of {@code [namespace, dataset, field]} lists.
   *
   * <p>NOTE(review): the lineage query left-joins the input fields, so an entry made of three
   * {@code null} values looks possible; such entries are passed through unchanged - confirm
   * whether callers expect them.
   *
   * @param results result set positioned on the row to read
   * @param column label of the array column
   * @return an immutable list with one three-element list per array entry; empty when the column
   *     is SQL {@code NULL}
   * @throws SQLException if the column cannot be read
   * @throws IllegalStateException if an array entry is {@code null} or has fewer than three
   *     elements
   */
  public static ImmutableList<List<String>> toInputFields(ResultSet results, String column)
      throws SQLException {
    // Read the column once; the same object serves the null check and the conversion.
    final Object value = results.getObject(column);
    if (value == null) {
      return ImmutableList.of();
    }

    final Object[] entries = (Object[]) ((PgArray) value).getArray();

    return ImmutableList.copyOf(
        Arrays.stream(entries)
            .map(ColumnLineageNodeDataMapper::toInputField)
            .collect(Collectors.toList()));
  }

  /** Validates a single array entry and returns its first three elements as a list. */
  private static List<String> toInputField(Object entry) {
    final String[] parts = (String[]) entry;
    if (parts == null || parts.length < INPUT_FIELD_SIZE) {
      throw new IllegalStateException(
          "Expected input field as [namespace, dataset, field] but got "
              + (parts == null ? "null" : parts.length + " element(s)"));
    }
    return Arrays.asList(parts[0], parts[1], parts[2]);
  }
}
23 changes: 23 additions & 0 deletions api/src/main/java/marquez/db/models/ColumnLineageNodeData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.models;

import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;

/**
 * Node payload describing one dataset field in a column-lineage result.
 *
 * <p>Lombok generates a getter per field and a constructor taking all fields in declaration
 * order; every field is marked {@code @NonNull}.
 */
@Getter
@AllArgsConstructor
public class ColumnLineageNodeData implements NodeData {
// Namespace name of the dataset that owns the field.
@NonNull String namespace;
// Dataset name.
@NonNull String name;
// Field (column) name.
@NonNull String field;
// Field type, as read from the "type" column of the lineage query.
@NonNull String type;
// Transformation description recorded for the lineage row.
@NonNull String transformationDescription;
// Transformation type recorded for the lineage row.
@NonNull String transformationType;
// Input fields; ColumnLineageNodeDataMapper fills each entry as [namespace, dataset, field].
@NonNull List<List<String>> inputFields;
}
28 changes: 28 additions & 0 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service;

import lombok.extern.slf4j.Slf4j;
import marquez.db.ColumnLevelLineageDao;
import marquez.db.LineageDao;
import marquez.service.models.Lineage;
import marquez.service.models.NodeId;

/**
 * Service entry point for column-level lineage.
 *
 * <p>Extends {@code DelegatingLineageDao} with the supplied {@link LineageDao} and additionally
 * keeps a {@link ColumnLevelLineageDao}.
 */
@Slf4j
public class ColumnLineageService extends DelegatingDaos.DelegatingLineageDao {
  private final ColumnLevelLineageDao dao;

  /**
   * Creates the service.
   *
   * @param delegate lineage DAO handed to the delegating superclass
   * @param dao column-level lineage DAO stored by this service
   */
  public ColumnLineageService(final LineageDao delegate, final ColumnLevelLineageDao dao) {
    super(delegate);
    this.dao = dao;
  }

  /**
   * Placeholder for the column lineage lookup: logs a message and yields no result.
   *
   * @param nodeId node to start from (currently unused)
   * @param depth traversal depth (currently unused)
   * @return always {@code null} for now
   */
  public Lineage lineage(final NodeId nodeId, final int depth) {
    // TODO: build the lineage graph; neither the DAO nor the arguments are used yet.
    log.info("yupi");
    return null;
  }
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/ServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public class ServiceFactory {
@NonNull DatasetVersionService datasetVersionService;
@NonNull DatasetFieldService datasetFieldService;
@NonNull LineageService lineageService;
@NonNull ColumnLineageService columnLineageService;
}
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/service/models/NodeType.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
public enum NodeType {
DATASET,
JOB,
RUN;
RUN,
FIELD;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Index on dataset_fields.dataset_uuid.
CREATE INDEX dataset_fields_dataset_fields
  ON dataset_fields (dataset_uuid);
4 changes: 4 additions & 0 deletions api/src/test/java/marquez/api/ApiTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.Mockito.mock;

import java.util.Map;
import marquez.service.ColumnLineageService;
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
Expand All @@ -33,6 +34,9 @@ public static ServiceFactory mockServiceFactory(Map<Class, Object> mocks) {
return ServiceFactory.builder()
.lineageService(
(LineageService) mocks.getOrDefault(LineageService.class, (mock(LineageService.class))))
.columnLineageService(
(ColumnLineageService)
mocks.getOrDefault(ColumnLineageService.class, (mock(ColumnLineageService.class))))
.openLineageService(
(OpenLineageService)
mocks.getOrDefault(OpenLineageService.class, (mock(OpenLineageService.class))))
Expand Down
Loading

0 comments on commit d47f846

Please sign in to comment.