Skip to content

Commit

Permalink
OL facets - PR1 - create & feed new tables while not reading them
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Jan 10, 2023
1 parent 88e00a9 commit a026783
Show file tree
Hide file tree
Showing 32 changed files with 1,926 additions and 46 deletions.
9 changes: 9 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,13 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
ColumnLineageDao createColumnLineageDao();

@CreateSqlObject
DatasetFacetsDao createDatasetFacetsDao();

@CreateSqlObject
JobFacetsDao createJobFacetsDao();

@CreateSqlObject
RunFacetsDao createRunFacetsDao();
}
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;

@Slf4j
public final class Columns {
Expand Down Expand Up @@ -299,4 +300,17 @@ public static ImmutableMap<String, String> mapOrNull(final ResultSet results, fi
final String mapAsString = results.getString(column);
return Utils.fromJson(mapAsString, new TypeReference<>() {});
}

public static PGobject toPgObject(@NonNull final Object object) {
final PGobject jsonObject = new PGobject();
jsonObject.setType("jsonb");
final String json = Utils.toJson(object);
try {
jsonObject.setValue(json);
} catch (SQLException e) {
log.error("Error when ...", e);
return null;
}
return jsonObject;
}
}
215 changes: 215 additions & 0 deletions api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import static marquez.db.Columns.toPgObject;

import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.NonNull;
import marquez.service.models.LineageEvent;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.postgresql.util.PGobject;

/** The DAO for {@code dataset} facets. */
public interface DatasetFacetsDao {
/* An {@code enum} used ... */
enum Type {
DATASET,
INPUT,
OUTPUT,
UNKNOWN;
}

/* An {@code enum} used to determine the dataset facet. */
enum DatasetFacet implements Facet {
DOCUMENTATION(Type.DATASET, "documentation") {
Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
return Optional.ofNullable(facets.getDocumentation()).map(this::toPgObject);
}
},
SCHEMA(Type.DATASET, "schema") {
Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
return Optional.ofNullable(facets.getSchema()).map(this::toPgObject);
}
},
DATASOURCE(Type.DATASET, "dataSource") {
Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
return Optional.ofNullable(facets.getDataSource()).map(this::toPgObject);
}
},
DESCRIPTION(Type.DATASET, "description") {
Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
return Optional.ofNullable(facets.getDescription()).map(this::toPgObject);
}
},
LIFECYCLE_STATE_CHANGE(Type.DATASET, "lifecycleStateChange") {
Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
return Optional.ofNullable(facets.getLifecycleStateChange()).map(this::toPgObject);
}
},
VERSION(Type.DATASET, "version"),
COLUMN_LINEAGE(Type.DATASET, "columnLineage") {
Optional<PGobject> toPgObject(LineageEvent.DatasetFacets facets) {
return Optional.ofNullable(facets.getColumnLineage()).map(this::toPgObject);
}
},
OWNERSHIP(Type.DATASET, "ownership"),
DATA_QUALITY_METRICS(Type.INPUT, "dataQualityMetrics"),
DATA_QUALITY_ASSERTIONS(Type.INPUT, "dataQualityAssertions"),
OUTPUT_STATISTICS(Type.OUTPUT, "outputStatistics");

// whenever adding new types, please mind migration script in
// marquez.db.migrations.V55_5__BackfillFacets

final Type type;
final String name;

DatasetFacet(@NonNull final Type type, @NonNull final String name) {
this.type = type;
this.name = name;
}

Type getType() {
return type;
}

public String getName() {
return name;
}

/** ... */
public static DatasetFacet fromName(@NonNull final String name) {
for (final DatasetFacet facet : DatasetFacet.values()) {
if (facet.getName().equalsIgnoreCase(name)) {
return facet;
}
}
return null;
}

Optional<PGobject> toPgObject(LineageEvent.DatasetFacets runFacet) {
return propertiesToPgObject(runFacet.getAdditionalFacets());
}
}

/**
* @param uuid
* @param createdAt
* @param datasetUuid
* @param runUuid
* @param lineageEventTime
* @param lineageEventType
* @param type
* @param name
* @param facet
*/
@SqlUpdate(
"""
INSERT INTO dataset_facets (
uuid,
created_at,
dataset_uuid,
run_uuid,
lineage_event_time,
lineage_event_type,
type,
name,
facet
) VALUES (
:uuid,
:createdAt,
:datasetUuid,
:runUuid,
:lineageEventTime,
:lineageEventType,
:type,
:name,
:facet
)
""")
void insertDatasetFacet(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
Type type,
String name,
PGobject facet);

/**
* @param datasetUuid
* @param runUuid
* @param lineageEventTime
* @param lineageEventType
* @param datasetFacets
*/
@Transaction
default void insertDatasetFacetsFor(
@NonNull UUID datasetUuid,
@NonNull UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@NonNull LineageEvent.DatasetFacets datasetFacets) {
final Instant now = Instant.now();

// insert rows for known facets
Arrays.stream(DatasetFacet.values())
.collect(Collectors.toMap(DatasetFacet::getName, facet -> facet.toPgObject(datasetFacets)))
.entrySet()
.stream()
.filter(entry -> entry.getValue().isPresent())
.forEach(
entry ->
insertDatasetFacet(
UUID.randomUUID(),
now,
datasetUuid,
runUuid,
lineageEventTime,
lineageEventType,
DatasetFacet.fromName(entry.getKey()).getType(),
entry.getKey(),
entry.getValue().get()));

// insert undefined facets
Optional.ofNullable(datasetFacets.getAdditionalFacets()).stream()
.map(Map::entrySet)
.flatMap(Set::stream)
.filter(entry -> Optional.ofNullable(DatasetFacet.fromName(entry.getKey())).isEmpty())
.forEach(
entry ->
insertDatasetFacet(
UUID.randomUUID(),
now,
datasetUuid,
runUuid,
lineageEventTime,
lineageEventType,
Type.UNKNOWN,
entry.getKey(),
toPgObject(Facet.asJson(entry.getKey(), entry.getValue()))));
}

record DatasetFacetRow(
UUID uuid,
Instant createdAt,
UUID datasetUuid,
UUID runUuid,
Instant lineageEventTime,
String lineageEventType,
DatasetFacetsDao.Type type,
String name,
PGobject facet) {}
}
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/db/Facet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/
package marquez.db;

import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;
import java.util.Optional;
import lombok.NonNull;
import marquez.common.Utils;
import org.postgresql.util.PGobject;

public interface Facet {

/** ... */
static ObjectNode asJson(@NonNull final String facetName, @NonNull Object facetValue) {
final ObjectNode facetAsJson = Utils.getMapper().createObjectNode();
facetAsJson.putPOJO(facetName, facetValue);
return facetAsJson;
}

String getName();

default Optional<PGobject> propertiesToPgObject(Map<String, Object> additionalProperties) {
return Optional.ofNullable(additionalProperties)
.filter(map -> map.containsKey(getName()))
.map(key -> toPgObject(additionalProperties.get(getName())));
}

default PGobject toPgObject(Object o) {
return Columns.toPgObject(Facet.asJson(getName(), o));
}
}
Loading

0 comments on commit a026783

Please sign in to comment.