Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimizing current runs query for lieage api #2211

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private int determineStatusCode(Throwable e) {
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
return Response.ok(lineageService.lineage(nodeId, depth)).build();
return Response.ok(lineageService.lineage(nodeId, depth, true)).build();
}

@Timed
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,15 @@ WHERE ds.uuid IN (<dsUuids>)""")
+ " WHERE run_uuid=r.uuid\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid")
List<Run> getCurrentRunsWithFacets(@BindList Collection<UUID> jobUuid);

@SqlQuery(
"""
SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
FROM runs_view r
INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
ORDER BY r.job_name, r.namespace_name, created_at DESC""")
List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
}
15 changes: 11 additions & 4 deletions api/src/main/java/marquez/db/mappers/RunMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
Expand Down Expand Up @@ -71,7 +72,7 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
? timestampOrNull(results, columnPrefix + Columns.ENDED_AT)
: null,
durationMs.orElse(null),
toArgs(results, columnPrefix + Columns.ARGS),
toArgsOrNull(results, columnPrefix + Columns.ARGS),
stringOrThrow(results, columnPrefix + Columns.NAMESPACE_NAME),
stringOrThrow(results, columnPrefix + Columns.JOB_NAME),
uuidOrNull(results, columnPrefix + Columns.JOB_VERSION),
Expand All @@ -82,7 +83,9 @@ public Run map(@NonNull ResultSet results, @NonNull StatementContext context)
columnNames.contains(columnPrefix + Columns.OUTPUT_VERSIONS)
? toDatasetVersion(results, columnPrefix + Columns.OUTPUT_VERSIONS)
: ImmutableList.of(),
JobMapper.toContext(results, columnPrefix + Columns.CONTEXT),
columnNames.contains(columnPrefix + Columns.CONTEXT)
? JobMapper.toContext(results, columnPrefix + Columns.CONTEXT)
: null,
toFacetsOrNull(results, columnPrefix + Columns.FACETS));
}

Expand All @@ -94,8 +97,12 @@ private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) thr
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
}

private Map<String, String> toArgs(ResultSet results, String column) throws SQLException {
String args = stringOrNull(results, column);
private Map<String, String> toArgsOrNull(ResultSet results, String argsColumn)
throws SQLException {
if (!Columns.exists(results, argsColumn)) {
return ImmutableMap.of();
}
String args = stringOrNull(results, argsColumn);
if (args == null) {
return null;
}
Expand Down
10 changes: 7 additions & 3 deletions api/src/main/java/marquez/service/LineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public LineageService(LineageDao delegate, JobDao jobDao) {
this.jobDao = jobDao;
}

public Lineage lineage(NodeId nodeId, int depth) {
// TODO make input parameters easily extendable if adding more options like 'withJobFacets'
public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good change, but I worry that we'll want to add more options to this method (e.g., include job facets? dataset facets? exclude runs altogether?). I don't think we should take this on now, but let's add a TODO to make the input parameters here more easily extendable so that we can add those other options later one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't agree more. I had small aversion to adding a flag to make this work, but there was no other better option. I also thought in future if more changes like this come along that alter api significantly, we could add these as options to api query parameters, or create more broken apis to get specific data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added TODO

Optional<UUID> optionalUUID = getJobUuid(nodeId);
if (optionalUUID.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
Expand All @@ -56,8 +57,11 @@ public Lineage lineage(NodeId nodeId, int depth) {
Set<JobData> jobData = getLineage(Collections.singleton(job), depth);

List<Run> runs =
getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));
// todo fix runtime
withRunFacets
? getCurrentRunsWithFacets(
jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()))
: getCurrentRuns(jobData.stream().map(JobData::getUuid).collect(Collectors.toSet()));

for (JobData j : jobData) {
if (j.getLatestRun().isEmpty()) {
for (Run run : runs) {
Expand Down
3 changes: 2 additions & 1 deletion api/src/test/java/marquez/api/OpenLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -41,7 +42,7 @@ class OpenLineageResourceTest {
OpenLineageResourceTest.class.getResourceAsStream("/lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), anyInt())).thenReturn(LINEAGE);
when(lineageService.lineage(any(NodeId.class), anyInt(), anyBoolean())).thenReturn(LINEAGE);

ServiceFactory serviceFactory =
ApiTestUtils.mockServiceFactory(Map.of(LineageService.class, lineageService));
Expand Down
54 changes: 52 additions & 2 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ public void testGetCurrentRuns() {
Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId))
.collect(Collectors.toSet());

List<Run> currentRuns = lineageDao.getCurrentRuns(jobids);
List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
Expand All @@ -790,7 +790,7 @@ public void testGetCurrentRunsWithFailedJob() {

Set<UUID> jobids = Collections.singleton(writeJob.getJob().getUuid());

List<Run> currentRuns = lineageDao.getCurrentRuns(jobids);
List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
Expand All @@ -799,6 +799,56 @@ public void testGetCurrentRunsWithFailedJob() {
.contains(writeJob.getRun().getUuid());
}

@Test
public void testGetCurrentRunsWithFacetsGetsLatestRun() {
for (int i = 0; i < 5; i++) {
LineageTestUtils.createLineageRow(
openLineageDao,
"writeJob",
"COMPLETE",
jobFacet,
Arrays.asList(),
Arrays.asList(dataset));
}

List<JobLineage> newRows =
writeDownstreamLineage(
openLineageDao,
new LinkedList<>(
Arrays.asList(
new DatasetConsumerJob("readJob", 3, Optional.of("outputData2")),
new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
jobFacet,
dataset);
UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao, "writeJob", "FAIL", jobFacet, Arrays.asList(), Arrays.asList(dataset));

Set<UUID> expectedRunIds =
Stream.concat(
Stream.of(writeJob.getRun().getUuid()), newRows.stream().map(JobLineage::getRunId))
.collect(Collectors.toSet());
Set<UUID> jobids =
Stream.concat(
Stream.of(writeJob.getJob().getUuid()), newRows.stream().map(JobLineage::getId))
.collect(Collectors.toSet());

List<Run> currentRuns = lineageDao.getCurrentRunsWithFacets(jobids);

// assert the job does exist
assertThat(currentRuns)
.hasSize(expectedRunIds.size())
.extracting(r -> r.getId().getValue())
.containsAll(expectedRunIds);

// assert that run_args, input/output versions, and run facets are fetched from the dao.
for (Run run : currentRuns) {
assertThat(run.getArgs()).hasSize(2);
assertThat(run.getOutputVersions()).hasSize(1);
assertThat(run.getFacets()).hasSize(1);
}
}

@Test
public void testGetCurrentRunsGetsLatestRun() {
for (int i = 0; i < 5; i++) {
Expand Down
16 changes: 11 additions & 5 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void testLineage() {
dataset);
String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 1 commonDataset
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -232,7 +233,8 @@ public void testLineageWithDeletedDataset() {

String jobName = writeJob.getJob().getName();
Lineage lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -281,7 +283,8 @@ public void testLineageWithDeletedDataset() {
jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset");

lineage =
lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2, true);

// 1 writeJob + 0 commonDataset is hidden
// 20 readJob + 20 outputData
Expand Down Expand Up @@ -311,7 +314,9 @@ public void testLineageWithNoDatasets() {
openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
Lineage lineage =
lineageService.lineage(
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())), 5);
NodeId.of(new NamespaceName(NAMESPACE), new JobName(writeJob.getJob().getName())),
5,
true);
assertThat(lineage.getGraph())
.hasSize(1)
.first()
Expand Down Expand Up @@ -362,7 +367,8 @@ public void testLineageWithWithCycle() {
lineageService.lineage(
NodeId.of(
new NamespaceName(NAMESPACE), new JobName(intermediateJob.getJob().getName())),
5);
5,
true);
assertThat(lineage.getGraph()).extracting(Node::getId).hasSize(6);
ObjectAssert<Node> datasetNode =
assertThat(lineage.getGraph())
Expand Down