Skip to content

Commit

Permalink
optimizing current runs query for lieage api
Browse files Browse the repository at this point in the history
  • Loading branch information
prachim-collab committed Oct 24, 2022
1 parent cc71261 commit 6ca59e1
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 43 deletions.
35 changes: 2 additions & 33 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,42 +92,11 @@ WHERE ds.uuid IN (<dsUuids>)""")
Optional<UUID> getJobFromInputOrOutput(String datasetName, String namespaceName);

@SqlQuery(
"WITH latest_runs AS (\n"
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
"SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
+ ")\n"
+ "SELECT r.*, ra.args, ctx.context, f.facets,\n"
+ " r.version AS job_version, ri.input_versions, ro.output_versions\n"
+ " from latest_runs AS r\n"
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " WHERE le.run_uuid=r.uuid\n"
+ " GROUP BY le.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT im.run_uuid,\n"
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
+ " 'version', dv.version)) AS input_versions\n"
+ " FROM runs_input_mapping im\n"
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
+ " WHERE im.run_uuid=r.uuid\n"
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
+ " FROM dataset_versions\n"
+ " WHERE run_uuid=r.uuid\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid")
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
}
15 changes: 11 additions & 4 deletions api/src/main/java/marquez/db/mappers/RunMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
? timestampOrNull(results, columnPrefix + Columns.ENDED_AT)
: null,
durationMs.orElse(null),
toArgs(results, columnPrefix + Columns.ARGS),
toArgsOrNull(results, columnPrefix + Columns.ARGS),
stringOrThrow(results, columnPrefix + Columns.NAMESPACE_NAME),
stringOrThrow(results, columnPrefix + Columns.JOB_NAME),
uuidOrNull(results, columnPrefix + Columns.JOB_VERSION),
Expand All @@ -82,7 +83,9 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS)
? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS)
: ImmutableList.of(),
JobMapper.toContext(results, columnPrefix + Columns.CONTEXT),
columnNames.contains(columnPrefix + Columns.CONTEXT)
? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT)
: null,
toFacetsOrNull(results, columnPrefix + Columns.FACETS));
}

Expand All @@ -94,8 +97,12 @@ private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) thr
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
}

private Map<String, String> toArgs(ResultSet results, String column) throws SQLException {
String args = stringOrNull(results, column);
private Map<String, String> toArgsOrNull(ResultSet results, String argsColumn)
throws SQLException {
if (!Columns.exists(results, argsColumn)) {
return ImmutableMap.of();
}
String args = stringOrNull(results, argsColumn);
if (args == null) {
return null;
}
Expand Down
6 changes: 0 additions & 6 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,6 @@ public void testLineage() {
runAssert
.extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(0);
runAssert
.extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(1);

// check the output edges for the commonDataset node
assertThat(lineage.getGraph())
Expand Down Expand Up @@ -266,9 +263,6 @@ public void testLineageWithDeletedDataset() {
runAssert
.extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(0);
runAssert
.extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
.hasSize(1);

// check the output edges for the commonDataset node
assertThat(lineage.getGraph())
Expand Down

0 comments on commit 6ca59e1

Please sign in to comment.