Skip to content

Commit

Permalink
Flink fix terminal streaming events
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Mar 14, 2024
1 parent 78a191b commit 0074433
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.45.0...HEAD)

### Fixed

* Streaming API: fix behaviour for `COMPLETE`/`FAIL` events within streaming jobs [`#2768`](https://github.com/MarquezProject/marquez/pull/2768) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*New `job_version` is not created for a streaming job terminal event with no dataset information and existing version is kept.*

## [0.45.0](https://github.com/MarquezProject/marquez/compare/0.44.0...0.45.0) - 2024-03-07

### Added
Expand Down
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
}
Expand All @@ -390,7 +390,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
}
Expand Down Expand Up @@ -791,6 +791,10 @@ default void updateMarquezOnStreamingJob(
jobVersionDao.loadJobRowRunDetails(
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());

if (event.isTerminalEventForStreamingJobWithNoDatasets()) {
return;
}

if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
// need to insert new job version
BagOfJobVersionInfo bagOfJobVersionInfo =
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ public class LineageEvent extends BaseEvent {
@Valid @NotNull private String producer;
@Valid private URI schemaURL;

@JsonIgnore
public boolean isTerminalEvent() {
return (eventType != null)
&& (eventType.equalsIgnoreCase("COMPLETE") || eventType.equalsIgnoreCase("FAIL"));
}

@JsonIgnore
public boolean isTerminalEventForStreamingJobWithNoDatasets() {
return isTerminalEvent()
&& (job != null && job.isStreamingJob())
&& (outputs == null || outputs.isEmpty())
&& (inputs == null || inputs.isEmpty());
}

@AllArgsConstructor
@NoArgsConstructor
@Setter
Expand Down
35 changes: 35 additions & 0 deletions api/src/test/java/marquez/service/LineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,41 @@ public void testGetLineageForRunningStreamingJob() {
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
}

@Test
public void testGetLineageForCompleteStreamingJob() {
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();

LineageTestUtils.createLineageRow(
openLineageDao,
"streamingjob",
"RUNNING",
JobFacet.builder()
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
.build(),
Arrays.asList(input),
Arrays.asList(output));

LineageTestUtils.createLineageRow(
openLineageDao,
"streamingjob",
"COMPLETE",
JobFacet.builder()
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
.build(),
Collections.emptyList(),
Collections.emptyList());

Lineage lineage =
lineageService.lineage(
NodeId.of(
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
5,
true);

assertThat(lineage.getGraph()).hasSize(3); // 1 job + 2 datasets
}

@Test
public void testLineageForOrphanedDataset() {
UpdateLineageRow writeJob =
Expand Down

0 comments on commit 0074433

Please sign in to comment.