Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runless events - consume dataset event #2641

Merged
merged 7 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ API: remove usage of `current_job_context_uuid` column [`#2622`](https://github.
Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub)
*Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.*

### Added
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
* Save into Marquez model datasets sent via `DatasetEvent` event type

## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
### Added
* API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc)
Expand Down
27 changes: 17 additions & 10 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.DatasetEvent;
import marquez.service.models.JobEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand Down Expand Up @@ -67,23 +69,28 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
if (event instanceof LineageEvent) {
openLineageService
.createAsync((LineageEvent) event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
} else {
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else if (event instanceof DatasetEvent) {
openLineageService
.createAsync((DatasetEvent) event)
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else if (event instanceof JobEvent) {
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

// return serialized event
asyncResponse.resume(Response.status(200).entity(event).build());
}
}

private void onComplete(Void result, Throwable err, AsyncResponse asyncResponse) {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
}

private int determineStatusCode(Throwable e) {
if (e instanceof CompletionException) {
return determineStatusCode(e.getCause());
Expand Down
5 changes: 3 additions & 2 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.service.models.LineageEvent;
Expand Down Expand Up @@ -126,9 +127,9 @@ void insertDatasetFacet(
default void insertDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID datasetVersionUuid,
@NonNull UUID runUuid,
@Nullable UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@Nullable String lineageEventType,
@NonNull LineageEvent.DatasetFacets datasetFacets) {
final Instant now = Instant.now();

Expand Down
86 changes: 85 additions & 1 deletion api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import marquez.db.models.SourceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.db.models.UpdateLineageRow.DatasetRecord;
import marquez.service.models.BaseEvent;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
Expand Down Expand Up @@ -93,6 +95,14 @@ void createLineageEvent(
PGobject event,
String producer);

@SqlUpdate(
"INSERT INTO lineage_events ("
+ "event_time, "
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved
+ "event, "
+ "producer) "
+ "VALUES (?, ?, ?)")
void createDatasetEvent(Instant eventTime, PGobject event, String producer);

@SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid")
List<LineageEvent> findLineageEventsByRunUuid(UUID runUuid);

Expand Down Expand Up @@ -135,6 +145,80 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map
return updateLineageRow;
}

default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
DatasetDao datasetDao = createDatasetDao();
SourceDao sourceDao = createSourceDao();
DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
DatasetFieldDao datasetFieldDao = createDatasetFieldDao();
RunDao runDao = createRunDao();
DatasetFacetsDao datasetFacetsDao = createDatasetFacetsDao();
ColumnLineageDao columnLineageDao = createColumnLineageDao();

Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant();

UpdateLineageRow bag = new UpdateLineageRow();
NamespaceRow namespace =
namespaceDao.upsertNamespaceRow(
UUID.randomUUID(),
now,
formatNamespaceName(event.getDataset().getNamespace()),
DEFAULT_NAMESPACE_OWNER);
bag.setNamespace(namespace);

Dataset dataset = event.getDataset();
List<DatasetRecord> datasetOutputs = new ArrayList<>();
DatasetRecord record =
upsertLineageDataset(
dataset,
now,
null,
false,
namespaceDao,
datasetSymlinkDao,
sourceDao,
datasetDao,
datasetVersionDao,
datasetFieldDao,
runDao,
columnLineageDao);
datasetOutputs.add(record);

// Facets ...
Optional.ofNullable(dataset.getFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
null,
now,
null,
facets));

// OutputFacets ...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not have InputFacets as well in this case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither inputFacet nor outputFacets should be allowed for DatasetEvent.
We should have 3 methods: insertDatasetFacets, insertInputDatasetFacets and insertOutputDatasetFacets.
I fixed this in the 5th commit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh yes thanks for clarifying; was thinking about just this during my review 👀

Optional.ofNullable(dataset.getOutputFacets())
.ifPresent(
facets ->
datasetFacetsDao.insertOutputDatasetFacetsFor(
record.getDatasetRow().getUuid(),
record.getDatasetVersionRow().getUuid(),
null,
now,
null,
facets));

datasetDao.updateVersion(
record.getDatasetVersionRow().getDatasetUuid(),
Instant.now(),
record.getDatasetVersionRow().getUuid());

bag.setOutputs(Optional.ofNullable(datasetOutputs));

return bag;
}

default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) {
NamespaceDao namespaceDao = createNamespaceDao();
DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao();
Expand Down Expand Up @@ -938,7 +1022,7 @@ default UUID runToUuid(String runId) {
}
}

default PGobject createJsonArray(LineageEvent event, ObjectMapper mapper) {
default PGobject createJsonArray(BaseEvent event, ObjectMapper mapper) {
try {
PGobject jsonObject = new PGobject();
jsonObject.setType("json");
Expand Down
25 changes: 25 additions & 0 deletions api/src/main/java/marquez/service/OpenLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import marquez.service.RunTransitionListener.RunInput;
import marquez.service.RunTransitionListener.RunOutput;
import marquez.service.RunTransitionListener.RunTransition;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.RunMeta;

Expand All @@ -67,6 +68,30 @@ public OpenLineageService(BaseDao baseDao, RunService runService, Executor execu
this.executor = executor;
}

public CompletableFuture<Void> createAsync(DatasetEvent event) {
CompletableFuture<Void> openLineage =
CompletableFuture.runAsync(
withSentry(
withMdc(
() ->
createDatasetEvent(
event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(),
createJsonArray(event, mapper),
event.getProducer()))),
executor);

CompletableFuture<Void> marquez =
CompletableFuture.runAsync(
withSentry(
withMdc(
() -> {
updateMarquezModel(event, mapper);
})),
executor);

return CompletableFuture.allOf(marquez, openLineage);
}

public CompletableFuture<Void> createAsync(LineageEvent event) {
UUID runUuid = runUuidFromEvent(event.getRun());
CompletableFuture<Void> openLineage =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE dataset_facets ALTER COLUMN lineage_event_type DROP NOT NULL;
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved
34 changes: 24 additions & 10 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import marquez.client.models.Run;
import marquez.common.Utils;
import marquez.db.LineageTestUtils;
import marquez.service.models.DatasetEvent;
import marquez.service.models.JobEvent;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jdbi.v3.core.Jdbi;
Expand Down Expand Up @@ -1376,7 +1375,7 @@ public void testSendOpenLineage(String pathToOpenLineageEvent) throws IOExceptio
}

@Test
public void testSendDatasetEventIsDecoded() throws IOException {
public void testSendDatasetEvent() throws IOException {
final String openLineageEventAsString =
Resources.toString(Resources.getResource(EVENT_DATASET_EVENT), Charset.defaultCharset());

Expand All @@ -1394,16 +1393,31 @@ public void testSendDatasetEventIsDecoded() throws IOException {
// Ensure the event was received.
Map<Integer, String> respMap = resp.join();

assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201
assertThat(respMap.containsKey(201)).isTrue();

// (3) Convert the OpenLineage event to Json.
DatasetEvent datasetEvent =
marquez.client.Utils.fromJson(respMap.get(200), new TypeReference<DatasetEvent>() {});
assertThat(datasetEvent.getDataset().getName()).isEqualTo("my-dataset-name");
assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields()).hasSize(1);
assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields().get(0).getName())
.isEqualTo("col_a");
assertThat(datasetEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001");
final JsonNode openLineageEventAsJson =
Utils.fromJson(openLineageEventAsString, new TypeReference<JsonNode>() {});

// (4) Verify dataset facet associated with the OpenLineage event.
final JsonNode json = openLineageEventAsJson.path("dataset");

final String namespace = json.path("namespace").asText();
final String output = json.path("name").asText();
final JsonNode expectedFacets = json.path("facets");

final Dataset dataset = client.getDataset(namespace, output);
assertThat(Utils.getMapper().convertValue(dataset.getFacets(), JsonNode.class))
.isEqualTo(expectedFacets);

List<DatasetVersion> datasetVersions = client.listDatasetVersions(namespace, output);
assertThat(datasetVersions).isNotEmpty();

DatasetVersion latestDatasetVersion = datasetVersions.get(0);
assertThat(latestDatasetVersion.getNamespace()).isEqualTo(namespace);
assertThat(latestDatasetVersion.getName()).isEqualTo(output);
assertThat(Utils.getMapper().convertValue(latestDatasetVersion.getFacets(), JsonNode.class))
.isEqualTo(expectedFacets);
}

@Test
Expand Down
39 changes: 39 additions & 0 deletions api/src/test/java/marquez/db/LineageTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import marquez.common.Utils;
import marquez.db.models.UpdateLineageRow;
import marquez.db.models.UpdateLineageRow.DatasetRecord;
import marquez.service.models.DatasetEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.Dataset;
import marquez.service.models.LineageEvent.DatasetFacets;
Expand Down Expand Up @@ -173,6 +174,44 @@ public static UpdateLineageRow createLineageRow(
return updateLineageRow;
}

/**
* Create an {@link UpdateLineageRow} from the input job details and datasets.
*
* @param dao
* @param dataset
* @return
*/
public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Dataset dataset) {

DatasetEvent event =
DatasetEvent.builder()
.eventTime(Instant.now().atZone(LOCAL_ZONE))
.dataset(dataset)
.producer(PRODUCER_URL.toString())
.build();

// emulate an OpenLineage DatasetEvent
event
.getProperties()
.put(
"_schemaURL",
"https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent");
UpdateLineageRow updateLineageRow = dao.updateMarquezModel(event, Utils.getMapper());
PGobject jsonObject = new PGobject();
jsonObject.setType("json");
try {
jsonObject.setValue(Utils.toJson(event));
} catch (SQLException e) {
throw new RuntimeException(e);
}
dao.createDatasetEvent(
event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(),
jsonObject,
event.getProducer());

return updateLineageRow;
}

public static DatasetFacets newDatasetFacet(SchemaField... fields) {
return newDatasetFacet(EMPTY_MAP, fields);
}
Expand Down
16 changes: 16 additions & 0 deletions api/src/test/java/marquez/db/OpenLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ void testUpdateMarquezModel() {
.isEqualTo(writeJob.getOutputs().get().get(0).getDatasetVersionRow());
}

@Test
void testUpdateMarquezModelWithDatasetEvent() {
UpdateLineageRow datasetEventRow =
LineageTestUtils.createLineageRow(
dao, new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets));

assertThat(datasetEventRow.getOutputs()).isPresent();
assertThat(datasetEventRow.getOutputs().get()).hasSize(1).first();
assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetRow())
.hasFieldOrPropertyWithValue("name", DATASET_NAME)
.hasFieldOrPropertyWithValue("namespaceName", LineageTestUtils.NAMESPACE);

assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetVersionRow())
.hasNoNullFieldsOrPropertiesExcept("runUuid");
}

@Test
void testUpdateMarquezModelLifecycleStateChangeFacet() {
Dataset dataset =
Expand Down
Loading