diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa3356906..e37062a68f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ *Save into Marquez model datasets sent via `DatasetEvent` event type * API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) *Save into Marquez model jobs and datasets sent via `JobEvent` event type. +* API: support streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Creates job version and reference rows at the beginning of the job instead of on complete. Updates job version within the run if anything changes. ## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17 ### Added diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 23f7a9f118..b4be634f3f 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -280,6 +280,14 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { return findInputOrOutputDatasetsFor(jobVersionUuid, IoType.OUTPUT); } + /** + * Verifies if a job with a specified job version is present in table. + * + * @param version Version identifier + */ + @SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)") + boolean versionExists(UUID version); + /** * Returns the input or output datasets for a given job version. * @@ -447,98 +455,73 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco * and context. A version for a given job is created only when a {@link Run} transitions * into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state. * - * @param jobRow The job. - * @param runUuid The unique ID of the run associated with the job version. + * @param jobRowRunDetails The job row with run details. 
* @param runState The current run state. * @param transitionedAt The timestamp of the run state transition. * @return A {@link BagOfJobVersionInfo} object. */ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( - @NonNull JobRow jobRow, - @NonNull UUID runUuid, + @NonNull JobRowRunDetails jobRowRunDetails, @NonNull RunState runState, - @NonNull Instant transitionedAt) { + @NonNull Instant transitionedAt, + boolean linkJobToJobVersion) { // Get the job. final JobDao jobDao = createJobDao(); - // Get the inputs and outputs dataset versions for the run associated with the job version. - final DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); - final List jobVersionInputs = - datasetVersionDao.findInputDatasetVersionsFor(runUuid); - final List jobVersionOutputs = - datasetVersionDao.findOutputDatasetVersionsFor(runUuid); - - // Get the namespace for the job. - final NamespaceRow namespaceRow = - createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get(); - - // Generate the version for the job; the version may already exist. - final Version jobVersion = - Utils.newJobVersionFor( - NamespaceName.of(jobRow.getNamespaceName()), - JobName.of( - Optional.ofNullable(jobRow.getParentJobName()) - .map(pn -> pn + "." + jobRow.getSimpleName()) - .orElse(jobRow.getName())), - toDatasetIds( - jobVersionInputs.stream() - .map(i -> (DatasetVersionRow) i) - .collect(Collectors.toList())), - toDatasetIds( - jobVersionOutputs.stream() - .map(o -> (DatasetVersionRow) o) - .collect(Collectors.toList())), - jobRow.getLocation()); - // Add the job version. final JobVersionDao jobVersionDao = createJobVersionDao(); + final JobVersionRow jobVersionRow = jobVersionDao.upsertJobVersion( UUID.randomUUID(), transitionedAt, // Use the timestamp of when the run state transitioned. 
- jobRow.getUuid(), - jobRow.getLocation(), - jobVersion.getValue(), - jobRow.getName(), - namespaceRow.getUuid(), - jobRow.getNamespaceName()); + jobRowRunDetails.jobRow.getUuid(), + jobRowRunDetails.jobRow.getLocation(), + jobRowRunDetails.jobVersion.getValue(), + jobRowRunDetails.jobRow.getName(), + jobRowRunDetails.namespaceRow.getUuid(), + jobRowRunDetails.jobRow.getNamespaceName()); // Link the input datasets to the job version. - jobVersionInputs.forEach( + jobRowRunDetails.jobVersionInputs.forEach( jobVersionInput -> { jobVersionDao.upsertInputDatasetFor( jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid(), jobVersionRow.getJobUuid(), - jobRow.getSymlinkTargetId()); + jobRowRunDetails.jobRow.getSymlinkTargetId()); }); // Link the output datasets to the job version. - jobVersionOutputs.forEach( + jobRowRunDetails.jobVersionOutputs.forEach( jobVersionOutput -> { jobVersionDao.upsertOutputDatasetFor( jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid(), jobVersionRow.getJobUuid(), - jobRow.getSymlinkTargetId()); + jobRowRunDetails.jobRow.getSymlinkTargetId()); }); // Link the job version to the run. - createRunDao().updateJobVersion(runUuid, jobVersionRow.getUuid()); + createRunDao().updateJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid()); // Link the run to the job version; multiple run instances may be linked to a job version. - jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid); + jobVersionDao.updateLatestRunFor( + jobVersionRow.getUuid(), transitionedAt, jobRowRunDetails.runUuid); // Link the job facets to this job version - jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid()); + jobVersionDao.linkJobFacetsToJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid()); - // Link the job version to the job only if the run is marked done and has transitioned into one - // of the following states: COMPLETED, ABORTED, or FAILED. 
- if (runState.isDone()) { - jobDao.updateVersionFor(jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid()); + if (linkJobToJobVersion) { + jobDao.updateVersionFor( + jobRowRunDetails.jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid()); } - return new BagOfJobVersionInfo(jobRow, jobVersionRow, jobVersionInputs, jobVersionOutputs); + return new BagOfJobVersionInfo( + jobRowRunDetails.jobRow, + jobVersionRow, + jobRowRunDetails.jobVersionInputs, + jobRowRunDetails.jobVersionOutputs); } /** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */ @@ -556,6 +539,40 @@ private DatasetId toDatasetId(DatasetVersionRow dataset) { NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName())); } + default JobRowRunDetails loadJobRowRunDetails(JobRow jobRow, UUID runUuid) { + // Get the inputs and outputs dataset versions for the run associated with the job version. + final DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); + final List jobVersionInputs = + datasetVersionDao.findInputDatasetVersionsFor(runUuid); + final List jobVersionOutputs = + datasetVersionDao.findOutputDatasetVersionsFor(runUuid); + + // Get the namespace for the job. + final NamespaceRow namespaceRow = + createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get(); + + // Generate the version for the job; the version may already exist. + final Version jobVersion = + Utils.newJobVersionFor( + NamespaceName.of(jobRow.getNamespaceName()), + JobName.of( + Optional.ofNullable(jobRow.getParentJobName()) + .map(pn -> pn + "." 
+ jobRow.getSimpleName()) + .orElse(jobRow.getName())), + toDatasetIds( + jobVersionInputs.stream() + .map(i -> (DatasetVersionRow) i) + .collect(Collectors.toList())), + toDatasetIds( + jobVersionOutputs.stream() + .map(o -> (DatasetVersionRow) o) + .collect(Collectors.toList())), + jobRow.getLocation()); + + return new JobRowRunDetails( + jobRow, runUuid, namespaceRow, jobVersionInputs, jobVersionOutputs, jobVersion); + } + /** A container class for job version info. */ @Value class BagOfJobVersionInfo { @@ -567,6 +584,14 @@ class BagOfJobVersionInfo { record JobDataset(String namespace, String name, IoType ioType) {} + record JobRowRunDetails( + JobRow jobRow, + UUID runUuid, + NamespaceRow namespaceRow, + List jobVersionInputs, + List jobVersionOutputs, + Version jobVersion) {} + class JobDatasetMapper implements RowMapper { @Override public JobDataset map(ResultSet rs, StatementContext ctx) throws SQLException { diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index a1512e755d..2963f56ce2 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -27,13 +27,13 @@ import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.DatasetType; -import marquez.common.models.JobType; import marquez.common.models.NamespaceName; import marquez.common.models.RunState; import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; import marquez.db.JobVersionDao.IoType; +import marquez.db.JobVersionDao.JobRowRunDetails; import marquez.db.RunDao.RunUpsert; import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; @@ -167,9 +167,13 @@ SELECT count(*) default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) { UpdateLineageRow updateLineageRow = 
updateBaseMarquezModel(event, mapper); RunState runState = getRunState(event.getEventType()); - if (event.getEventType() != null && runState.isDone()) { + + if (event.getJob() != null && event.getJob().isStreamingJob()) { + updateMarquezOnStreamingJob(event, updateLineageRow, runState); + } else if (event.getEventType() != null && runState.isDone()) { updateMarquezOnComplete(event, updateLineageRow, runState); } + return updateLineageRow; } @@ -559,7 +563,7 @@ private JobRow buildJobFromEvent( jobDao.upsertJob( UUID.randomUUID(), parent.getUuid(), - getJobType(job), + job.type(), now, namespace.getUuid(), namespace.getName(), @@ -572,7 +576,7 @@ private JobRow buildJobFromEvent( () -> jobDao.upsertJob( UUID.randomUUID(), - getJobType(job), + job.type(), now, namespace.getUuid(), namespace.getName(), @@ -680,7 +684,7 @@ private JobRow createParentJobRunRecord( createJobDao() .upsertJob( UUID.randomUUID(), - getJobType(job), + job.type(), now, namespace.getUuid(), namespace.getName(), @@ -745,16 +749,58 @@ default Set toDatasetId(List datasets) { default void updateMarquezOnComplete( LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) { + final JobVersionDao jobVersionDao = createJobVersionDao(); + // Link the job version to the job only if the run is marked done and has transitioned into one + // of the following states: COMPLETED, ABORTED, or FAILED. 
+ final boolean linkJobToJobVersion = runState.isDone(); + BagOfJobVersionInfo bagOfJobVersionInfo = - createJobVersionDao() - .upsertJobVersionOnRunTransition( - updateLineageRow.getJob(), - updateLineageRow.getRun().getUuid(), - runState, - event.getEventTime().toInstant()); + jobVersionDao.upsertJobVersionOnRunTransition( + jobVersionDao.loadJobRowRunDetails( + updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()), + runState, + event.getEventTime().toInstant(), + linkJobToJobVersion); updateLineageRow.setJobVersionBag(bagOfJobVersionInfo); } + /** + * A separate method is used as the logic to update Marquez model differs for streaming and batch. + * The assumption for batch is that the job version is created when task is done and cumulative + * list of input and output datasets from all the events is used to compute the job version UUID. + * However, this wouldn't make sense for streaming jobs, which are mostly long living and produce + * output before completing. + * + *

<p>In this case, a job version is created based on the list of input and output datasets + * referenced by this job. If a job starts with inputs:{A,B} and outputs:{C}, a new job version + * is created immediately at job start. If a subsequent event reports inputs:{A}, outputs:{C}, + * then the union of all datasets registered within this job does not change, so the job version + * is not modified. If another event arrives with neither inputs nor outputs, the job version + * is still not modified, because its hash is computed from the datasets attached to the run. + * + *
<p>
However, in case of event with inputs:{A,B,D} and outputs:{C}, new hash gets computed and + * new job version row is inserted into the table. + * + * @param event + * @param updateLineageRow + * @param runState + */ + default void updateMarquezOnStreamingJob( + LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) { + final JobVersionDao jobVersionDao = createJobVersionDao(); + JobRowRunDetails jobRowRunDetails = + jobVersionDao.loadJobRowRunDetails( + updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()); + + if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) { + // need to insert new job version + BagOfJobVersionInfo bagOfJobVersionInfo = + jobVersionDao.upsertJobVersionOnRunTransition( + jobRowRunDetails, runState, event.getEventTime().toInstant(), true); + updateLineageRow.setJobVersionBag(bagOfJobVersionInfo); + } + } + default String getUrlOrNull(String uri) { try { return new URI(uri).toASCIIString(); @@ -772,10 +818,6 @@ default String formatNamespaceName(String namespace) { return namespace.replaceAll("[^a-z:/A-Z0-9\\-_.@+]", "_"); } - default JobType getJobType(Job job) { - return JobType.BATCH; - } - default DatasetRecord upsertLineageDataset( ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) { daos.initBaseDao(this); diff --git a/api/src/main/java/marquez/db/mappers/JobMapper.java b/api/src/main/java/marquez/db/mappers/JobMapper.java index c331dc413b..1592f9247e 100644 --- a/api/src/main/java/marquez/db/mappers/JobMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobMapper.java @@ -15,9 +15,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; import java.sql.SQLException; import 
java.util.HashSet; +import java.util.Map; +import java.util.Optional; import java.util.Set; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -36,29 +41,35 @@ @Slf4j public final class JobMapper implements RowMapper { private static final ObjectMapper MAPPER = Utils.getMapper(); + private static final String JOB_TYPE_FACET_NAME = "jobType"; @Override public Job map(@NonNull ResultSet results, @NonNull StatementContext context) throws SQLException { - return new Job( - new JobId( - NamespaceName.of(stringOrThrow(results, Columns.NAMESPACE_NAME)), - JobName.of(stringOrThrow(results, Columns.NAME))), - JobType.valueOf(stringOrThrow(results, Columns.TYPE)), - JobName.of(stringOrThrow(results, Columns.NAME)), - stringOrThrow(results, Columns.SIMPLE_NAME), - stringOrNull(results, Columns.PARENT_JOB_NAME), - timestampOrThrow(results, Columns.CREATED_AT), - timestampOrThrow(results, Columns.UPDATED_AT), - getDatasetFromJsonOrNull(results, "current_inputs"), - new HashSet<>(), - urlOrNull(results, "current_location"), - stringOrNull(results, Columns.DESCRIPTION), - // Latest Run is resolved in the JobDao. 
This can be brought in via a join and - // and a jsonb but custom deserializers will need to be introduced - null, - toFacetsOrNull(results, Columns.FACETS), - uuidOrNull(results, Columns.CURRENT_VERSION_UUID)); + ImmutableMap facetsOrNull = toFacetsOrNull(results, Columns.FACETS); + Job job = + new Job( + new JobId( + NamespaceName.of(stringOrThrow(results, Columns.NAMESPACE_NAME)), + JobName.of(stringOrThrow(results, Columns.NAME))), + JobType.valueOf( + stringOrThrow(results, Columns.TYPE)), // TODO: store job type in a table + JobName.of(stringOrThrow(results, Columns.NAME)), + stringOrThrow(results, Columns.SIMPLE_NAME), + stringOrNull(results, Columns.PARENT_JOB_NAME), + timestampOrThrow(results, Columns.CREATED_AT), + timestampOrThrow(results, Columns.UPDATED_AT), + getDatasetFromJsonOrNull(results, "current_inputs"), + new HashSet<>(), + urlOrNull(results, "current_location"), + stringOrNull(results, Columns.DESCRIPTION), + // Latest Run is resolved in the JobDao. This can be brought in via a join and + // and a jsonb but custom deserializers will need to be introduced + null, + facetsOrNull, + uuidOrNull(results, Columns.CURRENT_VERSION_UUID), + getLabels(facetsOrNull)); + return job; } Set getDatasetFromJsonOrNull(@NonNull ResultSet results, String column) @@ -78,4 +89,29 @@ Set getDatasetFromJsonOrNull(@NonNull ResultSet results, String colum return new HashSet<>(); } } + + private ImmutableList getLabels(ImmutableMap facetsOrNull) { + Builder builder = ImmutableList.builder(); + + if (facetsOrNull == null) { + return builder.build(); + } + + Optional.ofNullable(getJobTypeFacetField(facetsOrNull, "jobType")) + .ifPresent(e -> builder.add(e)); + + Optional.ofNullable(getJobTypeFacetField(facetsOrNull, "integration")) + .ifPresent(e -> builder.add(e)); + + return builder.build(); + } + + private String getJobTypeFacetField(ImmutableMap facetsOrNull, String field) { + return Optional.ofNullable(facetsOrNull.get(JOB_TYPE_FACET_NAME)) + .filter(o -> o 
instanceof Map) + .map(m -> (Map) m) + .filter(m -> m.containsKey(field)) + .map(m -> m.get(field).toString()) + .orElse(null); + } } diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index 5c88ebe3ff..4f2ec6f09e 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -142,7 +142,11 @@ public CompletableFuture createAsync(LineageEvent event) { .thenAccept( (update) -> { if (event.getEventType() != null) { - if (event.getEventType().equalsIgnoreCase("COMPLETE")) { + boolean isStreaming = + Optional.ofNullable(event.getJob()) + .map(j -> j.isStreamingJob()) + .orElse(false); + if (event.getEventType().equalsIgnoreCase("COMPLETE") || isStreaming) { buildJobOutputUpdate(update).ifPresent(runService::notify); } buildJobInputUpdate(update).ifPresent(runService::notify); diff --git a/api/src/main/java/marquez/service/RunService.java b/api/src/main/java/marquez/service/RunService.java index 27144e1833..05c3957360 100644 --- a/api/src/main/java/marquez/service/RunService.java +++ b/api/src/main/java/marquez/service/RunService.java @@ -84,14 +84,14 @@ public void markRunAs( runStateDao.updateRunStateFor(runId.getValue(), runState, transitionedAt); if (runState.isDone()) { + JobRow jobRow = + jobDao.findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName()).orElseThrow(); BagOfJobVersionInfo bagOfJobVersionInfo = jobVersionDao.upsertJobVersionOnRunTransition( - jobDao - .findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName()) - .orElseThrow(), - runRow.getUuid(), + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), runState, - transitionedAt); + transitionedAt, + true); // TODO: We should also notify that the outputs have been updated when a run is in a done // state to be consistent with existing job versioning logic. 
We'll want to add testing to diff --git a/api/src/main/java/marquez/service/models/Job.java b/api/src/main/java/marquez/service/models/Job.java index 4ed2d3fbd4..bcdaee1bfb 100644 --- a/api/src/main/java/marquez/service/models/Job.java +++ b/api/src/main/java/marquez/service/models/Job.java @@ -5,6 +5,7 @@ package marquez.service.models; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.net.URL; import java.time.Instant; @@ -41,6 +42,7 @@ public final class Job { @Nullable @Setter private Run latestRun; @Getter private final ImmutableMap facets; @Nullable private UUID currentVersion; + @Getter @Nullable private ImmutableList labels; public Job( @NonNull final JobId id, @@ -56,7 +58,8 @@ public Job( @Nullable final String description, @Nullable final Run latestRun, @Nullable final ImmutableMap facets, - @Nullable UUID currentVersion) { + @Nullable UUID currentVersion, + @Nullable ImmutableList labels) { this.id = id; this.type = type; this.name = name; @@ -72,6 +75,7 @@ public Job( this.latestRun = latestRun; this.facets = (facets == null) ? ImmutableMap.of() : facets; this.currentVersion = currentVersion; + this.labels = (labels == null) ? 
ImmutableList.of() : labels; } public Optional getLocation() { diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 6117e1fc3d..05820cd53d 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -15,6 +15,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.annotation.Nullable; import javax.validation.Valid; import javax.validation.constraints.NotNull; @@ -24,6 +25,7 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; +import marquez.common.models.JobType; /** * Requires jackson serialization features: mapper.registerModule(new JavaTimeModule()); @@ -206,6 +208,25 @@ public static class Job extends BaseJsonModel { @NotNull private String namespace; @NotNull private String name; @Valid private JobFacet facets; + + /** + * Verifies if a job is a streaming job. + * + * @return + */ + @JsonIgnore + public boolean isStreamingJob() { + return Optional.ofNullable(this.facets) + .map(JobFacet::getJobType) + .map(JobTypeJobFacet::getProcessingType) + .filter(type -> type.equalsIgnoreCase("STREAMING")) + .isPresent(); + } + + @JsonIgnore + public JobType type() { + return isStreamingJob() ? 
JobType.STREAM : JobType.BATCH; + } } @Builder @@ -214,12 +235,13 @@ public static class Job extends BaseJsonModel { @Setter @Valid @ToString - @JsonPropertyOrder({"documentation", "sourceCodeLocation", "sql", "description"}) + @JsonPropertyOrder({"documentation", "sourceCodeLocation", "sql", "description", "jobType"}) public static class JobFacet { @Valid private DocumentationJobFacet documentation; @Valid private SourceCodeLocationJobFacet sourceCodeLocation; @Valid private SQLJobFacet sql; + @Valid private JobTypeJobFacet jobType; @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); @JsonAnySetter @@ -240,6 +262,10 @@ public SourceCodeLocationJobFacet getSourceCodeLocation() { return sourceCodeLocation; } + public JobTypeJobFacet getJobType() { + return jobType; + } + public SQLJobFacet getSql() { return sql; } @@ -297,6 +323,31 @@ public SQLJobFacet(@NotNull URI _producer, @NotNull URI _schemaURL, @NotNull Str } } + @NoArgsConstructor + @Getter + @Setter + @Valid + @ToString + public static class JobTypeJobFacet extends BaseFacet { + + @NotNull private String processingType; + @NotNull private String integration; + @NotNull private String jobType; + + @Builder + public JobTypeJobFacet( + @NotNull URI _producer, + @NotNull URI _schemaURL, + @NotNull String processingType, + @NotNull String integration, + @NotNull String jobType) { + super(_producer, _schemaURL); + this.processingType = processingType; + this.integration = integration; + this.jobType = jobType; + } + } + @Builder @AllArgsConstructor @NoArgsConstructor diff --git a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java index c161c2e672..bca7686e73 100644 --- a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java +++ b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java @@ -23,6 +23,7 @@ import marquez.db.OpenLineageDao; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import 
marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -37,8 +38,7 @@ public class ColumnLineageIntegrationTest extends BaseIntegrationTest { public void setup(Jdbi jdbi) { OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class); - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); LineageEvent.Dataset dataset_A = getDatasetA(); LineageEvent.Dataset dataset_B = getDatasetB(); diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 16dd9ccfed..fe3509b663 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -115,9 +115,7 @@ INSERT INTO job_versions (uuid, created_at, updated_at, job_uuid, version, locat nominalTimeRunFacet, parentRun.orElse(null), ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))))) - .job( - new LineageEvent.Job( - NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP))) + .job(new LineageEvent.Job(NAMESPACE, jobName, JobFacet.builder().build())) .inputs( Collections.singletonList( new LineageEvent.Dataset( diff --git a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java index 571704c5c3..813b07b29a 100644 --- a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java +++ b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java @@ -14,6 +14,7 @@ import marquez.api.JdbiUtils; import marquez.db.models.UpdateLineageRow; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; public class ColumnLineageTestUtils { @@ -110,8 +111,7 @@ public static 
LineageEvent.Dataset getDatasetC() { public static UpdateLineageRow createLineage( OpenLineageDao openLineageDao, LineageEvent.Dataset input, LineageEvent.Dataset output) { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); return LineageTestUtils.createLineageRow( openLineageDao, "job_" + UUID.randomUUID(), diff --git a/api/src/test/java/marquez/db/DatasetDaoTest.java b/api/src/test/java/marquez/db/DatasetDaoTest.java index 930cf1a4b4..b27324c343 100644 --- a/api/src/test/java/marquez/db/DatasetDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetDaoTest.java @@ -41,7 +41,7 @@ class DatasetDaoTest { private static DatasetDao datasetDao; private static OpenLineageDao openLineageDao; - private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + private final JobFacet jobFacet = JobFacet.builder().build(); static Jdbi jdbi; diff --git a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java index 6a4e9e0ed3..01d3c3bd19 100644 --- a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java @@ -20,6 +20,7 @@ import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -308,8 +309,7 @@ public void testInsertDatasetFacetsForUnknownTypeFacet() { @Test public void testInsertOutputDatasetFacetsFor() { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( @@ -340,8 
+340,7 @@ public void testInsertOutputDatasetFacetsFor() { @Test public void testInsertInputDatasetFacetsFor() { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( @@ -372,8 +371,7 @@ public void testInsertInputDatasetFacetsFor() { private UpdateLineageRow createLineageRowWithInputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder inputDatasetFacetsbuilder) { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); return LineageTestUtils.createLineageRow( openLineageDao, @@ -389,8 +387,7 @@ private UpdateLineageRow createLineageRowWithInputDataset( private UpdateLineageRow createLineageRowWithOutputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder outputDatasetFacetsbuilder) { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); return LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/FacetTestUtils.java b/api/src/test/java/marquez/db/FacetTestUtils.java index 5e78e9b689..ea516ab679 100644 --- a/api/src/test/java/marquez/db/FacetTestUtils.java +++ b/api/src/test/java/marquez/db/FacetTestUtils.java @@ -9,6 +9,7 @@ import java.util.UUID; import marquez.db.models.UpdateLineageRow; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.apache.commons.lang3.StringUtils; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -20,14 +21,20 @@ public class FacetTestUtils { public static UpdateLineageRow createLineageWithFacets(OpenLineageDao openLineageDao) { LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet( - new 
LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), - Map.of( - "ownership", "some-owner", - "sourceCode", "some-code")); + JobFacet.builder() + .documentation( + new LineageEvent.DocumentationJobFacet( + PRODUCER_URL, SCHEMA_URL, "some-documentation")) + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git")) + .sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query")) + .additional( + Map.of( + "ownership", "some-owner", + "sourceCode", "some-code")) + .build(); + return LineageTestUtils.createLineageRow( openLineageDao, "job_" + UUID.randomUUID(), diff --git a/api/src/test/java/marquez/db/JobFacetsDaoTest.java b/api/src/test/java/marquez/db/JobFacetsDaoTest.java index e6b48e7d7d..91a60c1157 100644 --- a/api/src/test/java/marquez/db/JobFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/JobFacetsDaoTest.java @@ -18,6 +18,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -63,12 +64,14 @@ public void insertJobFacets() { openLineageDao, "job_" + UUID.randomUUID(), "COMPLETE", - new LineageEvent.JobFacet( - null, - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - null, - LineageTestUtils.EMPTY_MAP), + JobFacet.builder() + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, + SCHEMA_URL, + "git", + "git@github.com:OpenLineage/OpenLineage.git")) + 
.build(), Collections.emptyList(), Collections.emptyList()); @@ -88,12 +91,16 @@ public void insertJobFacets() { @Test public void testGetFacetsByRunUuid() { LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet( - new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), - null); + JobFacet.builder() + .documentation( + new LineageEvent.DocumentationJobFacet( + PRODUCER_URL, SCHEMA_URL, "some-documentation")) + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git")) + .sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query")) + .build(); + UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index aa0caaf4fc..b2d3a6b91a 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -242,7 +242,10 @@ public void testGetJobVersions() { jdbiForTesting, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + Instant.now(), + true); List jobVersions = jobVersionDao.findAllJobVersions(namespaceRow.getName(), jobRow.getName(), 10, 0); @@ -311,7 +314,10 @@ public void testUpsertJobVersionOnRunTransition() { // (6) Add a new job version on the run state transition to COMPLETED. 
final BagOfJobVersionInfo bagOfJobVersionInfo = jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, newTimestamp()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + newTimestamp(), + true); // Ensure the job version is associated with the latest run. final RunRow latestRunRowForJobVersion = runDao.findRunByUuidAsRow(runRow.getUuid()).get(); diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index aed23901a8..006275508f 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -67,7 +67,7 @@ public class LineageDaoTest { new SchemaField("firstname", "string", "the first name"), new SchemaField("lastname", "string", "the last name"), new SchemaField("birthdate", "date", "the date of birth"))); - private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + private final JobFacet jobFacet = JobFacet.builder().build(); static Jdbi jdbi; @@ -393,7 +393,7 @@ public void testGetLineageWithJobThatSharesNoDatasets() { /** A failed consumer job doesn't show up in the datasets out edges */ @Test public void testGetLineageWithFailedConsumer() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -513,7 +513,7 @@ public void testGetInputDatasetsWithJobThatHasMultipleVersions() { /** A failed producer job doesn't show up in the lineage */ @Test public void testGetLineageWithFailedProducer() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -542,7 +542,7 @@ public void testGetLineageWithFailedProducer() { /** A failed producer job doesn't show up in the 
lineage */ @Test public void testGetLineageChangedJobVersion() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -568,7 +568,7 @@ public void testGetLineageChangedJobVersion() { @Test public void testGetJobFromInputOrOutput() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -592,7 +592,7 @@ public void testGetJobFromInputOrOutput() { @Test public void testGetJobFromInputOrOutputPrefersRecentOutputJob() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); // add some consumer jobs prior to the write so we know that the sort isn't simply picking // the first job created diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index e21a1070f3..aaca26b6b0 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -73,7 +73,7 @@ public static void setUpOnce(Jdbi jdbi) { /** When reading a dataset, the version is assumed to be the version last written */ @Test void testUpdateMarquezModel() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -117,11 +117,9 @@ void testUpdateMarquezModelWithDatasetEvent() { @Test void testUpdateMarquezModelWithJobEvent() { JobFacet jobFacet = - new JobFacet( - DocumentationJobFacet.builder().description("documentation").build(), - null, - null, - LineageTestUtils.EMPTY_MAP); + JobFacet.builder() + .documentation(DocumentationJobFacet.builder().description("documentation").build()) + 
.build(); Job job = new Job(NAMESPACE, READ_JOB_NAME, jobFacet); @@ -183,7 +181,7 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() { PRODUCER_URL, SCHEMA_URL, "TRUNCATE")) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, WRITE_JOB_NAME, "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList(dataset)); @@ -195,7 +193,7 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() { @Test void testUpdateMarquezModelDatasetWithColumnLineageFacet() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -247,7 +245,7 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacet() { @Test void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenInputFieldDoesNotExist() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -283,7 +281,7 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotEx TRANSFORMATION_TYPE))))) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -334,7 +332,7 @@ void testUpsertColumnLineageData() { UPDATED_TRANSFORMATION_TYPE))))) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob1 = LineageTestUtils.createLineageRow( dao, @@ -385,7 +383,7 @@ void testUpdateMarquezModelDatasetWithSymlinks() { "symlinkNamespace", "symlinkName", "some-type")))) .build()); - JobFacet jobFacet = new 
JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, WRITE_JOB_NAME, "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList(dataset)); @@ -425,7 +423,7 @@ void testUpdateMarquezModelDatasetWithSymlinks() { */ @Test void testUpdateMarquezModelWithInputOnlyDataset() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -449,7 +447,7 @@ void testUpdateMarquezModelWithInputOnlyDataset() { */ @Test void testUpdateMarquezModelWithNonMatchingReadSchema() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -496,7 +494,7 @@ void testUpdateMarquezModelWithNonMatchingReadSchema() { */ @Test void testUpdateMarquezModelWithPriorWrites() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob1 = LineageTestUtils.createLineageRow( dao, @@ -569,7 +567,7 @@ void testUpdateMarquezModelWithPriorWrites() { @Test void testGetOpenLineageEvents() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -590,7 +588,7 @@ void testGetOpenLineageEvents() { @Test void testInputOutputDatasetFacets() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( dao, diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 9eb1fc2bcd..ce6d9f5329 
100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -83,7 +83,10 @@ public void getRun() { jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + Instant.now(), + true); Optional run = runDao.findRunByUuid(runRow.getUuid()); assertThat(run) @@ -198,7 +201,10 @@ private Stream createRunsForJob( jdbi, runRow.getUuid(), RunState.COMPLETED, outputs); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + Instant.now(), + true); return runRow; }); } diff --git a/api/src/test/java/marquez/db/RunFacetsDaoTest.java b/api/src/test/java/marquez/db/RunFacetsDaoTest.java index 9dfa40c85a..965423a80d 100644 --- a/api/src/test/java/marquez/db/RunFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/RunFacetsDaoTest.java @@ -58,7 +58,7 @@ public void setup(Jdbi jdbi) { openLineageDao, "job_" + UUID.randomUUID(), "COMPLETE", - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + JobFacet.builder().build(), Collections.emptyList(), Collections.emptyList(), new LineageEvent.ParentRunFacet( @@ -112,7 +112,7 @@ public void testInsertRunFacetsForSparkLogicalPlanWhenPlanAlreadyPresent() { openLineageDao, "job_" + UUID.randomUUID(), "COMPLETE", - new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + JobFacet.builder().build(), Collections.emptyList(), Collections.emptyList(), new ParentRunFacet( @@ -164,12 +164,16 @@ public void testInsertRunFacetsForCustomFacet() { @Test public void testGetFacetsByRunUuid() { LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet( - new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, 
"some-documentation"), - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), - null); + JobFacet.builder() + .documentation( + new LineageEvent.DocumentationJobFacet( + PRODUCER_URL, SCHEMA_URL, "some-documentation")) + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git")) + .sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query")) + .build(); + UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/mappers/JobMapperTest.java b/api/src/test/java/marquez/db/mappers/JobMapperTest.java index 6dc8bed3f7..b88941761f 100644 --- a/api/src/test/java/marquez/db/mappers/JobMapperTest.java +++ b/api/src/test/java/marquez/db/mappers/JobMapperTest.java @@ -18,6 +18,7 @@ import java.util.TimeZone; import java.util.UUID; import marquez.common.Utils; +import marquez.common.models.JobType; import marquez.db.Columns; import marquez.service.models.Job; import org.jdbi.v3.core.statement.StatementContext; @@ -31,6 +32,11 @@ class JobMapperTest { private static ResultSet resultSet; private static TimeZone defaultTZ = TimeZone.getDefault(); + private static String JOB_FACET = + """ + [{"jobType": {"jobType": "QUERY", "integration": "FLINK", "processingType": "STREAMING"}}] + """; + @BeforeAll public static void setUp() throws SQLException, MalformedURLException { TimeZone.setDefault(TimeZone.getTimeZone("UTC")); @@ -87,4 +93,25 @@ void shouldMapFullJob() throws SQLException { Job actual = underTest.map(resultSet, mock(StatementContext.class)); assertThat(actual).isEqualTo(expected); } + + @Test + void testMapJobTypeJobFacet() throws SQLException { + ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class); + + 
when(resultSet.getString(Columns.TYPE)).thenReturn("STREAM"); + when(resultSet.getObject(Columns.TYPE)).thenReturn("STREAM"); + + when(resultSet.getMetaData()).thenReturn(resultSetMetaData); + when(resultSetMetaData.getColumnCount()).thenReturn(1); + when(resultSetMetaData.getColumnName(1)).thenReturn(Columns.FACETS); + + when(resultSet.getString(Columns.FACETS)).thenReturn(JOB_FACET); + when(resultSet.getObject(Columns.FACETS)).thenReturn(JOB_FACET); + JobMapper underTest = new JobMapper(); + + Job actual = underTest.map(resultSet, mock(StatementContext.class)); + + assertThat(actual.getType()).isEqualTo(JobType.STREAM); + assertThat(actual.getLabels()).containsExactly("QUERY", "FLINK"); + } } diff --git a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java index b5f98d12e4..adcfa95552 100644 --- a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java +++ b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java @@ -28,6 +28,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.flywaydb.core.api.migration.Context; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; @@ -167,8 +168,7 @@ public void testMigrateForMultipleChunks() throws Exception { @Test public void testMigrateForLineageWithNoDatasets() throws Exception { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = LineageEvent.JobFacet.builder().build(); LineageTestUtils.createLineageRow( openLineageDao, "job_" + UUID.randomUUID(), diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 7f5634e6a5..62d7355f6f 100644 --- 
a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -41,6 +41,7 @@ import marquez.service.models.Dataset; import marquez.service.models.Lineage; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.Node; import marquez.service.models.NodeId; import org.jdbi.v3.core.Jdbi; @@ -70,7 +71,7 @@ public static void setUpOnce(Jdbi jdbi) { fieldDao = jdbi.onDemand(DatasetFieldDao.class); datasetDao = jdbi.onDemand(DatasetDao.class); lineageService = new ColumnLineageService(dao, fieldDao); - jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + jobFacet = JobFacet.builder().build(); } @AfterEach diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 97f6b9ce02..9366bbb8b9 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -12,11 +12,13 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import marquez.api.JdbiUtils; +import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.InputDatasetVersion; import marquez.common.models.JobId; @@ -39,6 +41,7 @@ import marquez.service.models.Lineage; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobTypeJobFacet; import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Node; import marquez.service.models.NodeId; @@ -71,8 +74,7 @@ public class LineageServiceTest { new SchemaField("firstname", "string", "the first name"), new 
SchemaField("lastname", "string", "the last name"), new SchemaField("birthdate", "date", "the date of birth"))); - private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); - + private final JobFacet jobFacet = JobFacet.builder().build(); static Jdbi jdbi; @BeforeAll @@ -425,6 +427,59 @@ public void testLineageWithWithCycle() { .matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob")); } + @Test + public void testGetLineageForRunningStreamingJob() { + Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build(); + Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build(); + + // (1) Run batch job which outputs input-dataset + LineageTestUtils.createLineageRow( + openLineageDao, + "someInputBatchJob", + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList(input)); + + // (2) Run streaming job that reads input-dataset and writes output-dataset + LineageTestUtils.createLineageRow( + openLineageDao, + "streamingjob", + "RUNNING", + JobFacet.builder() + .jobType(JobTypeJobFacet.builder().processingType("STREAMING").build()) + .build(), + Arrays.asList(input), + Arrays.asList(output)); + + // (3) Run batch job that reads output of streaming job (Note: streaming job is still running) + LineageTestUtils.createLineageRow( + openLineageDao, + "someOutputBatchJob", + "COMPLETE", + jobFacet, + Arrays.asList(output), + Collections.emptyList()); + + // (4) Lineage from the output dataset must equal lineage from the input dataset + Lineage lineageFromInput = + lineageService.lineage( + NodeId.of( + new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))), + 5, + true); + + Lineage lineageFromOutput = + lineageService.lineage( + NodeId.of( + new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))), + 5, + true); + + assertThat(lineageFromInput.getGraph()).hasSize(5); // 2 datasets + 3 jobs + 
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph()); + } + @Test public void testLineageForOrphanedDataset() { UpdateLineageRow writeJob = diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index d80ba94b00..dfef90fa29 100644 --- a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -36,6 +36,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetVersionDao; import marquez.db.JobDao; +import marquez.db.JobVersionDao; import marquez.db.NamespaceDao; import marquez.db.OpenLineageDao; import marquez.db.RunArgsDao; @@ -57,6 +58,8 @@ import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.DatasourceDatasetFacet; import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobTypeJobFacet; +import marquez.service.models.LineageEvent.LineageEventBuilder; import marquez.service.models.LineageEvent.RunFacet; import marquez.service.models.LineageEvent.SQLJobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; @@ -84,8 +87,9 @@ public class OpenLineageServiceIntegrationTest { private JobService jobService; private OpenLineageDao openLineageDao; - + private Jdbi jdbi; private JobDao jobDao; + private JobVersionDao jobVersionDao; private DatasetDao datasetDao; private DatasetVersionDao datasetVersionDao; private ArgumentCaptor runInputListener; @@ -148,9 +152,11 @@ public ExpectedResults( @BeforeEach public void setup(Jdbi jdbi) throws SQLException { + this.jdbi = jdbi; openLineageDao = jdbi.onDemand(OpenLineageDao.class); datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class); jobDao = jdbi.onDemand(JobDao.class); + jobVersionDao = jdbi.onDemand(JobVersionDao.class); runService = mock(RunService.class); jobService = new 
JobService(jobDao, runService); runInputListener = ArgumentCaptor.forClass(JobInputUpdate.class); @@ -548,6 +554,172 @@ void testJobEvent() throws ExecutionException, InterruptedException { .isEqualTo("theDatasource"); } + @Test + void testStreamingJob() throws ExecutionException, InterruptedException { + // (1) Create input and output datasets (both use DATASET_NAME) + LineageEvent.Dataset input = + LineageEvent.Dataset.builder().name(DATASET_NAME).namespace(NAMESPACE).build(); + + LineageEvent.Dataset output = + LineageEvent.Dataset.builder() + .name(DATASET_NAME) + .namespace(NAMESPACE) + .facets( + DatasetFacets.builder() + .schema( + new SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new SchemaField("col", "STRING", "my name")))) + .build()) + .build(); + + // (2) Emit a RUNNING event (no COMPLETE yet) for a streaming job writing to an output + UUID firstRunId = UUID.randomUUID(); + lineageService + .createAsync( + LineageEvent.builder() + .eventType("RUNNING") + .run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build())) + .job( + LineageEvent.Job.builder() + .name("streaming_job_name") + .namespace(NAMESPACE) + .facets( + JobFacet.builder() + .jobType( + JobTypeJobFacet.builder() + .processingType("STREAMING") + .integration("FLINK") + .jobType("JOB") + .build()) + .build()) + .build()) + .eventTime(Instant.now().atZone(TIMEZONE)) + .inputs(Collections.singletonList(input)) + .outputs(Collections.singletonList(output)) + .build()) + .get(); + Optional datasetRow = datasetDao.findDatasetByName(NAMESPACE, DATASET_NAME); + + // (3) Assert that the output dataset is present and has a current version + assertThat(datasetRow).isPresent().flatMap(Dataset::getCurrentVersion).isPresent(); + + // (4) Assert that job is present and its current version is present + Job job = jobDao.findJobByName(NAMESPACE, "streaming_job_name").get(); + assertThat(job.getInputs()).hasSize(1); + assertThat(job.getCurrentVersion()).isPresent(); + assertThat(job.getType()).isEqualTo(JobType.STREAM); + 
assertThat(job.getLabels()).containsExactly("JOB", "FLINK"); + + UUID initialJobVersion = job.getCurrentVersion().get(); + Instant updatedAt = + jdbi.withHandle( + h -> + h.createQuery("SELECT max(updated_at) FROM job_versions") + .mapTo(Instant.class) + .first()); + + // (5) Send COMPLETE event streaming job + lineageService + .createAsync( + LineageEvent.builder() + .eventType("COMPLETE") + .run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build())) + .job( + LineageEvent.Job.builder() + .name("streaming_job_name") + .namespace(NAMESPACE) + .facets( + JobFacet.builder() + .jobType( + JobTypeJobFacet.builder() + .processingType("STREAMING") + .integration("FLINK") + .jobType("JOB") + .build()) + .build()) + .build()) + .eventTime(Instant.now().atZone(TIMEZONE)) + .inputs(Collections.emptyList()) + .outputs(Collections.emptyList()) + .build()) + .get(); + + // (6) Assert job version exists + job = jobDao.findJobByName(NAMESPACE, "streaming_job_name").get(); + assertThat(job.getCurrentVersion()).isPresent(); + assertThat(job.getType()).isEqualTo(JobType.STREAM); + assertThat(job.getLabels()).containsExactly("JOB", "FLINK"); + assertThat(job.getCurrentVersion().get()).isEqualTo(initialJobVersion); + + // (7) Make sure updated_at in job version did not change + Instant lastUpdatedAt = + jdbi.withHandle( + h -> + h.createQuery("SELECT max(updated_at) FROM job_versions") + .mapTo(Instant.class) + .first()); + assertThat(updatedAt).isEqualTo(lastUpdatedAt); + } + + @Test + void testStreamingJobCreateSingleJobAndDatasetVersion() + throws ExecutionException, InterruptedException { + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder().name(DATASET_NAME).namespace(NAMESPACE).build(); + UUID firstRunId = UUID.randomUUID(); + LineageEventBuilder eventBuilder = + LineageEvent.builder() + .eventType("RUNNING") + .run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build())) + .job( + LineageEvent.Job.builder() + 
.name("streaming_job_name") + .namespace(NAMESPACE) + .facets( + JobFacet.builder() + .jobType(JobTypeJobFacet.builder().processingType("STREAMING").build()) + .build()) + .build()) + .eventTime(Instant.now().atZone(TIMEZONE)); + LineageEvent lineageEvent = eventBuilder.outputs(Collections.singletonList(dataset)).build(); + + // (1) Emit running event + lineageService.createAsync(lineageEvent).get(); + + UUID datasetVersionUuid = + datasetDao.findDatasetAsRow(NAMESPACE, DATASET_NAME).get().getCurrentVersionUuid().get(); + int initialJobVersionsCount = + jobVersionDao.findAllJobVersions(NAMESPACE, "streaming_job_name", 10, 0).size(); + + // (2) Emit the same running event a second time + lineageService.createAsync(lineageEvent).get(); + + // (3) Emit event from eventBuilder as-is (NOTE: outputs set above may persist; confirm) + lineageService.createAsync(eventBuilder.build()).get(); + assertThat(jobVersionDao.findAllJobVersions(NAMESPACE, "streaming_job_name", 10, 0).size()) + .isEqualTo(initialJobVersionsCount); + + // (4) Emit event with an additional input dataset; expect one new job version + LineageEvent.Dataset otherdataset = + LineageEvent.Dataset.builder().name("otherDataset").namespace(NAMESPACE).build(); + lineageService + .createAsync(eventBuilder.inputs(Collections.singletonList(otherdataset)).build()) + .get(); + assertThat(jobVersionDao.findAllJobVersions(NAMESPACE, "streaming_job_name", 10, 0).size()) + .isEqualTo(initialJobVersionsCount + 1); + + // (5) Verify dataset's version has not changed + assertThat( + datasetDao + .findDatasetAsRow(NAMESPACE, DATASET_NAME) + .get() + .getCurrentVersionUuid() + .get()) + .isEqualTo(datasetVersionUuid); + } + private void checkExists(LineageEvent.Dataset ds) { DatasetService datasetService = new DatasetService(openLineageDao, runService); diff --git a/api/src/test/java/marquez/service/models/LineageEventTest.java b/api/src/test/java/marquez/service/models/LineageEventTest.java index b06fe2946d..a78bc98f57 100644 --- a/api/src/test/java/marquez/service/models/LineageEventTest.java +++ 
b/api/src/test/java/marquez/service/models/LineageEventTest.java @@ -24,6 +24,8 @@ import java.util.List; import marquez.common.Utils; import marquez.common.models.FlexibleDateTimeDeserializer; +import marquez.service.models.LineageEvent.JobTypeJobFacet; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -95,4 +97,17 @@ public void testSerialization(ObjectMapper mapper, String expectedFile) throws I JsonNode actualNode = mapper.readTree(serialized); assertThat(actualNode).isEqualTo(expectedNode); } + + /** Checks that the jobType facet of the EVENT_FULL fixture deserializes with all three fields. */ + @Test + public void testJobTypeJobFacetSerialization() throws IOException { + URL expectedResource = Resources.getResource(EVENT_FULL); + LineageEvent deserialized = + (LineageEvent) Utils.newObjectMapper().readValue(expectedResource, BaseEvent.class); + JobTypeJobFacet facet = deserialized.getJob().getFacets().getJobType(); + + assertThat(facet.getJobType()).isEqualTo("QUERY"); + assertThat(facet.getIntegration()).isEqualTo("FLINK"); + assertThat(facet.getProcessingType()).isEqualTo("STREAMING"); + } } diff --git a/api/src/test/resources/open_lineage/event_full.json b/api/src/test/resources/open_lineage/event_full.json index 6aa5109b97..2aab96d3b6 100644 --- a/api/src/test/resources/open_lineage/event_full.json +++ b/api/src/test/resources/open_lineage/event_full.json @@ -61,6 +61,13 @@ }, "query": "SELECT * FROM foo" }, + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet", + "jobType": "QUERY", + "integration": "FLINK", + "processingType": "STREAMING" + }, "additionalProp1": { "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet",