Skip to content

Commit

Permalink
Persist OpenLineage event before updating marquez model (#2069)
Browse files Browse the repository at this point in the history
RunTransitionListener is invoked after updating the marquez model,
but OpenLineage event is not available at that moment. This simply
switch the order of it, that persists OpenLineage event first and
then updates marquez model. With this change, OpenLineage event is
already persisted and available from RunTransitionListener.

Signed-off-by: Minkyu Park <minkyu.park.200@gmail.com>

Signed-off-by: Minkyu Park <minkyu.park.200@gmail.com>
  • Loading branch information
fm100 authored Aug 12, 2022
1 parent 7c6ddd8 commit 626b844
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions api/src/main/java/marquez/service/OpenLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,6 @@ public OpenLineageService(BaseDao baseDao, RunService runService, Executor execu
}

public CompletableFuture<Void> createAsync(LineageEvent event) {
CompletableFuture<Void> marquez =
CompletableFuture.supplyAsync(
withSentry(withMdc(() -> updateMarquezModel(event, mapper))), executor)
.thenAccept(
(update) -> {
if (event.getEventType() != null) {
if (event.getEventType().equalsIgnoreCase("COMPLETE")) {
buildJobOutputUpdate(update).ifPresent(runService::notify);
}
buildJobInputUpdate(update).ifPresent(runService::notify);
}
});

UUID runUuid = runUuidFromEvent(event.getRun());
CompletableFuture<Void> openLineage =
CompletableFuture.runAsync(
Expand All @@ -94,6 +81,19 @@ public CompletableFuture<Void> createAsync(LineageEvent event) {
event.getProducer()))),
executor);

CompletableFuture<Void> marquez =
CompletableFuture.supplyAsync(
withSentry(withMdc(() -> updateMarquezModel(event, mapper))), executor)
.thenAccept(
(update) -> {
if (event.getEventType() != null) {
if (event.getEventType().equalsIgnoreCase("COMPLETE")) {
buildJobOutputUpdate(update).ifPresent(runService::notify);
}
buildJobInputUpdate(update).ifPresent(runService::notify);
}
});

return CompletableFuture.allOf(marquez, openLineage);
}

Expand Down

0 comments on commit 626b844

Please sign in to comment.