Support streaming jobs in Marquez #2682

Merged · 3 commits · Dec 14, 2023
CHANGELOG.md (2 changes: 2 additions & 0 deletions)
@@ -7,6 +7,8 @@
  *Save datasets sent via `DatasetEvent` event type into the Marquez model.*
* API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
  *Save jobs and datasets sent via `JobEvent` event type into the Marquez model.*
* API: support streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
  *Creates job version and reference rows at the start of the job run instead of on completion; updates the job version within the run if anything changes.*

## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17
### Added
api/src/main/java/marquez/db/JobVersionDao.java (129 changes: 77 additions & 52 deletions)
@@ -280,6 +280,14 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
return findInputOrOutputDatasetsFor(jobVersionUuid, IoType.OUTPUT);
}

/**
 * Checks whether a job version row with the specified version UUID exists in the
 * {@code job_versions} table.
 *
 * @param version The version UUID to look up.
 */
@SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)")
boolean versionExists(UUID version);
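For reference, a minimal usage sketch of the existence check above (the caller and variable names are assumed, not part of this diff):

    // Sketch: skip the insert when the deterministic version UUID is already
    // present in job_versions; otherwise write a new job version row.
    if (!jobVersionDao.versionExists(jobVersion.getValue())) {
      // ... upsert a new job version row and link it to the run ...
    }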

/**
* Returns the input or output datasets for a given job version.
*
@@ -447,98 +455,73 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco
* and context. A version for a given job is created <i>only</i> when a {@link Run} transitions
* into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state.
*
-  * @param jobRow The job.
-  * @param runUuid The unique ID of the run associated with the job version.
+  * @param jobRowRunDetails The job row with run details.
* @param runState The current run state.
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
-     @NonNull JobRow jobRow,
-     @NonNull UUID runUuid,
+     @NonNull JobRowRunDetails jobRowRunDetails,
@NonNull RunState runState,
-     @NonNull Instant transitionedAt) {
+     @NonNull Instant transitionedAt,
+     boolean linkJobToJobVersion) {
// Get the job.
final JobDao jobDao = createJobDao();

-   // Get the inputs and outputs dataset versions for the run associated with the job version.
-   final DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
-   final List<ExtendedDatasetVersionRow> jobVersionInputs =
-       datasetVersionDao.findInputDatasetVersionsFor(runUuid);
-   final List<ExtendedDatasetVersionRow> jobVersionOutputs =
-       datasetVersionDao.findOutputDatasetVersionsFor(runUuid);
-
-   // Get the namespace for the job.
-   final NamespaceRow namespaceRow =
-       createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get();
-
-   // Generate the version for the job; the version may already exist.
-   final Version jobVersion =
-       Utils.newJobVersionFor(
-           NamespaceName.of(jobRow.getNamespaceName()),
-           JobName.of(
-               Optional.ofNullable(jobRow.getParentJobName())
-                   .map(pn -> pn + "." + jobRow.getSimpleName())
-                   .orElse(jobRow.getName())),
-           toDatasetIds(
-               jobVersionInputs.stream()
-                   .map(i -> (DatasetVersionRow) i)
-                   .collect(Collectors.toList())),
-           toDatasetIds(
-               jobVersionOutputs.stream()
-                   .map(o -> (DatasetVersionRow) o)
-                   .collect(Collectors.toList())),
-           jobRow.getLocation());

// Add the job version.
final JobVersionDao jobVersionDao = createJobVersionDao();

final JobVersionRow jobVersionRow =
jobVersionDao.upsertJobVersion(
UUID.randomUUID(),
transitionedAt, // Use the timestamp of when the run state transitioned.
-           jobRow.getUuid(),
-           jobRow.getLocation(),
-           jobVersion.getValue(),
-           jobRow.getName(),
-           namespaceRow.getUuid(),
-           jobRow.getNamespaceName());
+           jobRowRunDetails.jobRow.getUuid(),
+           jobRowRunDetails.jobRow.getLocation(),
+           jobRowRunDetails.jobVersion.getValue(),
+           jobRowRunDetails.jobRow.getName(),
+           jobRowRunDetails.namespaceRow.getUuid(),
+           jobRowRunDetails.jobRow.getNamespaceName());

// Link the input datasets to the job version.
-   jobVersionInputs.forEach(
+   jobRowRunDetails.jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(),
jobVersionInput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
-             jobRow.getSymlinkTargetId());
+             jobRowRunDetails.jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
-   jobVersionOutputs.forEach(
+   jobRowRunDetails.jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(),
jobVersionOutput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
-             jobRow.getSymlinkTargetId());
+             jobRowRunDetails.jobRow.getSymlinkTargetId());
});

// Link the job version to the run.
-   createRunDao().updateJobVersion(runUuid, jobVersionRow.getUuid());
+   createRunDao().updateJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid());

// Link the run to the job version; multiple run instances may be linked to a job version.
-   jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid);
+   jobVersionDao.updateLatestRunFor(
+       jobVersionRow.getUuid(), transitionedAt, jobRowRunDetails.runUuid);

// Link the job facets to this job version
-   jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid());
+   jobVersionDao.linkJobFacetsToJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid());

// Link the job version to the job only if the run is marked done and has transitioned into one
// of the following states: COMPLETED, ABORTED, or FAILED.
-   if (runState.isDone()) {
-     jobDao.updateVersionFor(jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid());
+   if (linkJobToJobVersion) {
+     jobDao.updateVersionFor(
+         jobRowRunDetails.jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid());
}

-   return new BagOfJobVersionInfo(jobRow, jobVersionRow, jobVersionInputs, jobVersionOutputs);
+   return new BagOfJobVersionInfo(
+       jobRowRunDetails.jobRow,
+       jobVersionRow,
+       jobRowRunDetails.jobVersionInputs,
+       jobRowRunDetails.jobVersionOutputs);
}

/** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */
@@ -556,6 +539,40 @@ private DatasetId toDatasetId(DatasetVersionRow dataset) {
NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName()));
}

default JobRowRunDetails loadJobRowRunDetails(JobRow jobRow, UUID runUuid) {
// Get the inputs and outputs dataset versions for the run associated with the job version.
final DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
final List<ExtendedDatasetVersionRow> jobVersionInputs =
datasetVersionDao.findInputDatasetVersionsFor(runUuid);
final List<ExtendedDatasetVersionRow> jobVersionOutputs =
datasetVersionDao.findOutputDatasetVersionsFor(runUuid);

// Get the namespace for the job.
final NamespaceRow namespaceRow =
createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get();

// Generate the version for the job; the version may already exist.
final Version jobVersion =
Utils.newJobVersionFor(
NamespaceName.of(jobRow.getNamespaceName()),
JobName.of(
Optional.ofNullable(jobRow.getParentJobName())
.map(pn -> pn + "." + jobRow.getSimpleName())
.orElse(jobRow.getName())),
toDatasetIds(
jobVersionInputs.stream()
.map(i -> (DatasetVersionRow) i)
.collect(Collectors.toList())),
toDatasetIds(
jobVersionOutputs.stream()
.map(o -> (DatasetVersionRow) o)
.collect(Collectors.toList())),
jobRow.getLocation());

return new JobRowRunDetails(
jobRow, runUuid, namespaceRow, jobVersionInputs, jobVersionOutputs, jobVersion);
}
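Because Utils.newJobVersionFor is deterministic over (namespace, job name, input dataset IDs, output dataset IDs, location), reloading a run whose cumulative I/O has not changed reproduces the same version UUID. A minimal sketch of that property (illustrative only, inside the DAO):

    // Two loads over the same run with the same cumulative datasets yield
    // equal versions, which is what versionExists() relies on later.
    JobRowRunDetails first = loadJobRowRunDetails(jobRow, runUuid);
    JobRowRunDetails second = loadJobRowRunDetails(jobRow, runUuid);
    assert first.jobVersion().equals(second.jobVersion());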

/** A container class for job version info. */
@Value
class BagOfJobVersionInfo {
@@ -567,6 +584,14 @@ class BagOfJobVersionInfo {

record JobDataset(String namespace, String name, IoType ioType) {}

record JobRowRunDetails(
JobRow jobRow,
UUID runUuid,
NamespaceRow namespaceRow,
List<ExtendedDatasetVersionRow> jobVersionInputs,
List<ExtendedDatasetVersionRow> jobVersionOutputs,
Version jobVersion) {}

class JobDatasetMapper implements RowMapper<JobDataset> {
@Override
public JobDataset map(ResultSet rs, StatementContext ctx) throws SQLException {
api/src/main/java/marquez/db/OpenLineageDao.java (72 changes: 57 additions & 15 deletions)
@@ -27,13 +27,13 @@
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.DatasetType;
- import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunState;
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
+ import marquez.db.JobVersionDao.JobRowRunDetails;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
@@ -167,9 +167,13 @@ SELECT count(*)
default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
RunState runState = getRunState(event.getEventType());
-   if (event.getEventType() != null && runState.isDone()) {

+   if (event.getJob() != null && event.getJob().isStreamingJob()) {
+     updateMarquezOnStreamingJob(event, updateLineageRow, runState);
+   } else if (event.getEventType() != null && runState.isDone()) {
updateMarquezOnComplete(event, updateLineageRow, runState);
}

return updateLineageRow;
}
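The streaming check drives the dispatch above: a streaming job takes the new path even on terminal events, since its versions are managed from the first event onward. isStreamingJob() itself is not shown in this diff; a hedged sketch of what such a check could look like (the facet accessor names below are assumptions, not code from this PR):

    // Sketch: the OpenLineage `jobType` job facet carries a processingType
    // of "STREAMING" for long-running jobs; anything else is treated as batch.
    public boolean isStreamingJob() {
      return Optional.ofNullable(getFacets())
          .map(f -> f.getJobType())          // assumed facet accessor
          .map(jt -> jt.getProcessingType()) // assumed field accessor
          .filter("STREAMING"::equalsIgnoreCase)
          .isPresent();
    }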

@@ -559,7 +563,7 @@ private JobRow buildJobFromEvent(
jobDao.upsertJob(
UUID.randomUUID(),
parent.getUuid(),
-           getJobType(job),
+           job.type(),
now,
namespace.getUuid(),
namespace.getName(),
@@ -572,7 +576,7 @@
() ->
jobDao.upsertJob(
UUID.randomUUID(),
-           getJobType(job),
+           job.type(),
now,
namespace.getUuid(),
namespace.getName(),
@@ -680,7 +684,7 @@ private JobRow createParentJobRunRecord(
createJobDao()
.upsertJob(
UUID.randomUUID(),
-           getJobType(job),
+           job.type(),
now,
namespace.getUuid(),
namespace.getName(),
@@ -745,16 +749,58 @@ default Set<DatasetId> toDatasetId(List<Dataset> datasets) {

default void updateMarquezOnComplete(
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
+   final JobVersionDao jobVersionDao = createJobVersionDao();
+   // Link the job version to the job only if the run is marked done and has transitioned into one
+   // of the following states: COMPLETED, ABORTED, or FAILED.
+   final boolean linkJobToJobVersion = runState.isDone();

BagOfJobVersionInfo bagOfJobVersionInfo =
-       createJobVersionDao()
-           .upsertJobVersionOnRunTransition(
-               updateLineageRow.getJob(),
-               updateLineageRow.getRun().getUuid(),
-               runState,
-               event.getEventTime().toInstant());
+       jobVersionDao.upsertJobVersionOnRunTransition(
+           jobVersionDao.loadJobRowRunDetails(
+               updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()),
+           runState,
+           event.getEventTime().toInstant(),
+           linkJobToJobVersion);
updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
}

/**
 * A separate method is used because the logic to update the Marquez model differs for streaming
 * and batch jobs. For batch, the assumption is that the job version is created when the task is
 * done, and the cumulative list of input and output datasets from all the events is used to
 * compute the job version UUID. This wouldn't make sense for streaming jobs, which are mostly
 * long-living and produce output before completing.
 *
 * <p>In this case, a job version is created based on the list of input and output datasets
 * referenced by this job. If a job starts with inputs:{A,B} and outputs:{C}, a new job version is
 * created immediately at job start. If a following event produces inputs:{A}, outputs:{C}, then
 * the union of all datasets registered within this job does not change, and thus the job version
 * is not modified. If another event arrives with no inputs or outputs, the job version is still
 * not modified, as its hash is evaluated based on the datasets attached to the run.
 *
 * <p>However, for an event with inputs:{A,B,D} and outputs:{C}, a new hash is computed and a new
 * job version row is inserted into the table.
 *
 * @param event The OpenLineage event being processed.
 * @param updateLineageRow The base model update produced for this event.
 * @param runState The run state derived from the event type.
 */
default void updateMarquezOnStreamingJob(
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
final JobVersionDao jobVersionDao = createJobVersionDao();
JobRowRunDetails jobRowRunDetails =
jobVersionDao.loadJobRowRunDetails(
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());

if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
// Need to insert a new job version.
BagOfJobVersionInfo bagOfJobVersionInfo =
jobVersionDao.upsertJobVersionOnRunTransition(
jobRowRunDetails, runState, event.getEventTime().toInstant(), true);
updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
}
}
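To make the cumulative behavior described above concrete, here is a worked event sequence for one streaming run (the dataset letters follow the javadoc's example; the timeline is illustrative):

    // Event 1: inputs {A,B}, outputs {C}
    //   -> run's datasets are {A,B}/{C}; version v1 computed,
    //      versionExists(v1) is false, so a new job_versions row is
    //      inserted immediately at job start.
    // Event 2: inputs {A}, outputs {C}
    //   -> cumulative datasets for the run are still {A,B}/{C}; the hash
    //      is unchanged, versionExists(v1) is true, no new row.
    // Event 3: no inputs, no outputs (e.g., only output-bytes metrics)
    //   -> still {A,B}/{C}; version unchanged.
    // Event 4: inputs {A,B,D}, outputs {C}
    //   -> cumulative datasets grow to {A,B,D}/{C}; new hash v2, a new
    //      job_versions row is inserted and the job is linked to v2.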
[Inline review thread on the versioning javadoc above]

wslulciuc (Member), Dec 12, 2023:
> If a job has {A,B} as initial inputs, then only {A}, shouldn't that be a new job version? Given that all inputs/outputs are expected when a job run has started, we should create a job version any time the inputs or outputs change and associate the run with the new job version.

pawel-big-lebowski (Collaborator, Author):
> The existing algorithm to compute the version of the job relies on all the inputs & outputs for the particular run. We should not modify it, as this would cause a new job version for all the existing jobs, but we could create a separate version to evaluate the version of a streaming job if we wanted.
> However, when looking into the approach, I found it useful. It's a cumulative approach, where a new job version is created if a new input/output dataset is involved in processing. If some dataset was included in past events but is no longer present, the version does not change.
> The benefit of this assumption is that we don't require the producer to emit all the datasets all the time. If you emit the amount of bytes written into an output dataset without including the input dataset in the event, it doesn't mean there is a new job version without the inputs.
> Is it OK?

wslulciuc (Member):
> I do feel that if a stream is removed, you'll want to remove that edge from the current lineage graph. But I also understand the limitations here, as you mentioned, with bytes written to an output dataset present with no input datasets specified; that said, after thinking it through, this may be a noop and the logic you have is reasonable.
> For example, let's say we have a streaming job X with inputs {A,B} and output C. The job runs and is marked as RUNNING with the run ID 74f117af-eb90-4ffd-98e1-c1bc215934df. To change the inputs from {A,B} to {B} (or similarly for the outputs), the user will have to redeploy the job (possibly with new code) and therefore be associated with a new run ID. So what you have is logically correct, given how streaming jobs are deployed. For batch jobs, versioning is more straightforward, as we version from run to run.

default String getUrlOrNull(String uri) {
try {
return new URI(uri).toASCIIString();
Expand All @@ -772,10 +818,6 @@ default String formatNamespaceName(String namespace) {
return namespace.replaceAll("[^a-z:/A-Z0-9\\-_.@+]", "_");
}

- default JobType getJobType(Job job) {
-   return JobType.BATCH;
- }
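The removed helper above hard-coded every job to BATCH; the new call sites use job.type() on the event's job model instead. That method is not part of this diff; a hedged sketch of the mapping it plausibly performs (method placement and the STREAM constant are assumptions based on the isStreamingJob() check used earlier):

    // Sketch: derive Marquez's JobType from the OpenLineage job, falling
    // back to BATCH when no streaming signal is present.
    public JobType type() {
      return isStreamingJob() ? JobType.STREAM : JobType.BATCH;
    }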

default DatasetRecord upsertLineageDataset(
ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) {
daos.initBaseDao(this);