upstream run level lineage implementation #2658

Merged
merged 9 commits on Nov 6, 2023
18 changes: 18 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
@@ -36,6 +36,7 @@
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.SortDirection;
import marquez.common.models.RunId;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
@@ -130,6 +131,23 @@ public Response getLineageEvents(
return Response.ok(new Events(events, totalCount)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/runlineage/upstream")
public Response getRunLineageUpstream(
Collaborator:
Please mind documenting this in openapi.spec

Member Author:
done
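
For reference, a minimal sketch of what the openapi.spec entry could look like — path and parameter names mirror the code above; the response schema name is hypothetical:

  /runlineage/upstream:
    get:
      operationId: getRunLineageUpstream
      parameters:
        - name: runId
          in: query
          required: true
          description: The run to walk upstream from.
          schema:
            type: string
            format: uuid
        - name: depth
          in: query
          required: false
          description: Maximum number of upstream levels to traverse.
          schema:
            type: integer
      responses:
        '200':
          description: Upstream run-level lineage for the given run.
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/UpstreamRunLineage'

A request against a local instance (assuming the default port) would then look like:

  curl "http://localhost:5000/api/v1/runlineage/upstream?runId=<run-id>&depth=10"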

@QueryParam("runId") @NotNull RunId runId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("facets") String facets) {
Member:
Why are we pulling this in? I don't see it being used in the service. Did you intend to do something in the TODO block you left in there?

Member Author:
Yes, my idea is to be able to select which facets to return for each dataset_version, job_version, and run in the result.
I can remove it for now.

Member Author:
I removed this parameter in this iteration.

throwIfNotExists(runId);
return Response.ok(
lineageService.upstream(runId, depth, facets == null ? null : facets.split(",")))
.build();
}

@Value
static class Events {
@NonNull
63 changes: 63 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
@@ -5,15 +5,22 @@

package marquez.db;

import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.db.mappers.DatasetDataMapper;
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.UpstreamRunRowMapper;
import marquez.service.models.DatasetData;
import marquez.service.models.JobData;
import marquez.service.models.Run;
@@ -25,8 +32,18 @@
@RegisterRowMapper(JobDataMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(UpstreamRunRowMapper.class)
public interface LineageDao {

public record JobSummary(NamespaceName namespace, JobName name, UUID version) {}

public record RunSummary(RunId id, Instant start, Instant end, String status) {}

public record DatasetSummary(
NamespaceName namespace, DatasetName name, UUID version, RunId producedByRunId) {}

public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary input) {}

/**
* Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
* input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
@@ -154,4 +171,50 @@ SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"""
WITH RECURSIVE
upstream_runs(
r_uuid, -- run uuid
dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
u_r_uuid, -- upstream run that produced that dataset version
depth -- current depth of traversal
) AS (

-- initial case: find the inputs of the initial runs
SELECT r.uuid,
dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
dv.run_uuid,
0 AS depth -- starts at 0
FROM (SELECT :runId::uuid AS uuid) r -- initial run
LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
Collaborator:
Just thinking out loud: wouldn't it be better to join dataset_versions after the recursion, at the bottom of the query, once all the runs are identified?

Member Author:
I thought of that as well, but we actually need dataset_versions in the recursion: this is where we find the run_uuid that produced the dataset version for the next iteration.

Member Author:
Idea: we could add that run uuid to runs_input_mapping at the same time as the dataset_version, which would allow joining on just that table in the recursion. That'd be neat.

Collaborator:
I didn't get the idea: do you want to add dataset_versions to runs_input_mapping, or run_uuid to dataset_versions?
Anyway, the current state of the query looks good to me.

Member Author:
The idea is that we could add dataset_versions.run_uuid to the runs_input_mapping table.
The run_uuid is always already set in the dataset_versions table when we write the dataset_versions.uuid to the runs_input_mapping table, and it never changes, since it is the run that created that dataset version.
Then you would just need to recursively join runs_input_mapping with itself.
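
A rough sketch of the idea — the produced_by_run_uuid column is hypothetical and does not exist in the current schema:

  -- hypothetical schema change: record the producing run alongside each input mapping
  -- ALTER TABLE runs_input_mapping ADD COLUMN produced_by_run_uuid uuid;

  -- the recursive member would then stay within runs_input_mapping:
  SELECT rim2.run_uuid, rim2.dataset_version_uuid, rim2.produced_by_run_uuid,
         ur.depth + 1 AS depth
  FROM upstream_runs ur
  JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
  WHERE ur.depth < :depth

dataset_versions would then be joined only once, after the recursion, to resolve namespaces, names, and versions.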

Member Author:
but I would keep that for future improvement.


UNION

-- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
SELECT
ur.u_r_uuid,
dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
dv2.run_uuid,
ur.depth + 1 AS depth -- increase depth to check end condition
FROM upstream_runs ur
LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
-- end condition of the recursion: no input or depth is over the maximum set
-- also avoid following cycles where a run consumes its own output (e.g. a MERGE statement)
WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
)

-- present the result: use Distinct as we may have traversed the same edge multiple times if there are diamonds in the graph.
SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
upstream_runs.*,
-- we add the run information after the recursion so that we join with the large run table only once
Collaborator:
❤️

r.started_at, r.ended_at, r.current_run_state as state,
r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid;
""")
List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
}
50 changes: 50 additions & 0 deletions api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java
@@ -0,0 +1,50 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;
import lombok.NonNull;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.db.Columns;
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.LineageDao.UpstreamRunRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
@Override
public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return new UpstreamRunRow(
new JobSummary(
new NamespaceName(stringOrThrow(results, "job_namespace")),
new JobName(stringOrThrow(results, "job_name")),
UUID.fromString(stringOrThrow(results, "job_version_uuid"))),
new RunSummary(
new RunId(uuidOrThrow(results, "r_uuid")),
timestampOrThrow(results, Columns.STARTED_AT),
timestampOrThrow(results, Columns.ENDED_AT),
stringOrThrow(results, Columns.STATE)),
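// dataset columns are null when the run had no inputs (LEFT JOIN in the upstream query)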
results.getObject("dataset_name") == null
? null
: new DatasetSummary(
new NamespaceName(stringOrThrow(results, "dataset_namespace")),
new DatasetName(stringOrThrow(results, "dataset_name")),
UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
}
}
37 changes: 37 additions & 0 deletions api/src/main/java/marquez/service/LineageService.java
@@ -5,13 +5,17 @@

package marquez.service;

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -21,14 +25,20 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.validation.constraints.NotNull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.DatasetId;
import marquez.common.models.JobId;
import marquez.common.models.RunId;
import marquez.db.JobDao;
import marquez.db.LineageDao;
import marquez.db.LineageDao.DatasetSummary;
import marquez.db.LineageDao.JobSummary;
import marquez.db.LineageDao.RunSummary;
import marquez.db.models.JobRow;
import marquez.service.DelegatingDaos.DelegatingLineageDao;
import marquez.service.LineageService.UpstreamRunLineage;
import marquez.service.models.DatasetData;
import marquez.service.models.Edge;
import marquez.service.models.Graph;
@@ -41,6 +51,11 @@

@Slf4j
public class LineageService extends DelegatingLineageDao {

public record UpstreamRunLineage(List<UpstreamRun> runs) {}

public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> inputs) {}

private final JobDao jobDao;

public LineageService(LineageDao delegate, JobDao jobDao) {
@@ -252,4 +267,26 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
}
}

public UpstreamRunLineage upstream(@NotNull RunId runId, int depth, String[] facets /* TODO */) {
List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
Map<RunId, List<UpstreamRunRow>> collect =
upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
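// group rows by run id; LinkedHashMap keeps the traversal order returned by the query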
List<UpstreamRun> runs =
collect.entrySet().stream()
.map(
row -> {
UpstreamRunRow upstreamRunRow = row.getValue().get(0);
List<DatasetSummary> inputs =
row.getValue().stream()
.map(UpstreamRunRow::input)
.filter(i -> i != null)
.collect(toList());
return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
})
.collect(toList());
return new UpstreamRunLineage(runs);
}
}
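
For illustration, the serialized response is shaped roughly as follows (hypothetical values; the exact serialization of the wrapper types may differ):

  {
    "runs": [
      {
        "job": {"namespace": "my-namespace", "name": "my-job", "version": "..."},
        "run": {"id": "...", "start": "...", "end": "...", "status": "COMPLETED"},
        "inputs": [
          {"namespace": "my-namespace", "name": "my-input-dataset", "version": "...", "producedByRunId": "..."}
        ]
      }
    ]
  }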