upstream run level lineage implementation (#2658)
* upstream run level lineage implementation

Signed-off-by: Julien Le Dem <julien@apache.org>

---------

Signed-off-by: Julien Le Dem <julien@apache.org>
julienledem authored Nov 6, 2023
1 parent 42cadbb commit 752ac23
Showing 9 changed files with 388 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,8 @@
*Improves experience with large graphs by adding a new tab to move between graph elements without looking at the graph itself.*
* Web: add hover-over Tag tooltip to datasets [`#2630`](https://github.com/MarquezProject/marquez/pull/2630) [@davidsharp7](https://github.com/davidsharp7)
*For parity with columns in the GUI, this adds a Tag tooltip to datasets.*
* API: upstream run-level lineage API [`#2658`](https://github.com/MarquezProject/marquez/pull/2658) [@julienledem](https://github.com/julienledem)
*When troubleshooting an issue and doing root cause analysis, it is useful to get the upstream run-level lineage to know exactly which version of each job and dataset a run depends on.*

### Changed
* Docker: upgrade to Docker Compose V2 [`#2644`](https://github.com/MarquezProject/marquez/pull/2644) [@merobi-hub](https://github.com/merobi-hub)
23 changes: 23 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
@@ -36,6 +36,7 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.SortDirection;
import marquez.common.models.RunId;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
@@ -136,6 +137,28 @@ public Response getLineageEvents(
return Response.ok(new Events(events, totalCount)).build();
}

/**
* Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
* -> the run that produced it
*
* @param runId the run to get upstream lineage from
* @param depth the maximum depth of the upstream lineage
* @return the upstream lineage for that run up to `depth` levels
*/
@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/runlineage/upstream")
public Response getRunLineageUpstream(
@QueryParam("runId") @NotNull RunId runId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
throwIfNotExists(runId);
return Response.ok(lineageService.upstream(runId, depth)).build();
}

@Value
static class Events {
@NonNull
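
For orientation, a request against the new endpoint could look like the sketch below; it is not part of this change. The base URL (http://localhost:5000/api/v1), run id, and depth value are illustrative assumptions, and the class-level @Path of OpenLineageResource is not shown in this diff, so the full URL may differ.

// Hypothetical client call to the new upstream run-level lineage endpoint.
// Base URL, run id, and depth are placeholders, not values taken from this change.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpstreamRunLineageClientExample {
  public static void main(String[] args) throws Exception {
    String url =
        "http://localhost:5000/api/v1/runlineage/upstream"
            + "?runId=123e4567-e89b-12d3-a456-426614174000" // run to trace upstream from
            + "&depth=10"; // maximum number of run -> dataset-version -> run hops to follow
    HttpRequest request =
        HttpRequest.newBuilder().uri(URI.create(url)).header("Accept", "application/json").GET().build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode()); // 200 when the run exists
    System.out.println(response.body()); // UpstreamRunLineage serialized as JSON: {"runs":[...]}
  }
}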
43 changes: 16 additions & 27 deletions api/src/main/java/marquez/db/Columns.java
@@ -150,9 +150,7 @@ public static UUID uuidOrNull(final ResultSet results, final String column) thro
}

public static UUID uuidOrThrow(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getObject(column, UUID.class);
}

@@ -166,9 +164,7 @@ public static Instant timestampOrNull(final ResultSet results, final String colu

public static Instant timestampOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getTimestamp(column).toInstant();
}

@@ -182,9 +178,7 @@ public static String stringOrNull(final ResultSet results, final String column)

public static String stringOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getString(column);
}

@@ -199,40 +193,30 @@ public static boolean booleanOrDefault(

public static boolean booleanOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getBoolean(column);
}

public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getInt(column);
}

public static PGInterval pgIntervalOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return new PGInterval(results.getString(column));
}

public static BigDecimal bigDecimalOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return results.getBigDecimal(column);
}

public static List<UUID> uuidArrayOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return Arrays.asList((UUID[]) results.getArray(column).getArray());
}

@@ -246,9 +230,7 @@ public static List<UUID> uuidArrayOrEmpty(final ResultSet results, final String

public static List<String> stringArrayOrThrow(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException();
}
checkNotNull(results, column);
return Arrays.asList((String[]) results.getArray(column).getArray());
}

@@ -311,4 +293,11 @@ public static PGobject toPgObject(@NonNull final Object object) {
}
return jsonObject;
}

private static void checkNotNull(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
throw new IllegalArgumentException(column + " not found in result");
}
}
}
64 changes: 64 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
@@ -5,15 +5,22 @@

package marquez.db;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.db.mappers.DatasetDataMapper;
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.UpstreamRunRowMapper;
import marquez.service.models.DatasetData;
import marquez.service.models.JobData;
import marquez.service.models.Run;
@@ -25,8 +32,18 @@
@RegisterRowMapper(JobDataMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(UpstreamRunRowMapper.class)
public interface LineageDao {

public record JobSummary(NamespaceName namespace, JobName name, UUID version) {}

public record RunSummary(RunId id, Instant start, Instant end, String status) {}

public record DatasetSummary(
NamespaceName namespace, DatasetName name, UUID version, RunId producedByRunId) {}

public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary input) {}

/**
* Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
* input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
@@ -154,4 +171,51 @@ SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"""
WITH RECURSIVE
upstream_runs(
r_uuid, -- run uuid
dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
u_r_uuid, -- upstream run that produced that dataset version
depth -- current depth of traversal
) AS (
-- initial case: find the inputs of the initial runs
select r.uuid,
dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
dv.run_uuid,
0 AS depth -- starts at 0
FROM (SELECT :runId::uuid AS uuid) r -- initial run
LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
UNION
-- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
SELECT
ur.u_r_uuid,
dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
dv2.run_uuid,
ur.depth + 1 AS depth -- increase depth to check end condition
FROM upstream_runs ur
LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
-- end condition of the recursion: no input or depth is over the maximum set
-- also avoid following cycles (ex: merge statement)
WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
)
-- present the result: use Distinct as we may have traversed the same edge multiple times if there are diamonds in the graph.
SELECT * FROM ( -- we need the extra statement to sort after the DISTINCT
SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
upstream_runs.*,
r.started_at, r.ended_at, r.current_run_state as state,
r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid
) sub
ORDER BY depth ASC, job_name ASC;
""")
List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
}
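
As a rough, standalone sketch (not part of this change) of exercising the new query through JDBI's SQL Object API: the JDBC URL, credentials, run id, and depth below are placeholder assumptions.

// Minimal sketch: wire LineageDao with Jdbi and invoke the recursive upstream query directly.
// Connection settings, run id, and depth are illustrative placeholders.
import java.util.List;
import java.util.UUID;
import marquez.db.LineageDao;
import marquez.db.LineageDao.UpstreamRunRow;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

public class UpstreamRunsQueryExample {
  public static void main(String[] args) {
    Jdbi jdbi =
        Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez")
            .installPlugin(new SqlObjectPlugin())
            .installPlugin(new PostgresPlugin());
    LineageDao lineageDao = jdbi.onDemand(LineageDao.class);
    UUID runId = UUID.fromString("123e4567-e89b-12d3-a456-426614174000"); // run to start from
    // One row per traversed (run, input dataset version, producing run) edge, ordered by
    // depth ASC then job_name ASC, as in the query's final ORDER BY.
    List<UpstreamRunRow> rows = lineageDao.getUpstreamRuns(runId, 10);
    rows.forEach(row -> System.out.println(row.run() + " reads " + row.input()));
  }
}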
55 changes: 55 additions & 0 deletions api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
@@ -0,0 +1,55 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import lombok.NonNull;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.db.Columns;
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.LineageDao.UpstreamRunRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

/** Maps the upstream query result set to an UpstreamRunRow */
public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
@Override
public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new UpstreamRunRow(
new JobSummary(
new NamespaceName(stringOrThrow(results, "job_namespace")),
new JobName(stringOrThrow(results, "job_name")),
Optional.ofNullable(stringOrNull(results, "job_version_uuid"))
.map(UUID::fromString)
.orElse(null)),
new RunSummary(
new RunId(uuidOrThrow(results, "r_uuid")),
timestampOrNull(results, Columns.STARTED_AT),
timestampOrNull(results, Columns.ENDED_AT),
stringOrThrow(results, Columns.STATE)),
results.getObject("dataset_name") == null
? null
: new DatasetSummary(
new NamespaceName(stringOrThrow(results, "dataset_namespace")),
new DatasetName(stringOrThrow(results, "dataset_name")),
UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
}
}
43 changes: 43 additions & 0 deletions api/src/main/java/marquez/service/LineageService.java
@@ -5,13 +5,17 @@

package marquez.service;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -21,14 +25,20 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.common.models.RunId;
import marquez.db.JobDao;
import marquez.db.LineageDao;
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.models.JobRow;
import marquez.service.DelegatingDaos.DelegatingLineageDao;
import marquez.service.LineageService.UpstreamRunLineage;
import marquez.service.models.DatasetData;
import marquez.service.models.Edge;
import marquez.service.models.Graph;
@@ -41,6 +51,11 @@

@Slf4j
public class LineageService extends DelegatingLineageDao {

public record UpstreamRunLineage(List<UpstreamRun> runs) {}

public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> inputs) {}

private final JobDao jobDao;

public LineageService(LineageDao delegate, JobDao jobDao) {
@@ -252,4 +267,32 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
}
}

/**
* Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
* -> the run that produced it
*
* @param runId the run to get upstream lineage from
* @param depth the maximum depth of the upstream lineage
* @return the upstream lineage for that run up to `depth` levels
*/
public UpstreamRunLineage upstream(@NotNull RunId runId, int depth) {
List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
Map<RunId, List<UpstreamRunRow>> collect =
upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
List<UpstreamRun> runs =
collect.entrySet().stream()
.map(
row -> {
UpstreamRunRow upstreamRunRow = row.getValue().get(0);
List<DatasetSummary> inputs =
row.getValue().stream()
.map(UpstreamRunRow::input)
.filter(i -> i != null)
.collect(toList());
return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
})
.collect(toList());
return new UpstreamRunLineage(runs);
}
}
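
One non-obvious detail in upstream(...) above: the groupingBy(..., LinkedHashMap::new, toList()) collector collapses the per-edge rows into one UpstreamRun per run id while preserving the depth-then-job-name order produced by the SQL ORDER BY. A self-contained sketch of that idiom, with illustrative strings standing in for the Marquez row types:

// Illustrative only: groupingBy with a LinkedHashMap supplier keeps first-seen (encounter) order.
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingOrderExample {
  public static void main(String[] args) {
    // Each entry stands in for an UpstreamRunRow: "<runId>:<inputDatasetVersion>".
    List<String> rows = List.of("runC:dvB", "runC:dvA", "runB:dvX", "runA:dvY");
    Map<String, List<String>> byRun =
        rows.stream().collect(groupingBy(r -> r.split(":")[0], LinkedHashMap::new, toList()));
    // Prints runC -> [runC:dvB, runC:dvA], then runB, then runA: first-seen order is preserved,
    // mirroring the depth ASC, job_name ASC ordering of getUpstreamRuns.
    byRun.forEach((run, inputs) -> System.out.println(run + " -> " + inputs));
  }
}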