Skip to content

Commit

Permalink
Support streaming jobs in Marquez
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Nov 15, 2023
1 parent 554f90c commit 8e55ff4
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 73 deletions.
41 changes: 41 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import marquez.service.models.LineageEvent.LifecycleStateChangeFacet;
import marquez.service.models.LineageEvent.NominalTimeRunFacet;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.LineageEvent.ProcessingTypeJobFacet;
import marquez.service.models.LineageEvent.Run;
import marquez.service.models.LineageEvent.RunFacet;
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
Expand Down Expand Up @@ -170,6 +171,46 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
if (event.getEventType() != null && runState.isDone()) {
updateMarquezOnComplete(event, updateLineageRow, runState);
}
// if a job is streaming job - datasets versions should be created on start
// TODO: make it enum, use some method for this
Optional<String> processingType =
Optional.ofNullable(event.getJob())
.map(Job::getFacets)
.map(JobFacet::getProcessingTypeJobFacet)
.map(ProcessingTypeJobFacet::getProcessingType);

if (processingType.orElse("").equals("STREAMING")) {
// insert IO rows
ModelDaos daos = new ModelDaos();
daos.initBaseDao(this);
updateLineageRow
.getInputs()
.ifPresent(
l ->
l.stream()
.forEach(
d ->
daos.getJobVersionDao()
.upsertInputDatasetFor(
null, // no job version Id
d.getDatasetRow().getUuid(),
updateLineageRow.getJob().getUuid(),
updateLineageRow.getJob().getSymlinkTargetId())));
updateLineageRow
.getOutputs()
.ifPresent(
l ->
l.stream()
.forEach(
d ->
daos.getJobVersionDao()
.upsertInputDatasetFor(
null, // no job version Id
d.getDatasetRow().getUuid(),
updateLineageRow.getJob().getUuid(),
updateLineageRow.getJob().getSymlinkTargetId())));
}

return updateLineageRow;
}

Expand Down
30 changes: 29 additions & 1 deletion api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,19 @@ public static class Job extends BaseJsonModel {
@Setter
@Valid
@ToString
@JsonPropertyOrder({"documentation", "sourceCodeLocation", "sql", "description"})
@JsonPropertyOrder({
"documentation",
"sourceCodeLocation",
"sql",
"description",
"processingType"
})
public static class JobFacet {

@Valid private DocumentationJobFacet documentation;
@Valid private SourceCodeLocationJobFacet sourceCodeLocation;
@Valid private SQLJobFacet sql;
@Valid private ProcessingTypeJobFacet processingType;
@Builder.Default @JsonIgnore private Map<String, Object> additional = new LinkedHashMap<>();

@JsonAnySetter
Expand All @@ -240,6 +247,10 @@ public SourceCodeLocationJobFacet getSourceCodeLocation() {
return sourceCodeLocation;
}

public ProcessingTypeJobFacet getProcessingTypeJobFacet() {
return processingType;
}

public SQLJobFacet getSql() {
return sql;
}
Expand Down Expand Up @@ -297,6 +308,23 @@ public SQLJobFacet(@NotNull URI _producer, @NotNull URI _schemaURL, @NotNull Str
}
}

@NoArgsConstructor
@Getter
@Setter
@Valid
@ToString
public static class ProcessingTypeJobFacet extends BaseFacet {

@NotNull private String processingType;

@Builder
public ProcessingTypeJobFacet(
@NotNull URI _producer, @NotNull URI _schemaURL, @NotNull String processingType) {
super(_producer, _schemaURL);
this.processingType = processingType;
}
}

@Builder
@AllArgsConstructor
@NoArgsConstructor
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/ColumnLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import marquez.db.OpenLineageDao;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.JobFacet;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -37,8 +38,7 @@ public class ColumnLineageIntegrationTest extends BaseIntegrationTest {
public void setup(Jdbi jdbi) {
OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class);

LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();

LineageEvent.Dataset dataset_A = getDatasetA();
LineageEvent.Dataset dataset_B = getDatasetB();
Expand Down
4 changes: 1 addition & 3 deletions api/src/test/java/marquez/db/BackfillTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ INSERT INTO job_versions (uuid, created_at, updated_at, job_uuid, version, locat
nominalTimeRunFacet,
parentRun.orElse(null),
ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")))))
.job(
new LineageEvent.Job(
NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)))
.job(new LineageEvent.Job(NAMESPACE, jobName, JobFacet.builder().build()))
.inputs(
Collections.singletonList(
new LineageEvent.Dataset(
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/marquez/db/ColumnLineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import marquez.api.JdbiUtils;
import marquez.db.models.UpdateLineageRow;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.JobFacet;
import org.jdbi.v3.core.Jdbi;

public class ColumnLineageTestUtils {
Expand Down Expand Up @@ -110,8 +111,7 @@ public static LineageEvent.Dataset getDatasetC() {

public static UpdateLineageRow createLineage(
OpenLineageDao openLineageDao, LineageEvent.Dataset input, LineageEvent.Dataset output) {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();
return LineageTestUtils.createLineageRow(
openLineageDao,
"job_" + UUID.randomUUID(),
Expand Down
2 changes: 1 addition & 1 deletion api/src/test/java/marquez/db/DatasetDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DatasetDaoTest {
private static DatasetDao datasetDao;
private static OpenLineageDao openLineageDao;

private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
private final JobFacet jobFacet = JobFacet.builder().build();

static Jdbi jdbi;

Expand Down
13 changes: 5 additions & 8 deletions api/src/test/java/marquez/db/DatasetFacetsDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.JobFacet;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -308,8 +309,7 @@ public void testInsertDatasetFacetsForUnknownTypeFacet() {

@Test
public void testInsertOutputDatasetFacetsFor() {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();

UpdateLineageRow lineageRow =
LineageTestUtils.createLineageRow(
Expand Down Expand Up @@ -340,8 +340,7 @@ public void testInsertOutputDatasetFacetsFor() {

@Test
public void testInsertInputDatasetFacetsFor() {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();

UpdateLineageRow lineageRow =
LineageTestUtils.createLineageRow(
Expand Down Expand Up @@ -372,8 +371,7 @@ public void testInsertInputDatasetFacetsFor() {

private UpdateLineageRow createLineageRowWithInputDataset(
LineageEvent.DatasetFacets.DatasetFacetsBuilder inputDatasetFacetsbuilder) {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();

return LineageTestUtils.createLineageRow(
openLineageDao,
Expand All @@ -389,8 +387,7 @@ private UpdateLineageRow createLineageRowWithInputDataset(

private UpdateLineageRow createLineageRowWithOutputDataset(
LineageEvent.DatasetFacets.DatasetFacetsBuilder outputDatasetFacetsbuilder) {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
LineageEvent.JobFacet jobFacet = JobFacet.builder().build();

return LineageTestUtils.createLineageRow(
openLineageDao,
Expand Down
23 changes: 15 additions & 8 deletions api/src/test/java/marquez/db/FacetTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.UUID;
import marquez.db.models.UpdateLineageRow;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.JobFacet;
import org.apache.commons.lang3.StringUtils;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

Expand All @@ -20,14 +21,20 @@ public class FacetTestUtils {

public static UpdateLineageRow createLineageWithFacets(OpenLineageDao openLineageDao) {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(
new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"),
new LineageEvent.SourceCodeLocationJobFacet(
PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"),
new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"),
Map.of(
"ownership", "some-owner",
"sourceCode", "some-code"));
JobFacet.builder()
.documentation(
new LineageEvent.DocumentationJobFacet(
PRODUCER_URL, SCHEMA_URL, "some-documentation"))
.sourceCodeLocation(
new LineageEvent.SourceCodeLocationJobFacet(
PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"))
.sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"))
.additional(
Map.of(
"ownership", "some-owner",
"sourceCode", "some-code"))
.build();

return LineageTestUtils.createLineageRow(
openLineageDao,
"job_" + UUID.randomUUID(),
Expand Down
31 changes: 19 additions & 12 deletions api/src/test/java/marquez/db/JobFacetsDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.JobFacet;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -63,12 +64,14 @@ public void insertJobFacets() {
openLineageDao,
"job_" + UUID.randomUUID(),
"COMPLETE",
new LineageEvent.JobFacet(
null,
new LineageEvent.SourceCodeLocationJobFacet(
PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"),
null,
LineageTestUtils.EMPTY_MAP),
JobFacet.builder()
.sourceCodeLocation(
new LineageEvent.SourceCodeLocationJobFacet(
PRODUCER_URL,
SCHEMA_URL,
"git",
"git@github.com:OpenLineage/OpenLineage.git"))
.build(),
Collections.emptyList(),
Collections.emptyList());

Expand All @@ -88,12 +91,16 @@ public void insertJobFacets() {
@Test
public void testGetFacetsByRunUuid() {
LineageEvent.JobFacet jobFacet =
new LineageEvent.JobFacet(
new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"),
new LineageEvent.SourceCodeLocationJobFacet(
PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"),
new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"),
null);
JobFacet.builder()
.documentation(
new LineageEvent.DocumentationJobFacet(
PRODUCER_URL, SCHEMA_URL, "some-documentation"))
.sourceCodeLocation(
new LineageEvent.SourceCodeLocationJobFacet(
PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"))
.sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"))
.build();

UpdateLineageRow lineageRow =
LineageTestUtils.createLineageRow(
openLineageDao,
Expand Down
12 changes: 6 additions & 6 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class LineageDaoTest {
new SchemaField("firstname", "string", "the first name"),
new SchemaField("lastname", "string", "the last name"),
new SchemaField("birthdate", "date", "the date of birth")));
private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
private final JobFacet jobFacet = JobFacet.builder().build();

static Jdbi jdbi;

Expand Down Expand Up @@ -392,7 +392,7 @@ public void testGetLineageWithJobThatSharesNoDatasets() {
/** A failed consumer job doesn't show up in the datasets out edges */
@Test
public void testGetLineageWithFailedConsumer() {
JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
JobFacet jobFacet = JobFacet.builder().build();

UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
Expand Down Expand Up @@ -512,7 +512,7 @@ public void testGetInputDatasetsWithJobThatHasMultipleVersions() {
/** A failed producer job doesn't show up in the lineage */
@Test
public void testGetLineageWithFailedProducer() {
JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
JobFacet jobFacet = JobFacet.builder().build();

UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
Expand Down Expand Up @@ -541,7 +541,7 @@ public void testGetLineageWithFailedProducer() {
/** A failed producer job doesn't show up in the lineage */
@Test
public void testGetLineageChangedJobVersion() {
JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
JobFacet jobFacet = JobFacet.builder().build();

UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
Expand All @@ -567,7 +567,7 @@ public void testGetLineageChangedJobVersion() {

@Test
public void testGetJobFromInputOrOutput() {
JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
JobFacet jobFacet = JobFacet.builder().build();

UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
Expand All @@ -591,7 +591,7 @@ public void testGetJobFromInputOrOutput() {

@Test
public void testGetJobFromInputOrOutputPrefersRecentOutputJob() {
JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP);
JobFacet jobFacet = JobFacet.builder().build();

// add some consumer jobs prior to the write so we know that the sort isn't simply picking
// the first job created
Expand Down
Loading

0 comments on commit 8e55ff4

Please sign in to comment.