diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 02e603c823..a534a744d9 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -96,7 +96,7 @@ private int determineStatusCode(Throwable e) { public Response getLineage( @QueryParam("nodeId") @NotNull NodeId nodeId, @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) { - return Response.ok(lineageService.lineage(nodeId, depth)).build(); + return Response.ok(lineageService.lineage(nodeId, depth, true)).build(); } @Timed diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index d48e9f394e..a3b92904d0 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -129,5 +129,15 @@ WHERE ds.uuid IN ()""") + " WHERE run_uuid=r.uuid\n" + " GROUP BY run_uuid\n" + ") ro ON ro.run_uuid=r.uuid") + List getCurrentRunsWithFacets(@BindList Collection jobUuid); + + @SqlQuery( + """ + SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version + FROM runs_view r + INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid + INNER JOIN jobs_view j ON j.uuid=jv.job_uuid + WHERE j.uuid in () OR j.symlink_target_uuid IN () + ORDER BY r.job_name, r.namespace_name, created_at DESC""") List getCurrentRuns(@BindList Collection jobUuid); } diff --git a/api/src/main/java/marquez/db/mappers/RunMapper.java b/api/src/main/java/marquez/db/mappers/RunMapper.java index fbccbde139..3a1f95eb64 100644 --- a/api/src/main/java/marquez/db/mappers/RunMapper.java +++ b/api/src/main/java/marquez/db/mappers/RunMapper.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; @@ -71,7 +72,7 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) ? timestampOrNull(results, columnPrefix + Columns.ENDED_AT) : null, durationMs.orElse(null), - toArgs(results, columnPrefix + Columns.ARGS), + toArgsOrNull(results, columnPrefix + Columns.ARGS), stringOrThrow(results, columnPrefix + Columns.NAMESPACE_NAME), stringOrThrow(results, columnPrefix + Columns.JOB_NAME), uuidOrNull(results, columnPrefix + Columns.JOB_VERSION), @@ -82,7 +83,9 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context) columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS) ? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS) : ImmutableList.of(), - JobMapper.toContext(results, columnPrefix + Columns.CONTEXT), + columnNames.contains(columnPrefix + Columns.CONTEXT) + ? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT) + : null, toFacetsOrNull(results, columnPrefix + Columns.FACETS)); } @@ -94,8 +97,12 @@ private List toDatasetVersion(ResultSet rs, String column) thr return Utils.fromJson(dsString, new TypeReference>() {}); } - private Map toArgs(ResultSet results, String column) throws SQLException { - String args = stringOrNull(results, column); + private Map toArgsOrNull(ResultSet results, String argsColumn) + throws SQLException { + if (!Columns.exists(results, argsColumn)) { + return ImmutableMap.of(); + } + String args = stringOrNull(results, argsColumn); if (args == null) { return null; } diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 32b0efb6b0..70643b2fd0 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -46,7 +46,8 @@ public LineageService(LineageDao delegate, JobDao jobDao) { this.jobDao = jobDao; } - public Lineage lineage(NodeId nodeId, int depth) { + // TODO make input parameters easily extendable if adding more options like 'withJobFacets' + public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) { Optional optionalUUID = getJobUuid(nodeId); if (optionalUUID.isEmpty()) { throw new NodeIdNotFoundException("Could not find node"); @@ -56,8 +57,11 @@ public Lineage lineage(NodeId nodeId, int depth) { Set jobData = getLineage(Collections.singleton(job), depth); List runs = - getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet())); - // todo fix runtime + withRunFacets + ? getCurrentRunsWithFacets( + jobData.stream().map(JobData::getUuid).collect(Collectors.toSet())) + : getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet())); + for (JobData j : jobData) { if (j.getLatestRun().isEmpty()) { for (Run run : runs) { diff --git a/api/src/test/java/marquez/api/OpenLineageResourceTest.java b/api/src/test/java/marquez/api/OpenLineageResourceTest.java index 3d3445150c..fac1bb08ec 100644 --- a/api/src/test/java/marquez/api/OpenLineageResourceTest.java +++ b/api/src/test/java/marquez/api/OpenLineageResourceTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -41,7 +42,7 @@ class OpenLineageResourceTest { OpenLineageResourceTest.class.getResourceAsStream("/lineage/node.json"), new TypeReference<>() {}); LINEAGE = new Lineage(ImmutableSortedSet.of(testNode)); - when(lineageService.lineage(any(NodeId.class), anyInt())).thenReturn(LINEAGE); + when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE); ServiceFactory serviceFactory = ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService)); diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 9e5a78971c..c1bd46561a 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -773,7 +773,7 @@ public void testGetCurrentRuns() { Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId)) .collect(Collectors.toSet()); - List currentRuns = lineageDao.getCurrentRuns(jobids); + List currentRuns = lineageDao.getCurrentRunsWithFacets(jobids); // assert the job does exist assertThat(currentRuns) @@ -790,7 +790,7 @@ public void testGetCurrentRunsWithFailedJob() { Set jobids = Collections.singleton(writeJob.getJob().getUuid()); - List currentRuns = lineageDao.getCurrentRuns(jobids); + List currentRuns = lineageDao.getCurrentRunsWithFacets(jobids); // assert the job does exist assertThat(currentRuns) @@ -799,6 +799,56 @@ public void testGetCurrentRunsWithFailedJob() { .contains(writeJob.getRun().getUuid()); } + @Test + public void testGetCurrentRunsWithFacetsGetsLatestRun() { + for (int i = 0; i < 5; i++) { + LineageTestUtils.createLineageRow( + openLineageDao, + "writeJob", + "COMPLETE", + jobFacet, + Arrays.asList(), + Arrays.asList(dataset)); + } + + List newRows = + writeDownstreamLineage( + openLineageDao, + new LinkedList<>( + Arrays.asList( + new DatasetConsumerJob("readJob", 3, Optional.of("outputData2")), + new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))), + jobFacet, + dataset); + UpdateLineageRow writeJob = + LineageTestUtils.createLineageRow( + openLineageDao, "writeJob", "FAIL", jobFacet, Arrays.asList(), Arrays.asList(dataset)); + + Set expectedRunIds = + Stream.concat( + Stream.of(writeJob.getRun().getUuid()), newRows.stream().map(JobLineage::getRunId)) + .collect(Collectors.toSet()); + Set jobids = + Stream.concat( + Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId)) + .collect(Collectors.toSet()); + + List currentRuns = lineageDao.getCurrentRunsWithFacets(jobids); + + // assert the job does exist + assertThat(currentRuns) + .hasSize(expectedRunIds.size()) + .extracting(r -> r.getId().getValue()) + .containsAll(expectedRunIds); + + // assert that run_args, input/output versions, and run facets are fetched from the dao. + for (Run run : currentRuns) { + assertThat(run.getArgs()).hasSize(2); + assertThat(run.getOutputVersions()).hasSize(1); + assertThat(run.getFacets()).hasSize(1); + } + } + @Test public void testGetCurrentRunsGetsLatestRun() { for (int i = 0; i < 5; i++) { diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 47641d4faa..52fd9deef5 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -124,7 +124,8 @@ public void testLineage() { dataset); String jobName = writeJob.getJob().getName(); Lineage lineage = - lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2); + lineageService.lineage( + NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true); // 1 writeJob + 1 commonDataset // 20 readJob + 20 outputData @@ -232,7 +233,8 @@ public void testLineageWithDeletedDataset() { String jobName = writeJob.getJob().getName(); Lineage lineage = - lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2); + lineageService.lineage( + NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true); // 1 writeJob + 0 commonDataset is hidden // 20 readJob + 20 outputData @@ -281,7 +283,8 @@ public void testLineageWithDeletedDataset() { jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset"); lineage = - lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2); + lineageService.lineage( + NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true); // 1 writeJob + 0 commonDataset is hidden // 20 readJob + 20 outputData @@ -311,7 +314,9 @@ public void testLineageWithNoDatasets() { openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList()); Lineage lineage = lineageService.lineage( - NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), 5); + NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), + 5, + true); assertThat(lineage.getGraph()) .hasSize(1) .first() @@ -362,7 +367,8 @@ public void testLineageWithWithCycle() { lineageService.lineage( NodeId.of( new NamespaceName(NAMESPACE), new JobName(intermediateJob.getJob().getName())), - 5); + 5, + true); assertThat(lineage.getGraph()).extracting(Node::getId).hasSize(6); ObjectAssert datasetNode = assertThat(lineage.getGraph())