Skip to content

Commit

Permalink
refactor isStreamingJob method
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Dec 13, 2023
1 parent 51ada5d commit 93922db
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 34 deletions.
18 changes: 0 additions & 18 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@
import marquez.service.models.DatasetMeta;
import marquez.service.models.DbTableMeta;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Job;
import marquez.service.models.LineageEvent.JobFacet;
import marquez.service.models.LineageEvent.JobTypeJobFacet;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Triple;
Expand Down Expand Up @@ -316,21 +313,6 @@ public static Version newDatasetVersionFor(
return newDatasetVersionFor(data);
}

/**
* Verifies if a job is a streaming job.
*
* @param job
* @return
*/
public static boolean isStreamingJob(Job job) {
return Optional.ofNullable(job)
.map(Job::getFacets)
.map(JobFacet::getJobType)
.map(JobTypeJobFacet::getProcessingType)
.filter(type -> type.equalsIgnoreCase("STREAMING"))
.isPresent();
}

/**
* Returns a new {@link Version} object based on the dataset namespace, dataset name dataset meta
* information. A {@link Version} is generated by concatenating the provided job meta together
Expand Down
13 changes: 4 additions & 9 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.DatasetType;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunState;
import marquez.common.models.SourceType;
Expand Down Expand Up @@ -169,7 +168,7 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
RunState runState = getRunState(event.getEventType());

if (Utils.isStreamingJob(event.getJob())) {
if (event.getJob() != null && event.getJob().isStreamingJob()) {
updateMarquezOnStreamingJob(event, updateLineageRow, runState);
} else if (event.getEventType() != null && runState.isDone()) {
updateMarquezOnComplete(event, updateLineageRow, runState);
Expand Down Expand Up @@ -564,7 +563,7 @@ private JobRow buildJobFromEvent(
jobDao.upsertJob(
UUID.randomUUID(),
parent.getUuid(),
getJobType(job),
job.type(),
now,
namespace.getUuid(),
namespace.getName(),
Expand All @@ -577,7 +576,7 @@ private JobRow buildJobFromEvent(
() ->
jobDao.upsertJob(
UUID.randomUUID(),
getJobType(job),
job.type(),
now,
namespace.getUuid(),
namespace.getName(),
Expand Down Expand Up @@ -685,7 +684,7 @@ private JobRow createParentJobRunRecord(
createJobDao()
.upsertJob(
UUID.randomUUID(),
getJobType(job),
job.type(),
now,
namespace.getUuid(),
namespace.getName(),
Expand Down Expand Up @@ -819,10 +818,6 @@ default String formatNamespaceName(String namespace) {
return namespace.replaceAll("[^a-z:/A-Z0-9\\-_.@+]", "_");
}

default JobType getJobType(Job job) {
return Utils.isStreamingJob(job) ? JobType.STREAM : JobType.BATCH;
}

default DatasetRecord upsertLineageDataset(
ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) {
daos.initBaseDao(this);
Expand Down
9 changes: 2 additions & 7 deletions api/src/main/java/marquez/service/OpenLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,8 @@ public CompletableFuture<Void> createAsync(LineageEvent event) {
if (event.getEventType() != null) {
boolean isStreaming =
Optional.ofNullable(event.getJob())
.map(j -> j.getFacets())
.map(f -> f.getJobType())
.map(p -> p.getProcessingType())
.filter(t -> t.equalsIgnoreCase("STREAMING"))
.stream()
.findAny()
.isPresent();
.map(j -> j.isStreamingJob())
.orElse(false);
if (event.getEventType().equalsIgnoreCase("COMPLETE") || isStreaming) {
buildJobOutputUpdate(update).ifPresent(runService::notify);
}
Expand Down
21 changes: 21 additions & 0 deletions api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
Expand All @@ -24,6 +25,7 @@
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import marquez.common.models.JobType;

/**
* Requires jackson serialization features: mapper.registerModule(new JavaTimeModule());
Expand Down Expand Up @@ -206,6 +208,25 @@ public static class Job extends BaseJsonModel {
@NotNull private String namespace;
@NotNull private String name;
@Valid private JobFacet facets;

/**
* Verifies if a job is a streaming job.
*
* @return
*/
@JsonIgnore
public boolean isStreamingJob() {
return Optional.ofNullable(this.facets)
.map(JobFacet::getJobType)
.map(JobTypeJobFacet::getProcessingType)
.filter(type -> type.equalsIgnoreCase("STREAMING"))
.isPresent();
}

@JsonIgnore
public JobType type() {
return isStreamingJob() ? JobType.STREAM : JobType.BATCH;
}
}

@Builder
Expand Down

0 comments on commit 93922db

Please sign in to comment.