Skip to content

Commit

Permalink
OL facets - PR1 - create & feed new tables while not reading them
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Jan 9, 2023
1 parent 88e00a9 commit 9cfdd83
Show file tree
Hide file tree
Showing 30 changed files with 1,804 additions and 46 deletions.
9 changes: 9 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,13 @@ public interface BaseDao extends SqlObject {

/**
 * Returns the {@link ColumnLineageDao} companion of this DAO.
 *
 * <p>NOTE(review): {@code @CreateSqlObject} is presumably Jdbi's annotation, which would make the
 * returned DAO share this object's handle; confirm against this file's imports.
 */
@CreateSqlObject
ColumnLineageDao createColumnLineageDao();

/**
 * Returns the {@link DatasetFacetsDao} companion of this DAO.
 *
 * <p>NOTE(review): {@code @CreateSqlObject} is presumably Jdbi's annotation, which would make the
 * returned DAO share this object's handle; confirm against this file's imports.
 */
@CreateSqlObject
DatasetFacetsDao createDatasetFacetsDao();

/**
 * Returns the {@link JobFacetsDao} companion of this DAO.
 *
 * <p>NOTE(review): {@code @CreateSqlObject} is presumably Jdbi's annotation, which would make the
 * returned DAO share this object's handle; confirm against this file's imports.
 */
@CreateSqlObject
JobFacetsDao createJobFacetsDao();

/**
 * Returns the {@link RunFacetsDao} companion of this DAO.
 *
 * <p>NOTE(review): {@code @CreateSqlObject} is presumably Jdbi's annotation, which would make the
 * returned DAO share this object's handle; confirm against this file's imports.
 */
@CreateSqlObject
RunFacetsDao createRunFacetsDao();
}
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;

@Slf4j
public final class Columns {
Expand Down Expand Up @@ -299,4 +300,17 @@ public static ImmutableMap<String, String> mapOrNull(final ResultSet results, fi
final String mapAsString = results.getString(column);
return Utils.fromJson(mapAsString, new TypeReference<>() {});
}

/**
 * Serializes {@code object} to JSON with {@link Utils#toJson} and wraps the result in a {@link
 * PGobject} whose type is {@code jsonb}.
 *
 * @param object the value to serialize; must not be {@code null}
 * @return the {@code jsonb}-typed {@link PGobject} holding the JSON text, or {@code null} when
 *     {@link PGobject#setValue(String)} fails with a {@link SQLException} (the failure is logged)
 */
public static PGobject toPgObject(@NonNull final Object object) {
  final String json = Utils.toJson(object);
  final PGobject jsonObject = new PGobject();
  jsonObject.setType("jsonb");
  try {
    jsonObject.setValue(json);
  } catch (SQLException e) {
    // The log-and-return-null contract is kept for existing callers. Only the class name is
    // logged, not the JSON itself, so payload contents do not leak into the logs.
    log.error(
        "Failed to wrap JSON of '{}' in a jsonb PGobject; returning null",
        object.getClass().getName(),
        e);
    return null;
  }
  return jsonObject;
}
}
284 changes: 284 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import static marquez.db.Columns.toPgObject;

import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;

/** The DAO for {@code dataset} facets. */
public interface DatasetFacetsDao {
/**
 * Category recorded in the {@code type} column of a {@code dataset_facets} row. Every {@code
 * Facet} constant carries one of the first three values; {@code UNKNOWN} is used for a facet whose
 * name matches no {@code Facet} constant.
 */
enum Type {
  DATASET,
  INPUT,
  OUTPUT,
  UNKNOWN
}

/**
 * The dataset facets known by name. Each constant pairs the facet's {@link Type} with the name the
 * facet is stored under.
 */
enum Facet {
  DOCUMENTATION(Type.DATASET, "documentation"),
  SCHEMA(Type.DATASET, "schema"),
  DATASOURCE(Type.DATASET, "dataSource"),
  DESCRIPTION(Type.DATASET, "description"),
  LIFECYCLE_STATE_CHANGE(Type.DATASET, "lifecycleStateChange"),
  VERSION(Type.DATASET, "version"),
  COLUMN_LINEAGE(Type.DATASET, "columnLineage"),
  OWNERSHIP(Type.DATASET, "ownership"),
  DATA_QUALITY_METRICS(Type.INPUT, "dataQualityMetrics"),
  DATA_QUALITY_ASSERTIONS(Type.INPUT, "dataQualityAssertions"),
  OUTPUT_STATISTICS(Type.OUTPUT, "outputStatistics");

  // whenever adding new types, please mind migration script in
  // marquez.db.migrations.V55_5__BackfillFacets

  final Type type;
  final String name;

  Facet(@NonNull final Type type, @NonNull final String name) {
    this.name = name;
    this.type = type;
  }

  /** Returns the {@link Type} this facet is filed under. */
  Type getType() {
    return this.type;
  }

  /** Returns the facet name, e.g. {@code "dataSource"}. */
  String getName() {
    return this.name;
  }

  /**
   * Looks up a facet by its name, ignoring case.
   *
   * @param name the facet name to look for
   * @return the first constant, in declaration order, whose name equals {@code name} ignoring
   *     case, or {@code null} when none does
   */
  public static Facet fromName(@NonNull final String name) {
    return java.util.Arrays.stream(Facet.values())
        .filter(candidate -> candidate.getName().equalsIgnoreCase(name))
        .findFirst()
        .orElse(null);
  }

  /**
   * Wraps {@code facetValue} in a JSON object keyed by this facet's name.
   *
   * @param facetValue the facet payload
   * @return a new single-field {@link ObjectNode}
   */
  public ObjectNode asJson(@NonNull Object facetValue) {
    return asJson(this.name, facetValue);
  }

  /**
   * Wraps {@code facetValue} in a JSON object with the single field {@code facetName}.
   *
   * @param facetName the field name to store the payload under
   * @param facetValue the facet payload, attached as a POJO node
   * @return a new {@link ObjectNode} created from the mapper returned by {@code Utils.getMapper()}
   */
  public static ObjectNode asJson(@NonNull final String facetName, @NonNull Object facetValue) {
    final ObjectNode wrapper = Utils.getMapper().createObjectNode();
    wrapper.putPOJO(facetName, facetValue);
    return wrapper;
  }
}

/**
 * Inserts a single row into the {@code dataset_facets} table; each argument is bound to the
 * column of the matching name.
 *
 * @param uuid value for the {@code uuid} column
 * @param createdAt value for the {@code created_at} column
 * @param datasetUuid value for the {@code dataset_uuid} column
 * @param runUuid value for the {@code run_uuid} column
 * @param lineageEventTime value for the {@code lineage_event_time} column
 * @param lineageEventType value for the {@code lineage_event_type} column
 * @param type value for the {@code type} column
 * @param name value for the {@code name} column (the facet name)
 * @param facet value for the {@code facet} column; the caller in this interface passes a {@code
 *     jsonb} object built with {@code toPgObject}
 */
@SqlUpdate(
"""
INSERT INTO dataset_facets (
uuid,
created_at,
dataset_uuid,
run_uuid,
lineage_event_time,
lineage_event_type,
type,
name,
facet
) VALUES (
:uuid,
:createdAt,
:datasetUuid,
:runUuid,
:lineageEventTime,
:lineageEventType,
:type,
:name,
:facet
)
""")
void insertDatasetFacet(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
Type type,
String name,
PGobject facet);

/**
 * Inserts one {@code dataset_facets} row for every facet present in {@code datasetFacets}.
 *
 * <p>The typed facets are written first, in this order and only when their getter returns a
 * non-null value: documentation, schema, data source, description, lifecycle state change, column
 * lineage. Each entry of {@code datasetFacets.getAdditionalFacets()} (if that map is non-null) is
 * written next: when {@link Facet#fromName(String)} recognizes the entry's name, the row uses
 * that facet's type and name; otherwise the row keeps the name as given and is typed {@link
 * Type#UNKNOWN}. All rows share one {@code created_at} instant and each gets a fresh random UUID.
 *
 * @param datasetUuid the dataset the facets belong to
 * @param runUuid the run stored alongside each facet row
 * @param lineageEventTime the event time stored alongside each facet row
 * @param lineageEventType the event type stored alongside each facet row
 * @param datasetFacets the facets to persist
 */
@Transaction
default void insertDatasetFacetsFor(
    @NonNull UUID datasetUuid,
    @NonNull UUID runUuid,
    @NonNull Instant lineageEventTime,
    @NonNull String lineageEventType,
    @NonNull LineageEvent.DatasetFacets datasetFacets) {
  final Instant now = Instant.now();

  // Single insertion path for a facet with a known name; the row's type and name come from the
  // Facet constant itself.
  final BiConsumer<Facet, Object> insertKnownFacet =
      (facet, value) ->
          insertDatasetFacet(
              UUID.randomUUID(),
              now,
              datasetUuid,
              runUuid,
              lineageEventTime,
              lineageEventType,
              facet.getType(),
              facet.getName(),
              toPgObject(facet.asJson(value)));

  // Typed facets are optional on the event: a null getter result means "nothing to store".
  final BiConsumer<Facet, Object> insertIfPresent =
      (facet, value) -> {
        if (value != null) {
          insertKnownFacet.accept(facet, value);
        }
      };

  insertIfPresent.accept(Facet.DOCUMENTATION, datasetFacets.getDocumentation());
  insertIfPresent.accept(Facet.SCHEMA, datasetFacets.getSchema());
  insertIfPresent.accept(Facet.DATASOURCE, datasetFacets.getDataSource());
  insertIfPresent.accept(Facet.DESCRIPTION, datasetFacets.getDescription());
  insertIfPresent.accept(Facet.LIFECYCLE_STATE_CHANGE, datasetFacets.getLifecycleStateChange());
  insertIfPresent.accept(Facet.COLUMN_LINEAGE, datasetFacets.getColumnLineage());

  final var additionalFacets = datasetFacets.getAdditionalFacets();
  if (additionalFacets == null) {
    return;
  }

  // Additional facets arrive by name only: store recognized names under their Facet constant and
  // everything else as UNKNOWN under the name exactly as received.
  additionalFacets.forEach(
      (name, value) ->
          Optional.ofNullable(Facet.fromName(name))
              .ifPresentOrElse(
                  knownFacet -> insertKnownFacet.accept(knownFacet, value),
                  () ->
                      insertDatasetFacet(
                          UUID.randomUUID(),
                          now,
                          datasetUuid,
                          runUuid,
                          lineageEventTime,
                          lineageEventType,
                          Type.UNKNOWN,
                          name,
                          toPgObject(Facet.asJson(name, value)))));
}

/**
 * Value holder whose components mirror, name for name, the parameters of {@code
 * insertDatasetFacet}.
 *
 * @param uuid the row identifier
 * @param createdAt when the row was created
 * @param datasetUuid the dataset the facet belongs to
 * @param runUuid the run recorded with the facet
 * @param lineageEventTime the event time recorded with the facet
 * @param lineageEventType the event type recorded with the facet
 * @param type the facet category
 * @param name the facet name
 * @param facet the facet payload
 */
record DatasetFacetRow(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
DatasetFacetsDao.Type type,
String name,
PGobject facet) {}
}
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import marquez.db.mappers.JobDataMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.models.DatasetData;
import marquez.db.models.JobData;
import marquez.service.models.DatasetData;
import marquez.service.models.JobData;
import marquez.service.models.Run;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindList;
Expand Down
Loading

0 comments on commit 9cfdd83

Please sign in to comment.