Skip to content

Commit

Permalink
dataset symlinks provided
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Aug 26, 2022
1 parent 07ba426 commit eb67b0f
Show file tree
Hide file tree
Showing 23 changed files with 405 additions and 58 deletions.
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
NamespaceDao createNamespaceDao();

@CreateSqlObject
DatasetSymlinkDao createDatasetSymlinkDao();

@CreateSqlObject
RunDao createRunDao();

Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ private Columns() {}
public static final String FIELD_UUIDS = "field_uuids";
public static final String LIFECYCLE_STATE = "lifecycle_state";

/* DATASET SYMLINK ROW COLUMNS */
public static final String IS_PRIMARY = "is_primary";

/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";

Expand Down
82 changes: 49 additions & 33 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import marquez.db.mappers.DatasetMapper;
import marquez.db.mappers.DatasetRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetSymlinkRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
Expand All @@ -42,9 +43,14 @@
@RegisterRowMapper(DatasetMapper.class)
public interface DatasetDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
"WITH symlink AS (\n"
+ " SELECT dataset_uuid\n"
+ " FROM dataset_symlinks\n"
+ " JOIN namespaces ON namespaces.uuid = dataset_symlinks.namespace_uuid\n"
+ " WHERE namespaces.name=:namespaceName and dataset_symlinks.name=:datasetName\n"
+ ") SELECT EXISTS ("
+ "SELECT 1 FROM datasets AS d "
+ "WHERE d.name = :datasetName AND d.namespace_name = :namespaceName)")
+ "WHERE d.uuid IN (SELECT dataset_uuid FROM symlink))")
boolean exists(String namespaceName, String datasetName);

@SqlBatch(
Expand All @@ -69,30 +75,37 @@ void updateLastModifiedAt(
void updateVersion(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid);

@SqlQuery(
"WITH selected_datasets AS (\n"
"WITH symlinks AS (\n"
+ " SELECT ds_joined.dataset_uuid AS dataset_uuid, ds_joined.name AS name, namespaces.name AS namespace\n"
+ " FROM dataset_symlinks ds\n"
+ " JOIN dataset_symlinks ds_joined ON ds.dataset_uuid = ds_joined.dataset_uuid\n"
+ " JOIN namespaces ON namespaces.uuid = ds_joined.namespace_uuid\n"
+ " WHERE namespaces.name=:namespaceName and ds.name=:datasetName\n"
+ "), selected_datasets AS (\n"
+ " SELECT d.*\n"
+ " FROM datasets d\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " AND d.name = :datasetName\n"
+ " INNER JOIN symlinks ON d.uuid = symlinks.dataset_uuid\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " SELECT d.uuid, dsl.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " SELECT d.uuid, dsl.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = rim.run_uuid\n"
+ " ) e ON e.run_uuid = rim.run_uuid\n"
+ ")\n"
+ "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "SELECT d.*, :datasetName as name, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
+ "FROM selected_datasets d\n"
+ "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
Expand All @@ -108,8 +121,9 @@ void updateLastModifiedAt(
+ " jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
+ " WHERE d2.run_uuid = d2.run_uuid\n"
+ " AND ds -> 'facets' IS NOT NULL\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n" // joining on name/namespace is OK,
// as the same run rows are joined
+ " GROUP BY d2.uuid\n"
+ ") f ON f.dataset_uuid = d.uuid")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
Expand All @@ -131,30 +145,35 @@ default void setFields(Dataset ds) {
}

@SqlQuery(
"SELECT d.* FROM datasets AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
"""
SELECT d.*
FROM datasets AS d
JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid
WHERE dsl.name = :datasetName AND d.namespace_name = :namespaceName
""")
Optional<DatasetRow> findDatasetAsRow(String namespaceName, String datasetName);

@SqlQuery("SELECT * FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName")
Optional<DatasetRow> getUuid(String namespaceName, String datasetName);

@SqlQuery(
"WITH selected_datasets AS (\n"
+ " SELECT d.*\n"
+ " SELECT d.*, dsl.name\n"
+ " FROM datasets d\n"
+ " INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ " WHERE d.namespace_name = :namespaceName\n"
+ " ORDER BY d.name\n"
+ " ORDER BY dsl.name\n"
+ " LIMIT :limit OFFSET :offset\n"
+ "), dataset_runs AS (\n"
+ " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " SELECT d.uuid, dsl.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, event_time, event FROM lineage_events\n"
+ " WHERE run_uuid = dv.run_uuid\n"
+ " ) e ON e.run_uuid = dv.run_uuid\n"
+ " UNION\n"
+ " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " SELECT d.uuid, dsl.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n"
+ " FROM selected_datasets d\n"
+ " INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n"
+ " LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
+ " LEFT JOIN LATERAL (\n"
Expand All @@ -179,7 +198,8 @@ default void setFields(Dataset ds) {
+ " WHERE d2.run_uuid = d2.run_uuid\n"
+ " AND ds -> 'facets' IS NOT NULL\n"
+ " AND ds ->> 'name' = d2.name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n"
+ " AND ds ->> 'namespace' = d2.namespace_name\n" // joining on name/namespace is OK,
// as the same run rows are joined
+ " GROUP BY d2.uuid\n"
+ ") f ON f.dataset_uuid = d.uuid\n"
+ "ORDER BY d.name")
Expand All @@ -206,7 +226,6 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
+ "namespace_name, "
+ "source_uuid, "
+ "source_name, "
+ "name, "
+ "physical_name, "
+ "description, "
+ "is_deleted "
Expand All @@ -219,11 +238,10 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
+ ":namespaceName, "
+ ":sourceUuid, "
+ ":sourceName, "
+ ":name, "
+ ":physicalName, "
+ ":description, "
+ ":isDeleted) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "ON CONFLICT (uuid) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
Expand All @@ -239,7 +257,6 @@ DatasetRow upsert(
String namespaceName,
UUID sourceUuid,
String sourceName,
String name,
String physicalName,
String description,
boolean isDeleted);
Expand All @@ -254,7 +271,6 @@ DatasetRow upsert(
+ "namespace_name, "
+ "source_uuid, "
+ "source_name, "
+ "name, "
+ "physical_name "
+ ") VALUES ( "
+ ":uuid, "
Expand All @@ -265,9 +281,8 @@ DatasetRow upsert(
+ ":namespaceName, "
+ ":sourceUuid, "
+ ":sourceName, "
+ ":name, "
+ ":physicalName) "
+ "ON CONFLICT (namespace_uuid, name) "
+ "ON CONFLICT (uuid) "
+ "DO UPDATE SET "
+ "type = EXCLUDED.type, "
+ "updated_at = EXCLUDED.updated_at, "
Expand All @@ -281,7 +296,6 @@ DatasetRow upsert(
String namespaceName,
UUID sourceUuid,
String sourceName,
String name,
String physicalName);

@Transaction
Expand All @@ -292,6 +306,10 @@ default Dataset upsertDatasetMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), now, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
DatasetSymlinkRow symlinkRow =
createDatasetSymlinkDao()
.upsertDatasetSymlinkRow(
UUID.randomUUID(), datasetName.getValue(), namespaceRow.getUuid(), true, now);
SourceRow sourceRow =
createSourceDao()
.upsertOrDefault(
Expand All @@ -300,38 +318,36 @@ default Dataset upsertDatasetMeta(
now,
datasetMeta.getSourceName().getValue(),
"");
UUID newDatasetUuid = UUID.randomUUID();
DatasetRow datasetRow;

if (datasetMeta.getDescription().isPresent()) {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
namespaceRow.getName(),
sourceRow.getUuid(),
sourceRow.getName(),
datasetName.getValue(),
datasetMeta.getPhysicalName().getValue(),
datasetMeta.getDescription().orElse(null),
false);
} else {
datasetRow =
upsert(
newDatasetUuid,
symlinkRow.getUuid(),
datasetMeta.getType(),
now,
namespaceRow.getUuid(),
namespaceRow.getName(),
sourceRow.getUuid(),
sourceRow.getName(),
datasetName.getValue(),
datasetMeta.getPhysicalName().getValue());
}

updateDatasetMetric(namespaceName, datasetMeta.getType(), newDatasetUuid, datasetRow.getUuid());
updateDatasetMetric(
namespaceName, datasetMeta.getType(), symlinkRow.getUuid(), datasetRow.getUuid());

TagDao tagDao = createTagDao();
List<DatasetTagMapping> datasetTagMappings = new ArrayList<>();
Expand Down
13 changes: 9 additions & 4 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,23 @@
@RegisterRowMapper(DatasetFieldMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS ("
"WITH symlinks AS (\n"
+ " SELECT ds_joined.dataset_uuid AS uuid, ds_joined.name AS name, namespaces.name AS namespace\n"
+ " FROM dataset_symlinks ds\n"
+ " JOIN dataset_symlinks ds_joined ON ds.dataset_uuid = ds_joined.dataset_uuid\n"
+ " JOIN namespaces ON namespaces.uuid = ds_joined.namespace_uuid\n"
+ " WHERE namespaces.name=:namespaceName and ds.name=:datasetName\n"
+ ") SELECT EXISTS ("
+ "SELECT 1 FROM dataset_fields AS df "
+ "INNER JOIN datasets AS d "
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName "
+ "INNER JOIN datasets AS d ON d.uuid IN (SELECT dataset_uuid FROM symlinks) "
+ "WHERE df.name = :name)")
boolean exists(String namespaceName, String datasetName, String name);

default Dataset updateTags(
String namespaceName, String datasetName, String fieldName, String tagName) {
Instant now = Instant.now();
TagRow tag = createTagDao().upsert(UUID.randomUUID(), now, tagName);
DatasetRow datasetRow = createDatasetDao().getUuid(namespaceName, datasetName).get();
DatasetRow datasetRow = createDatasetDao().findDatasetAsRow(namespaceName, datasetName).get();
UUID fieldUuid = createDatasetFieldDao().findUuid(datasetRow.getUuid(), fieldName).get();
datasetRow
.getCurrentVersionUuid()
Expand Down
48 changes: 48 additions & 0 deletions api/src/main/java/marquez/db/DatasetSymlinkDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db;

import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import marquez.db.mappers.DatasetSymlinksRowMapper;
import marquez.db.models.DatasetSymlinkRow;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

@RegisterRowMapper(DatasetSymlinksRowMapper.class)
public interface DatasetSymlinkDao extends BaseDao {

default DatasetSymlinkRow upsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, Instant now) {
doUpsertDatasetSymlinkRow(uuid, name, namespaceUuid, isPrimary, now);
return findDatasetSymlinkByNamespaceUidAndName(namespaceUuid, name).orElseThrow();
}

@SqlQuery("SELECT * FROM dataset_symlinks WHERE namespace_uuid = :namespaceUuid and name = :name")
Optional<DatasetSymlinkRow> findDatasetSymlinkByNamespaceUidAndName(
UUID namespaceUuid, String name);

@SqlUpdate(
"INSERT INTO dataset_symlinks ("
+ "dataset_uuid, "
+ "name, "
+ "namespace_uuid, "
+ "is_primary, "
+ "created_at, "
+ "updated_at"
+ ") VALUES ( "
+ ":uuid, "
+ ":name, "
+ ":namespaceUuid, "
+ ":isPrimary, "
+ ":now, "
+ ":now) "
+ "ON CONFLICT (name, namespace_uuid) DO NOTHING")
void doUpsertDatasetSymlinkRow(
UUID uuid, String name, UUID namespaceUuid, boolean isPrimary, Instant now);
}
9 changes: 6 additions & 3 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,12 @@ default void updateDatasetVersionMetric(
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
+ "SELECT d.type, dsl.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
+ "LEFT JOIN datasets d ON d.uuid = dv.dataset_uuid\n"
+ "INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
Expand Down Expand Up @@ -214,11 +215,12 @@ default void updateDatasetVersionMetric(
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
+ "SELECT d.type, dsl.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
+ "LEFT JOIN datasets d ON d.uuid = dv.dataset_uuid\n"
+ "INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
Expand Down Expand Up @@ -285,11 +287,12 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
+ " FROM selected_dataset_version_runs dv\n"
+ " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
+ ")\n"
+ "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\n"
+ "SELECT d.type, dsl.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\n"
+ " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
+ " t.tags, f.facets\n"
+ "FROM selected_dataset_versions dv\n"
+ "LEFT JOIN datasets d ON d.uuid = dv.dataset_uuid\n"
+ "INNER JOIN dataset_symlinks dsl ON d.uuid = dsl.dataset_uuid AND dsl.is_primary=true\n"
+ "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
+ "LEFT JOIN (\n"
+ " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
Expand Down
Loading

0 comments on commit eb67b0f

Please sign in to comment.