Skip to content

Commit

Permalink
Remove need to query JobRow on run completion
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed Jul 20, 2022
1 parent 7e0f47a commit 5c88d5b
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 29 deletions.
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,22 +286,19 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
* code location, and context. A version for a given job is created <i>only</i> when a {@link Run}
* transitions into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state.
*
* @param namespaceName The namespace for the job version.
* @param jobName The name of the job.
* @param jobRow The job.
* @param runUuid The unique ID of the run associated with the job version.
* @param runState The current run state.
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull String namespaceName,
@NonNull String jobName,
@NonNull JobRow jobRow,
@NonNull UUID runUuid,
@NonNull RunState runState,
@NonNull Instant transitionedAt) {
// Get the job.
final JobDao jobDao = createJobDao();
final JobRow jobRow = jobDao.findJobByNameAsRow(namespaceName, jobName).get();

// Get the job context.
final UUID jobContextUuid = jobRow.getJobContextUuid().get();
Expand Down
3 changes: 1 addition & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ default void updateMarquezOnComplete(
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionOnRunTransition(
updateLineageRow.getRun().getNamespaceName(),
updateLineageRow.getRun().getJobName(),
updateLineageRow.getJob(),
updateLineageRow.getRun().getUuid(),
runState,
event.getEventTime().toInstant());
Expand Down
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/service/RunService.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.db.BaseDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.RunStateDao;
Expand All @@ -48,13 +49,15 @@ public class RunService extends DelegatingDaos.DelegatingRunDao {
private final JobVersionDao jobVersionDao;
private final RunStateDao runStateDao;
private final Collection<RunTransitionListener> runTransitionListeners;
private final JobDao jobDao;

public RunService(
@NonNull BaseDao baseDao, Collection<RunTransitionListener> runTransitionListeners) {
super(baseDao.createRunDao());
this.jobVersionDao = baseDao.createJobVersionDao();
this.runStateDao = baseDao.createRunStateDao();
this.runTransitionListeners = runTransitionListeners;
this.jobDao = baseDao.createJobDao();
}

/**
Expand Down Expand Up @@ -83,8 +86,9 @@ public void markRunAs(
if (runState.isDone()) {
BagOfJobVersionInfo bagOfJobVersionInfo =
jobVersionDao.upsertJobVersionOnRunTransition(
runRow.getNamespaceName(),
runRow.getJobName(),
jobDao
.findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName())
.orElseThrow(),
runRow.getUuid(),
runState,
transitionedAt);
Expand Down
12 changes: 2 additions & 10 deletions api/src/test/java/marquez/db/JobVersionDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,7 @@ public void testGetJobVersions() {
jdbiForTesting, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs());

jobVersionDao.upsertJobVersionOnRunTransition(
jobRow.getNamespaceName(),
jobRow.getName(),
runRow.getUuid(),
RunState.COMPLETED,
Instant.now());
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now());

List<JobVersion> jobVersions =
jobVersionDao.findAllJobVersions(namespaceRow.getName(), jobRow.getName(), 10, 0);
Expand Down Expand Up @@ -288,11 +284,7 @@ public void testUpsertJobVersionOnRunTransition() {
// (6) Add a new job version on the run state transition to COMPLETED.
final BagOfJobVersionInfo bagOfJobVersionInfo =
jobVersionDao.upsertJobVersionOnRunTransition(
jobRow.getNamespaceName(),
jobRow.getName(),
runRow.getUuid(),
RunState.COMPLETED,
newTimestamp());
jobRow, runRow.getUuid(), RunState.COMPLETED, newTimestamp());

// Ensure the job version is associated with the latest run.
final RunRow latestRunRowForJobVersion = runDao.findRunByUuidAsRow(runRow.getUuid()).get();
Expand Down
12 changes: 2 additions & 10 deletions api/src/test/java/marquez/db/RunDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ public void getRun() {
jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs());

jobVersionDao.upsertJobVersionOnRunTransition(
jobRow.getNamespaceName(),
jobRow.getName(),
runRow.getUuid(),
RunState.COMPLETED,
Instant.now());
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now());

Optional<Run> run = runDao.findRunByUuid(runRow.getUuid());
assertThat(run)
Expand Down Expand Up @@ -211,11 +207,7 @@ private Stream<RunRow> createRunsForJob(
jdbi, runRow.getUuid(), RunState.COMPLETED, outputs);

jobVersionDao.upsertJobVersionOnRunTransition(
jobRow.getNamespaceName(),
jobRow.getName(),
runRow.getUuid(),
RunState.COMPLETED,
Instant.now());
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now());
return runRow;
});
}
Expand Down

0 comments on commit 5c88d5b

Please sign in to comment.