write job versions on running
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
pawel-big-lebowski committed Nov 16, 2023
1 parent 5b47ca2 commit b4590d0
Showing 8 changed files with 139 additions and 64 deletions.
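This change makes Marquez persist job version and input/output dataset references for streaming jobs while they are still running, instead of waiting for a terminal run state. As rough orientation, the kind of event that exercises the new path looks like the sketch below, modeled on the testStreamingJob test at the end of this diff; the "RUNNING" event type, the runId and outputDataset variables, and the test fixtures (lineageService, JOB_NAME, NAMESPACE, PRODUCER_URL, SCHEMA_URL, TIMEZONE) are assumptions for illustration, not part of the commit itself.

// Hedged sketch: an event for a job whose processing_type facet is STREAMING is
// picked up by the new updateMarquezOnStreamingJob path without a COMPLETE event.
UUID runId = UUID.randomUUID();
lineageService
    .createAsync(
        LineageEvent.builder()
            .eventType("RUNNING") // assumption: a non-terminal event type
            .run(new LineageEvent.Run(runId.toString(), RunFacet.builder().build()))
            .job(
                LineageEvent.Job.builder()
                    .name(JOB_NAME)
                    .namespace(NAMESPACE)
                    .facets(
                        JobFacet.builder()
                            .processingType(
                                new ProcessingTypeJobFacet(PRODUCER_URL, SCHEMA_URL, "STREAMING"))
                            .build())
                    .build())
            .eventTime(Instant.now().atZone(TIMEZONE))
            .inputs(new ArrayList<>())
            .outputs(Collections.singletonList(outputDataset)) // outputDataset built as in testStreamingJob
            .build())
    .get();
// testStreamingJob asserts that the output dataset already has a dataset_version
// at this point, before any COMPLETE event arrives.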
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/JobDao.java
@@ -72,6 +72,7 @@ SELECT job_version_uuid, JSON_AGG(facet) as facets
""")
Optional<Job> findJobByName(String namespaceName, String jobName);


@SqlUpdate(
"""
UPDATE jobs
13 changes: 11 additions & 2 deletions api/src/main/java/marquez/db/JobVersionDao.java
@@ -453,11 +453,12 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
default BagOfJobVersionInfo upsertJobVersionReferences(
@NonNull JobRow jobRow,
@NonNull UUID runUuid,
@NonNull RunState runState,
@NonNull Instant transitionedAt) {
@NonNull Instant transitionedAt,
boolean circuitBreakerOnExistingVersion) {
// Get the job.
final JobDao jobDao = createJobDao();

@@ -490,6 +491,14 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
.collect(Collectors.toList())),
jobRow.getLocation());

// Circuit breaker: skip the upsert when the job version is unchanged.
if (circuitBreakerOnExistingVersion
&& jobRow.getCurrentVersionUuid().isPresent()
&& jobVersion.getValue().equals(jobRow.getCurrentVersionUuid().get())) {
// don't upsert additional rows; return null to signal that the version is unchanged
return null;
}

// Add the job version.
final JobVersionDao jobVersionDao = createJobVersionDao();
final JobVersionRow jobVersionRow =
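In JobVersionDao, upsertJobVersionOnRunTransition is renamed to upsertJobVersionReferences and gains a circuitBreakerOnExistingVersion flag: when the flag is set and the freshly computed version equals the job's current version UUID, the method returns null instead of writing additional rows. A minimal sketch of the resulting caller contract, mirroring updateMarquezOnStreamingJob in OpenLineageDao below (jobRow, runUuid, runState, transitionedAt, and updateLineageRow are assumed from the surrounding context):

// Hedged sketch: a null return signals that the job version is unchanged and
// nothing new was written, so there is no BagOfJobVersionInfo to record.
BagOfJobVersionInfo bagOfJobVersionInfo =
    jobVersionDao.upsertJobVersionReferences(
        jobRow, runUuid, runState, transitionedAt, /* circuitBreakerOnExistingVersion */ true);
if (bagOfJobVersionInfo != null) {
  updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
}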
105 changes: 67 additions & 38 deletions api/src/main/java/marquez/db/OpenLineageDao.java
@@ -171,51 +171,59 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
if (event.getEventType() != null && runState.isDone()) {
updateMarquezOnComplete(event, updateLineageRow, runState);
}

Optional.ofNullable(event.getJob())
.map(Job::getFacets)
.map(JobFacet::getProcessingTypeJobFacet)
.map(ProcessingTypeJobFacet::getProcessingType)
.filter(type -> type.equalsIgnoreCase("STREAMING"))
.ifPresent(a -> updateMarquezOnStreamingJob(event, updateLineageRow, runState));

// TODO: add jobType, integration, and processingType columns to the jobs table to determine the job type ->
// https://github.com/OpenLineage/OpenLineage/pull/2241
// TODO: verify those columns in the jobs API (should be part of JobData)

// if a job is a streaming job, dataset versions should be created on start
// TODO: make this an enum and extract the check into a method
Optional<String> processingType =
Optional.ofNullable(event.getJob())
.map(Job::getFacets)
.map(JobFacet::getProcessingTypeJobFacet)
.map(ProcessingTypeJobFacet::getProcessingType);
// Optional<String> processingType =
// Optional.ofNullable(event.getJob())
// .map(Job::getFacets)
// .map(JobFacet::getProcessingTypeJobFacet)
// .map(ProcessingTypeJobFacet::getProcessingType);

// TODO: migrate this to separate method
// TODO: test streaming facet in integration test (read from facet in json file)
if (processingType.orElse("").equals("STREAMING")) {
// insert IO rows
ModelDaos daos = new ModelDaos();
daos.initBaseDao(this);
updateLineageRow
.getInputs()
.ifPresent(
l ->
l.stream()
.forEach(
d ->
daos.getJobVersionDao()
.upsertInputDatasetFor(
null, // no job version Id
d.getDatasetRow().getUuid(),
updateLineageRow.getJob().getUuid(),
updateLineageRow.getJob().getSymlinkTargetId())));
updateLineageRow
.getOutputs()
.ifPresent(
l ->
l.stream()
.forEach(
d ->
daos.getJobVersionDao()
.upsertInputDatasetFor(
null, // no job version Id
d.getDatasetRow().getUuid(),
updateLineageRow.getJob().getUuid(),
updateLineageRow.getJob().getSymlinkTargetId())));
}
// if (processingType.orElse("").equals("STREAMING")) {
// // insert IO rows
// ModelDaos daos = new ModelDaos();
// daos.initBaseDao(this);
// updateLineageRow
// .getInputs()
// .ifPresent(
// l ->
// l.stream()
// .forEach(
// d ->
// daos.getJobVersionDao()
// .upsertInputDatasetFor(
// null, // no job version Id
// d.getDatasetRow().getUuid(),
// updateLineageRow.getJob().getUuid(),
// updateLineageRow.getJob().getSymlinkTargetId())));
// updateLineageRow
// .getOutputs()
// .ifPresent(
// l ->
// l.stream()
// .forEach(
// d ->
// daos.getJobVersionDao()
// .upsertInputDatasetFor(
// null, // no job version Id
// d.getDatasetRow().getUuid(),
// updateLineageRow.getJob().getUuid(),
// updateLineageRow.getJob().getSymlinkTargetId())));
// }

return updateLineageRow;
}
@@ -794,14 +802,35 @@ default void updateMarquezOnComplete(
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionOnRunTransition(
.upsertJobVersionReferences(
updateLineageRow.getJob(),
updateLineageRow.getRun().getUuid(),
runState,
event.getEventTime().toInstant());
event.getEventTime().toInstant(),
false);
updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
}

default void updateMarquezOnStreamingJob(
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
// TODO: make sure it is called once during event start
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionReferences(
updateLineageRow.getJob(),
updateLineageRow.getRun().getUuid(),
runState,
event.getEventTime().toInstant(),
true);

if (bagOfJobVersionInfo != null) {
// bagOfJobVersionInfo is null when the job version has not changed
updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
}

// TODO: write a test verifying that job_version and other entities are created only once for each streaming run
}

default String getUrlOrNull(String uri) {
try {
return new URI(uri).toASCIIString();
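The TODO above about extracting the STREAMING check into a method applies to the Optional chain used here and duplicated in OpenLineageService below. A possible shape for such a helper, offered only as a sketch (the name isStreamingJob and its placement are assumptions, not part of this commit):

// Hypothetical helper consolidating the duplicated processing-type checks.
static boolean isStreamingJob(LineageEvent event) {
  return Optional.ofNullable(event.getJob())
      .map(Job::getFacets)
      .map(JobFacet::getProcessingTypeJobFacet)
      .map(ProcessingTypeJobFacet::getProcessingType)
      .filter(type -> type.equalsIgnoreCase("STREAMING"))
      .isPresent();
}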
10 changes: 9 additions & 1 deletion api/src/main/java/marquez/service/OpenLineageService.java
@@ -142,7 +142,15 @@ public CompletableFuture<Void> createAsync(LineageEvent event) {
.thenAccept(
(update) -> {
if (event.getEventType() != null) {
if (event.getEventType().equalsIgnoreCase("COMPLETE")) {
boolean isStreaming = Optional.ofNullable(event.getJob())
.map(j -> j.getFacets())
.map(f -> f.getProcessingTypeJobFacet())
.map(p -> p.getProcessingType())
.filter(t -> t.equalsIgnoreCase("STREAMING"))
.isPresent();
if (event.getEventType().equalsIgnoreCase("COMPLETE") || isStreaming) {
buildJobOutputUpdate(update).ifPresent(runService::notify);
}
buildJobInputUpdate(update).ifPresent(runService::notify);
5 changes: 3 additions & 2 deletions api/src/main/java/marquez/service/RunService.java
@@ -85,13 +85,14 @@ public void markRunAs(

if (runState.isDone()) {
BagOfJobVersionInfo bagOfJobVersionInfo =
jobVersionDao.upsertJobVersionOnRunTransition(
jobVersionDao.upsertJobVersionReferences(
jobDao
.findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName())
.orElseThrow(),
runRow.getUuid(),
runState,
transitionedAt);
transitionedAt,
false);

// TODO: We should also notify that the outputs have been updated when a run is in a done
// state to be consistent with existing job versioning logic. We'll want to add testing to
8 changes: 4 additions & 4 deletions api/src/test/java/marquez/db/JobVersionDaoTest.java
@@ -241,8 +241,8 @@ public void testGetJobVersions() {
DbTestUtils.transitionRunWithOutputs(
jdbiForTesting, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs());

jobVersionDao.upsertJobVersionOnRunTransition(
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now());
jobVersionDao.upsertJobVersionReferences(
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now(), false);

List<JobVersion> jobVersions =
jobVersionDao.findAllJobVersions(namespaceRow.getName(), jobRow.getName(), 10, 0);
@@ -310,8 +310,8 @@ public void testUpsertJobVersionOnRunTransition() {

// (6) Add a new job version on the run state transition to COMPLETED.
final BagOfJobVersionInfo bagOfJobVersionInfo =
jobVersionDao.upsertJobVersionOnRunTransition(
jobRow, runRow.getUuid(), RunState.COMPLETED, newTimestamp());
jobVersionDao.upsertJobVersionReferences(
jobRow, runRow.getUuid(), RunState.COMPLETED, newTimestamp(), false);

// Ensure the job version is associated with the latest run.
final RunRow latestRunRowForJobVersion = runDao.findRunByUuidAsRow(runRow.getUuid()).get();
8 changes: 4 additions & 4 deletions api/src/test/java/marquez/db/RunDaoTest.java
@@ -82,8 +82,8 @@ public void getRun() {
DbTestUtils.transitionRunWithOutputs(
jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs());

jobVersionDao.upsertJobVersionOnRunTransition(
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now());
jobVersionDao.upsertJobVersionReferences(
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now(), false);

Optional<Run> run = runDao.findRunByUuid(runRow.getUuid());
assertThat(run)
@@ -197,8 +197,8 @@ private Stream<RunRow> createRunsForJob(
DbTestUtils.transitionRunWithOutputs(
jdbi, runRow.getUuid(), RunState.COMPLETED, outputs);

jobVersionDao.upsertJobVersionOnRunTransition(
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now());
jobVersionDao.upsertJobVersionReferences(
jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now(), false);
return runRow;
});
}
@@ -7,6 +7,7 @@

import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -550,7 +551,8 @@ void testJobEvent() throws ExecutionException, InterruptedException {
}

@Test
void testStreamingJob() {
void testStreamingJob() throws ExecutionException, InterruptedException {
// (1) Create dataset
LineageEvent.Dataset dataset =
LineageEvent.Dataset.builder()
.name(DATASET_NAME)
@@ -562,15 +564,10 @@
PRODUCER_URL,
SCHEMA_URL,
Arrays.asList(new SchemaField("col", "STRING", "my name"))))
.dataSource(
DatasourceDatasetFacet.builder()
.name("theDatasource")
.uri("http://thedatasource")
.build())
.build())
.build();

// Streaming job not followed by a COMPLETE event
// (2) Streaming job not followed by a COMPLETE event writing to a dataset
UUID firstRunId = UUID.randomUUID();
lineageService.createAsync(
LineageEvent.builder()
@@ -589,17 +586,47 @@
.eventTime(Instant.now().atZone(TIMEZONE))
.inputs(new ArrayList<>())
.outputs(Collections.singletonList(dataset))
.build());
.build())
.get();
Optional<Dataset> datasetRow = datasetDao.findDatasetByName(NAMESPACE, DATASET_NAME);

// assertThat(datasetRow).isPresent().flatMap(Dataset::getCurrentVersion).isNotPresent();
// (3) Assert that dataset is present and has dataset_version written
assertThat(datasetRow)
.isPresent()
.flatMap(Dataset::getCurrentVersion)
.isPresent();

// Assert that datasets can be retrieved
// TODO
// (4) Assert that job is present although job_version entry is not
Job job = jobDao.findJobByName(NAMESPACE, JOB_NAME).get();
assertThat(job.getInputs()).isEmpty();
assertThat(job.getOutputs()).hasSize(1);
assertThat(job.getCurrentVersion()).isEmpty();
// TODO: assert the job type fields: jobType, integration, processingType

// TODO: job can be retrieved by name
// (5) Send a COMPLETE event for the streaming job
lineageService.createAsync(
LineageEvent.builder()
.eventType("COMPLETE")
.run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build()))
.job(
LineageEvent.Job.builder()
.name(JOB_NAME)
.namespace(NAMESPACE)
.facets(
JobFacet.builder()
.processingType(
new ProcessingTypeJobFacet(PRODUCER_URL, SCHEMA_URL, "STREAMING"))
.build())
.build())
.eventTime(Instant.now().atZone(TIMEZONE))
.inputs(new ArrayList<>())
.outputs(Collections.singletonList(dataset))
.build())
.get();

// Assert that it is returned via lineage endpoint
// (6) Assert job version exists
job = jobDao.findJobByName(NAMESPACE, JOB_NAME).get();
assertThat(job.getCurrentVersion()).isPresent();
}

private void checkExists(LineageEvent.Dataset ds) {
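The TODO in OpenLineageDao about verifying that job_version and related entities are created only once per streaming run is still open. A test along the following lines could cover it; the streamingEvent helper, the available DAO handles, and the exact assertions are assumptions modeled on testStreamingJob and JobVersionDaoTest above, not part of this commit:

// Rough sketch of the missing test: streamingEvent(runId, eventType) is a hypothetical
// helper that builds a STREAMING LineageEvent the same way testStreamingJob does inline.
@Test
void testStreamingRunWritesJobVersionOnlyOnce() throws ExecutionException, InterruptedException {
  UUID runId = UUID.randomUUID();
  lineageService.createAsync(streamingEvent(runId, "RUNNING")).get();
  lineageService.createAsync(streamingEvent(runId, "RUNNING")).get();

  // The TODO expects a single job version for the run, no matter how many events arrived.
  List<JobVersion> jobVersions = jobVersionDao.findAllJobVersions(NAMESPACE, JOB_NAME, 10, 0);
  assertThat(jobVersions).hasSize(1);
}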
