Skip to content

Commit

Permalink
Runless events - consume dataset event
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 6, 2023
1 parent 75da43c commit 2dfc2c0
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.41.0...HEAD)

### Added
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
* Save into Marquez model datasets sent via `DatasetEvent` event type

## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
### Added
* API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc)
Expand Down
27 changes: 17 additions & 10 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.DatasetEvent;
import marquez.service.models.JobEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand Down Expand Up @@ -67,23 +69,28 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
if (event instanceof LineageEvent) {
openLineageService
.createAsync((LineageEvent) event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
} else {
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else if (event instanceof DatasetEvent) {
openLineageService
.createAsync((DatasetEvent) event)
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else if (event instanceof JobEvent) {
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

// return serialized event
asyncResponse.resume(Response.status(200).entity(event).build());
}
}

private void onComplete(Void result, Throwable err, AsyncResponse asyncResponse) {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
}

private int determineStatusCode(Throwable e) {
if (e instanceof CompletionException) {
return determineStatusCode(e.getCause());
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ void insertDatasetFacet(
default void insertDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
String lineageEventType,
@NonNull LineageEvent.DatasetFacets datasetFacets) {
final Instant now = Instant.now();

Expand Down
86 changes: 85 additions & 1 deletion api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import marquez.db.models.SourceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.db.models.UpdateLineageRow.DatasetRecord;
import marquez.service.models.BaseEvent;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
Expand Down Expand Up @@ -93,6 +95,14 @@ void createLineageEvent(
PGobject event,
String producer);

@SqlUpdate(
"INSERT INTO lineage_events ("
+ "event_time, "
+ "event, "
+ "producer) "
+ "VALUES (?, ?, ?)")
void createLineageEvent(Instant eventTime, PGobject event, String producer);

@SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid")
List<LineageEvent> findLineageEventsByRunUuid(UUID runUuid);

Expand Down Expand Up @@ -135,6 +145,80 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
return updateLineageRow;
}

default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
RunDao runDao = createRunDao();
DatasetFacetsDao datasetFacetsDao = createDatasetFacetsDao();
ColumnLineageDao columnLineageDao = createColumnLineageDao();

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

UpdateLineageRow bag = new UpdateLineageRow();
NamespaceRow namespace =
namespaceDao.upsertNamespaceRow(
UUID.randomUUID(),
now,
formatNamespaceName(event.getDataset().getNamespace()),
DEFAULT_NAMESPACE_OWNER);
bag.setNamespace(namespace);

Dataset dataset = event.getDataset();
List<DatasetRecord> datasetOutputs = new ArrayList<>();
DatasetRecord record =
upsertLineageDataset(
dataset,
now,
null,
false,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao,
columnLineageDao);
datasetOutputs.add(record);

// Facets ...
Optional.ofNullable(dataset.getFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
null,
now,
null,
facets));

// OutputFacets ...
Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertOutputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
null,
now,
null,
facets));

datasetDao.updateVersion(
record.getDatasetVersionRow().getDatasetUuid(),
Instant.now(),
record.getDatasetVersionRow().getUuid());

bag.setOutputs(Optional.ofNullable(datasetOutputs));

return bag;
}

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
Expand Down Expand Up @@ -938,7 +1022,7 @@ default UUID runToUuid(String runId) {
}
}

default PGobject createJsonArray(LineageEvent event, ObjectMapper mapper) {
default PGobject createJsonArray(BaseEvent event, ObjectMapper mapper) {
try {
PGobject jsonObject = new PGobject();
jsonObject.setType("json");
Expand Down
25 changes: 25 additions & 0 deletions api/src/main/java/marquez/service/OpenLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import marquez.service.RunTransitionListener.RunInput;
import marquez.service.RunTransitionListener.RunOutput;
import marquez.service.RunTransitionListener.RunTransition;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.RunMeta;

Expand All @@ -67,6 +68,30 @@ public OpenLineageService(BaseDao baseDao, RunService runService, Executor execu
this.executor = executor;
}

public CompletableFuture<Void> createAsync(DatasetEvent event) {
CompletableFuture<Void> openLineage =
CompletableFuture.runAsync(
withSentry(
withMdc(
() ->
createLineageEvent(
event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(),
createJsonArray(event, mapper),
event.getProducer()))),
executor);

CompletableFuture<Void> marquez =
CompletableFuture.runAsync(
withSentry(
withMdc(
() -> {
updateMarquezModel(event, mapper);
})),
executor);

return CompletableFuture.allOf(marquez, openLineage);
}

public CompletableFuture<Void> createAsync(LineageEvent event) {
UUID runUuid = runUuidFromEvent(event.getRun());
CompletableFuture<Void> openLineage =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE dataset_facets ALTER COLUMN lineage_event_type DROP NOT NULL;
34 changes: 24 additions & 10 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import marquez.client.models.Run;
import marquez.common.Utils;
import marquez.db.LineageTestUtils;
import marquez.service.models.DatasetEvent;
import marquez.service.models.JobEvent;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jdbi.v3.core.Jdbi;
Expand Down Expand Up @@ -1376,7 +1375,7 @@ public void testSendOpenLineage(String pathToOpenLineageEvent) throws IOExceptio
}

@Test
public void testSendDatasetEventIsDecoded() throws IOException {
public void testSendDatasetEvent() throws IOException {
final String openLineageEventAsString =
Resources.toString(Resources.getResource(EVENT_DATASET_EVENT), Charset.defaultCharset());

Expand All @@ -1394,16 +1393,31 @@ public void testSendDatasetEventIsDecoded() throws IOException {
// Ensure the event was received.
Map<Integer, String> respMap = resp.join();

assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201
assertThat(respMap.containsKey(201)).isTrue();

// (3) Convert the OpenLineage event to Json.
DatasetEvent datasetEvent =
marquez.client.Utils.fromJson(respMap.get(200), new TypeReference<DatasetEvent>() {});
assertThat(datasetEvent.getDataset().getName()).isEqualTo("my-dataset-name");
assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields()).hasSize(1);
assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields().get(0).getName())
.isEqualTo("col_a");
assertThat(datasetEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001");
final JsonNode openLineageEventAsJson =
Utils.fromJson(openLineageEventAsString, new TypeReference<JsonNode>() {});

// (4) Verify dataset facet associated with the OpenLineage event.
final JsonNode json = openLineageEventAsJson.path("dataset");

final String namespace = json.path("namespace").asText();
final String output = json.path("name").asText();
final JsonNode expectedFacets = json.path("facets");

final Dataset dataset = client.getDataset(namespace, output);
assertThat(Utils.getMapper().convertValue(dataset.getFacets(), JsonNode.class))
.isEqualTo(expectedFacets);

List<DatasetVersion> datasetVersions = client.listDatasetVersions(namespace, output);
assertThat(datasetVersions).isNotEmpty();

DatasetVersion latestDatasetVersion = datasetVersions.get(0);
assertThat(latestDatasetVersion.getNamespace()).isEqualTo(namespace);
assertThat(latestDatasetVersion.getName()).isEqualTo(output);
assertThat(Utils.getMapper().convertValue(latestDatasetVersion.getFacets(), JsonNode.class))
.isEqualTo(expectedFacets);
}

@Test
Expand Down
39 changes: 39 additions & 0 deletions api/src/test/java/marquez/db/LineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import marquez.common.Utils;
import marquez.db.models.UpdateLineageRow;
import marquez.db.models.UpdateLineageRow.DatasetRecord;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
Expand Down Expand Up @@ -173,6 +174,44 @@ public static UpdateLineageRow createLineageRow(
return updateLineageRow;
}

/**
* Create an {@link UpdateLineageRow} from the input job details and datasets.
*
* @param dao
* @param dataset
* @return
*/
public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Dataset dataset) {

DatasetEvent event =
DatasetEvent.builder()
.eventTime(Instant.now().atZone(LOCAL_ZONE))
.dataset(dataset)
.producer(PRODUCER_URL.toString())
.build();

// emulate an OpenLineage DatasetEvent
event
.getProperties()
.put(
"_schemaURL",
"https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent");
UpdateLineageRow updateLineageRow = dao.updateMarquezModel(event, Utils.getMapper());
PGobject jsonObject = new PGobject();
jsonObject.setType("json");
try {
jsonObject.setValue(Utils.toJson(event));
} catch (SQLException e) {
throw new RuntimeException(e);
}
dao.createLineageEvent(
event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(),
jsonObject,
event.getProducer());

return updateLineageRow;
}

public static DatasetFacets newDatasetFacet(SchemaField... fields) {
return newDatasetFacet(EMPTY_MAP, fields);
}
Expand Down
16 changes: 16 additions & 0 deletions api/src/test/java/marquez/db/OpenLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ void testUpdateMarquezModel() {
.isEqualTo(writeJob.getOutputs().get().get(0).getDatasetVersionRow());
}

@Test
void testUpdateMarquezModelWithDatasetEvent() {
UpdateLineageRow datasetEventRow =
LineageTestUtils.createLineageRow(
dao, new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets));

assertThat(datasetEventRow.getOutputs()).isPresent();
assertThat(datasetEventRow.getOutputs().get()).hasSize(1).first();
assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetRow())
.hasFieldOrPropertyWithValue("name", DATASET_NAME)
.hasFieldOrPropertyWithValue("namespaceName", LineageTestUtils.NAMESPACE);

assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetVersionRow())
.hasNoNullFieldsOrPropertiesExcept("runUuid");
}

@Test
void testUpdateMarquezModelLifecycleStateChangeFacet() {
Dataset dataset =
Expand Down
Loading

0 comments on commit 2dfc2c0

Please sign in to comment.