Skip to content

Commit

Permalink
write job versions on running
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Dec 5, 2023
1 parent 3a85d9d commit 9d2eea8
Show file tree
Hide file tree
Showing 16 changed files with 449 additions and 178 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* Save into Marquez model datasets sent via `DatasetEvent` event type.
* API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
* Save into Marquez model jobs and datasets sent via `JobEvent` event type.
* API: support streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
* Create job version and reference rows at the start of a run instead of on completion; update the job version within the run if anything changes.

## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17
### Added
Expand Down
18 changes: 18 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@
import marquez.service.models.DatasetMeta;
import marquez.service.models.DbTableMeta;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Job;
import marquez.service.models.LineageEvent.JobFacet;
import marquez.service.models.LineageEvent.JobTypeJobFacet;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Triple;
Expand Down Expand Up @@ -313,6 +316,21 @@ public static Version newDatasetVersionFor(
return newDatasetVersionFor(data);
}

/**
 * Verifies if a job is a streaming job, i.e. its {@code jobType} facet reports a processing type
 * of {@code STREAMING} (case-insensitive).
 *
 * @param job the OpenLineage job to inspect; may be {@code null}
 * @return {@code true} if the job's facets identify it as a streaming job, {@code false}
 *     otherwise (including when the job or any facet in the chain is absent)
 */
public static boolean isStreamingJob(Job job) {
  return Optional.ofNullable(job)
      .map(Job::getFacets)
      .map(JobFacet::getJobType)
      .map(JobTypeJobFacet::getProcessingType)
      .filter(type -> type.equalsIgnoreCase("STREAMING"))
      .isPresent();
}

/**
* Returns a new {@link Version} object based on the dataset namespace, dataset name dataset meta
* information. A {@link Version} is generated by concatenating the provided job meta together
Expand Down
129 changes: 77 additions & 52 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
return findInputOrOutputDatasetsFor(jobVersionUuid, IoType.OUTPUT);
}

/**
 * Verifies if a job with a specified job version is present in table.
 *
 * @param version Version identifier
 * @return {@code true} if a row with the given version exists in the {@code job_versions} table
 */
@SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)")
boolean versionExists(UUID version);

/**
* Returns the input or output datasets for a given job version.
*
Expand Down Expand Up @@ -447,98 +455,73 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco
 * and context. A version for a given job is created when a {@link Run} transitions into a
 * {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state, or eagerly while the run is still
 * in progress (e.g. for streaming jobs).
 *
 * @param jobRowRunDetails The job row with run details.
 * @param runState The current run state.
 * @param transitionedAt The timestamp of the run state transition.
 * @param linkJobToJobVersion Whether to link the resulting job version to the job itself.
 * @return A {@link BagOfJobVersionInfo} object.
 */
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull JobRow jobRow,
@NonNull UUID runUuid,
@NonNull JobRowRunDetails jobRowRunDetails,
@NonNull RunState runState,
@NonNull Instant transitionedAt) {
@NonNull Instant transitionedAt,
boolean linkJobToJobVersion) {
// Get the job.
final JobDao jobDao = createJobDao();

// Get the inputs and outputs dataset versions for the run associated with the job version.
final DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
final List<ExtendedDatasetVersionRow> jobVersionInputs =
datasetVersionDao.findInputDatasetVersionsFor(runUuid);
final List<ExtendedDatasetVersionRow> jobVersionOutputs =
datasetVersionDao.findOutputDatasetVersionsFor(runUuid);

// Get the namespace for the job.
final NamespaceRow namespaceRow =
createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get();

// Generate the version for the job; the version may already exist.
final Version jobVersion =
Utils.newJobVersionFor(
NamespaceName.of(jobRow.getNamespaceName()),
JobName.of(
Optional.ofNullable(jobRow.getParentJobName())
.map(pn -> pn + "." + jobRow.getSimpleName())
.orElse(jobRow.getName())),
toDatasetIds(
jobVersionInputs.stream()
.map(i -> (DatasetVersionRow) i)
.collect(Collectors.toList())),
toDatasetIds(
jobVersionOutputs.stream()
.map(o -> (DatasetVersionRow) o)
.collect(Collectors.toList())),
jobRow.getLocation());

// Add the job version.
final JobVersionDao jobVersionDao = createJobVersionDao();

final JobVersionRow jobVersionRow =
jobVersionDao.upsertJobVersion(
UUID.randomUUID(),
transitionedAt, // Use the timestamp of when the run state transitioned.
jobRow.getUuid(),
jobRow.getLocation(),
jobVersion.getValue(),
jobRow.getName(),
namespaceRow.getUuid(),
jobRow.getNamespaceName());
jobRowRunDetails.jobRow.getUuid(),
jobRowRunDetails.jobRow.getLocation(),
jobRowRunDetails.jobVersion.getValue(),
jobRowRunDetails.jobRow.getName(),
jobRowRunDetails.namespaceRow.getUuid(),
jobRowRunDetails.jobRow.getNamespaceName());

// Link the input datasets to the job version.
jobVersionInputs.forEach(
jobRowRunDetails.jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(),
jobVersionInput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
jobRowRunDetails.jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
jobVersionOutputs.forEach(
jobRowRunDetails.jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(),
jobVersionOutput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
jobRowRunDetails.jobRow.getSymlinkTargetId());
});

// Link the job version to the run.
createRunDao().updateJobVersion(runUuid, jobVersionRow.getUuid());
createRunDao().updateJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid());

// Link the run to the job version; multiple run instances may be linked to a job version.
jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid);
jobVersionDao.updateLatestRunFor(
jobVersionRow.getUuid(), transitionedAt, jobRowRunDetails.runUuid);

// Link the job facets to this job version
jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid());
jobVersionDao.linkJobFacetsToJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid());

// Link the job version to the job only if the run is marked done and has transitioned into one
// of the following states: COMPLETED, ABORTED, or FAILED.
if (runState.isDone()) {
jobDao.updateVersionFor(jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid());
if (linkJobToJobVersion) {
jobDao.updateVersionFor(
jobRowRunDetails.jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid());
}

return new BagOfJobVersionInfo(jobRow, jobVersionRow, jobVersionInputs, jobVersionOutputs);
return new BagOfJobVersionInfo(
jobRowRunDetails.jobRow,
jobVersionRow,
jobRowRunDetails.jobVersionInputs,
jobRowRunDetails.jobVersionOutputs);
}

/** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */
Expand All @@ -556,6 +539,40 @@ private DatasetId toDatasetId(DatasetVersionRow dataset) {
NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName()));
}

/**
 * Collects the details needed to version a job for a given run: the input and output dataset
 * versions associated with the run, the job's namespace row, and the computed job {@link
 * Version}.
 *
 * @param jobRow The job row.
 * @param runUuid The unique ID of the run associated with the job.
 * @return A {@link JobRowRunDetails} container with the loaded data.
 */
default JobRowRunDetails loadJobRowRunDetails(JobRow jobRow, UUID runUuid) {
  // Get the inputs and outputs dataset versions for the run associated with the job version.
  final DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
  final List<ExtendedDatasetVersionRow> jobVersionInputs =
      datasetVersionDao.findInputDatasetVersionsFor(runUuid);
  final List<ExtendedDatasetVersionRow> jobVersionOutputs =
      datasetVersionDao.findOutputDatasetVersionsFor(runUuid);

  // Get the namespace for the job; fail fast with context if it is unexpectedly missing,
  // rather than throwing a bare NoSuchElementException from Optional.get().
  final NamespaceRow namespaceRow =
      createNamespaceDao()
          .findNamespaceByName(jobRow.getNamespaceName())
          .orElseThrow(
              () ->
                  new IllegalStateException(
                      "Namespace row not found: " + jobRow.getNamespaceName()));

  // Generate the version for the job; the version may already exist.
  final Version jobVersion =
      Utils.newJobVersionFor(
          NamespaceName.of(jobRow.getNamespaceName()),
          JobName.of(
              Optional.ofNullable(jobRow.getParentJobName())
                  .map(pn -> pn + "." + jobRow.getSimpleName())
                  .orElse(jobRow.getName())),
          toDatasetIds(
              jobVersionInputs.stream()
                  .map(i -> (DatasetVersionRow) i)
                  .collect(Collectors.toList())),
          toDatasetIds(
              jobVersionOutputs.stream()
                  .map(o -> (DatasetVersionRow) o)
                  .collect(Collectors.toList())),
          jobRow.getLocation());

  return new JobRowRunDetails(
      jobRow, runUuid, namespaceRow, jobVersionInputs, jobVersionOutputs, jobVersion);
}

/** A container class for job version info. */
@Value
class BagOfJobVersionInfo {
Expand All @@ -567,6 +584,14 @@ class BagOfJobVersionInfo {

record JobDataset(String namespace, String name, IoType ioType) {}

/**
 * Pre-loaded job, run, namespace, and dataset-version details used when creating or updating a
 * job version for a run.
 */
record JobRowRunDetails(
    JobRow jobRow,
    UUID runUuid,
    NamespaceRow namespaceRow,
    List<ExtendedDatasetVersionRow> jobVersionInputs,
    List<ExtendedDatasetVersionRow> jobVersionOutputs,
    Version jobVersion) {}

class JobDatasetMapper implements RowMapper<JobDataset> {
@Override
public JobDataset map(ResultSet rs, StatementContext ctx) throws SQLException {
Expand Down
108 changes: 54 additions & 54 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
import marquez.db.JobVersionDao.JobRowRunDetails;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
Expand Down Expand Up @@ -64,7 +65,6 @@
import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
import marquez.service.models.LineageEvent.NominalTimeRunFacet;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.LineageEvent.ProcessingTypeJobFacet;
import marquez.service.models.LineageEvent.Run;
import marquez.service.models.LineageEvent.RunFacet;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
Expand Down Expand Up @@ -168,54 +168,12 @@ SELECT count(*)
default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
RunState runState = getRunState(event.getEventType());
if (event.getEventType() != null && runState.isDone()) {

if (Utils.isStreamingJob(event.getJob())) {
updateMarquezOnStreamingJob(event, updateLineageRow, runState);
} else if (event.getEventType() != null && runState.isDone()) {
updateMarquezOnComplete(event, updateLineageRow, runState);
}
// TODO: add a columns: jobType, integration, processingType jobs table to determine job type ->
// https://github.com/OpenLineage/OpenLineage/pull/2241
// TODO: verify those columns in jobs' api (should be part of JobData)

// if a job is streaming job - datasets versions should be created on start
// TODO: make it enum, use some method for this
Optional<String> processingType =
Optional.ofNullable(event.getJob())
.map(Job::getFacets)
.map(JobFacet::getProcessingTypeJobFacet)
.map(ProcessingTypeJobFacet::getProcessingType);

// TODO: migrate this to separate method
// TODO: test streaming facet in integration test (read from facet in json file)
if (processingType.orElse("").equals("STREAMING")) {
// insert IO rows
ModelDaos daos = new ModelDaos();
daos.initBaseDao(this);
updateLineageRow
.getInputs()
.ifPresent(
l ->
l.stream()
.forEach(
d ->
daos.getJobVersionDao()
.upsertInputDatasetFor(
null, // no job version Id
d.getDatasetRow().getUuid(),
updateLineageRow.getJob().getUuid(),
updateLineageRow.getJob().getSymlinkTargetId())));
updateLineageRow
.getOutputs()
.ifPresent(
l ->
l.stream()
.forEach(
d ->
daos.getJobVersionDao()
.upsertInputDatasetFor(
null, // no job version Id
d.getDatasetRow().getUuid(),
updateLineageRow.getJob().getUuid(),
updateLineageRow.getJob().getSymlinkTargetId())));
}

return updateLineageRow;
}
Expand Down Expand Up @@ -792,16 +750,58 @@ default Set<DatasetId> toDatasetId(List<Dataset> datasets) {

/**
 * Versions the job for a batch run. The job version is linked to the job itself only once the
 * run has reached a terminal state.
 *
 * @param event the incoming OpenLineage event
 * @param updateLineageRow the row updated by {@code updateBaseMarquezModel}
 * @param runState the run state derived from the event type
 */
default void updateMarquezOnComplete(
    LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
  final JobVersionDao jobVersionDao = createJobVersionDao();
  // Link the job version to the job only if the run is marked done and has transitioned into one
  // of the following states: COMPLETED, ABORTED, or FAILED.
  final boolean linkJobToJobVersion = runState.isDone();

  BagOfJobVersionInfo bagOfJobVersionInfo =
      jobVersionDao.upsertJobVersionOnRunTransition(
          jobVersionDao.loadJobRowRunDetails(
              updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()),
          runState,
          event.getEventTime().toInstant(),
          linkJobToJobVersion);
  updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
}

/**
 * A separate method is used as the logic to update Marquez model differs for streaming and batch.
 * The assumption for batch is that the job version is created when task is done and cumulative
 * list of input and output datasets from all the events is used to compute the job version UUID.
 * However, this wouldn't make sense for streaming jobs, which are mostly long living and produce
 * output before completing.
 *
 * <p>In this case, a job version is created based on the list of input and output datasets
 * referenced by this job. If a job starts with inputs:{A,B} and outputs:{C}, new job version is
 * created immediately at job start. If a following event produces inputs:{A}, outputs:{C}, then
 * the union of all datasets registered within this job does not change, and thus job version does
 * not get modified. In case of receiving another event with no inputs nor outputs, job version
 * still will not get modified as its hash is evaluated based on the datasets attached to the run.
 *
 * <p>However, in case of event with inputs:{A,B,D} and outputs:{C}, new hash gets computed and
 * new job version row is inserted into the table.
 *
 * @param event the incoming OpenLineage event for a streaming job
 * @param updateLineageRow the row updated by {@code updateBaseMarquezModel}
 * @param runState the run state derived from the event type
 */
default void updateMarquezOnStreamingJob(
    LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
  final JobVersionDao jobVersionDao = createJobVersionDao();
  JobRowRunDetails jobRowRunDetails =
      jobVersionDao.loadJobRowRunDetails(
          updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());

  // Only write a new version when the computed version hash is not already present; unchanged
  // dataset sets therefore leave the existing job version untouched.
  if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
    // need to insert new job version
    BagOfJobVersionInfo bagOfJobVersionInfo =
        jobVersionDao.upsertJobVersionOnRunTransition(
            jobRowRunDetails, runState, event.getEventTime().toInstant(), true);
    updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
  }
}

default String getUrlOrNull(String uri) {
try {
return new URI(uri).toASCIIString();
Expand All @@ -820,7 +820,7 @@ default String formatNamespaceName(String namespace) {
}

/**
 * Maps an OpenLineage job to the Marquez {@link JobType}: {@code STREAM} for streaming jobs,
 * {@code BATCH} otherwise.
 *
 * @param job the OpenLineage job; may be {@code null}, which maps to {@code BATCH}
 * @return the corresponding {@link JobType}
 */
default JobType getJobType(Job job) {
  return Utils.isStreamingJob(job) ? JobType.STREAM : JobType.BATCH;
}

default DatasetRecord upsertLineageDataset(
Expand Down
Loading

0 comments on commit 9d2eea8

Please sign in to comment.