diff --git a/api/src/main/java/marquez/api/DatasetResource.java b/api/src/main/java/marquez/api/DatasetResource.java index accfdaa044..841d4770b6 100644 --- a/api/src/main/java/marquez/api/DatasetResource.java +++ b/api/src/main/java/marquez/api/DatasetResource.java @@ -162,13 +162,14 @@ public Response delete( @PathParam("dataset") DatasetName datasetName) { throwIfNotExists(namespaceName); - datasetService - .softDelete(namespaceName.getValue(), datasetName.getValue()) - .orElseThrow(() -> new DatasetNotFoundException(datasetName)); Dataset dataset = datasetService .findDatasetByName(namespaceName.getValue(), datasetName.getValue()) .orElseThrow(() -> new DatasetNotFoundException(datasetName)); + + datasetService + .delete(namespaceName.getValue(), datasetName.getValue()) + .orElseThrow(() -> new DatasetNotFoundException(datasetName)); return Response.ok(dataset).build(); } diff --git a/api/src/main/java/marquez/api/JobResource.java b/api/src/main/java/marquez/api/JobResource.java index beaf40a115..199c4ac2e3 100644 --- a/api/src/main/java/marquez/api/JobResource.java +++ b/api/src/main/java/marquez/api/JobResource.java @@ -16,6 +16,7 @@ import javax.validation.Valid; import javax.validation.constraints.Min; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -160,6 +161,25 @@ public Response list( return Response.ok(new ResultsPage<>("jobs", jobs, totalCount)).build(); } + @Timed + @ResponseMetered + @ExceptionMetered + @DELETE + @Path("/namespaces/{namespace}/jobs/{job}") + @Produces(APPLICATION_JSON) + public Response delete( + @PathParam("namespace") NamespaceName namespaceName, @PathParam("job") JobName jobName) { + throwIfNotExists(namespaceName); + + Job job = + jobService + .findJobByName(namespaceName.getValue(), jobName.getValue()) + .orElseThrow(() -> new JobNotFoundException(jobName)); + + jobService.delete(namespaceName.getValue(), jobName.getValue()); + return Response.ok(job).build(); + } + @Timed @ResponseMetered @ExceptionMetered diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 9b50f1fdc9..54f8cf93b5 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -43,7 +43,7 @@ public interface DatasetDao extends BaseDao { @SqlQuery( "SELECT EXISTS (" - + "SELECT 1 FROM datasets AS d " + + "SELECT 1 FROM datasets_view AS d " + "WHERE d.name = :datasetName AND d.namespace_name = :namespaceName)") boolean exists(String namespaceName, String datasetName); @@ -69,49 +69,50 @@ void updateLastModifiedAt( void updateVersion(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid); @SqlQuery( - "WITH selected_datasets AS (\n" - + " SELECT d.*\n" - + " FROM datasets d\n" - + " WHERE d.namespace_name = :namespaceName\n" - + " AND d.name = :datasetName\n" - + "), dataset_runs AS (\n" - + " SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event\n" - + " FROM selected_datasets d\n" - + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" - + " LEFT JOIN LATERAL (\n" - + " SELECT run_uuid, event_time, event FROM lineage_events\n" - + " WHERE run_uuid = dv.run_uuid\n" - + " ) e ON e.run_uuid = dv.run_uuid\n" - + " UNION\n" - + " SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event\n" - + " FROM selected_datasets d\n" - + " INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid\n" - + " 
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid\n"
-          + "    LEFT JOIN LATERAL (\n"
-          + "      SELECT run_uuid, event_time, event FROM lineage_events\n"
-          + "      WHERE run_uuid = rim.run_uuid\n"
-          + "    ) e ON e.run_uuid = rim.run_uuid\n"
-          + ")\n"
-          + "SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets\n"
-          + "FROM selected_datasets d\n"
-          + "LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid\n"
-          + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
-          + "LEFT JOIN (\n"
-          + "  SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
-          + "  FROM tags AS t\n"
-          + "  INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
-          + "  GROUP BY m.dataset_uuid\n"
-          + ") t ON t.dataset_uuid = d.uuid\n"
-          + "LEFT JOIN (\n"
-          + "  SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n"
-          + "  FROM dataset_runs d2,\n"
-          + "  jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds\n"
-          + "  WHERE d2.run_uuid = d2.run_uuid\n"
-          + "  AND ds -> 'facets' IS NOT NULL\n"
-          + "  AND ds ->> 'name' = d2.name\n"
-          + "  AND ds ->> 'namespace' = d2.namespace_name\n"
-          + "  GROUP BY d2.uuid\n"
-          + ") f ON f.dataset_uuid = d.uuid")
+      """
+      WITH selected_datasets AS (
+        SELECT d.*
+        FROM datasets_view d
+        WHERE d.namespace_name = :namespaceName
+        AND d.name = :datasetName
+      ), dataset_runs AS (
+        SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
+        FROM selected_datasets d
+        INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
+        LEFT JOIN LATERAL (
+          SELECT run_uuid, event_time, event FROM lineage_events
+          WHERE run_uuid = dv.run_uuid
+        ) e ON e.run_uuid = dv.run_uuid
+        UNION
+        SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
+        FROM selected_datasets d
+        INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
+        LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
+        LEFT JOIN LATERAL (
+          SELECT run_uuid, event_time, event FROM lineage_events
+          WHERE run_uuid = rim.run_uuid
+        ) e ON e.run_uuid = rim.run_uuid
+      )
+      SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
+      FROM selected_datasets d
+      LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
+      LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
+      LEFT JOIN (
+        SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
+        FROM tags AS t
+        INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
+        GROUP BY m.dataset_uuid
+      ) t ON t.dataset_uuid = d.uuid
+      LEFT JOIN (
+        SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
+        FROM dataset_runs d2,
+        jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
+        WHERE d2.run_uuid = d2.run_uuid
+        AND ds -> 'facets' IS NOT NULL
+        AND ds ->> 'name' = d2.name
+        AND ds ->> 'namespace' = d2.namespace_name
+        GROUP BY d2.uuid
+      ) f ON f.dataset_uuid = d.uuid""")
  Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);

  default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
@@ -131,19 +132,19 @@ default void setFields(Dataset ds) {
  }

  @SqlQuery(
-      "SELECT d.* FROM datasets AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
+      "SELECT d.* FROM datasets_view AS d WHERE d.name = :datasetName AND d.namespace_name = :namespaceName")
  Optional<DatasetRow> findDatasetAsRow(String namespaceName, String datasetName);
-  @SqlQuery("SELECT * FROM datasets WHERE name = :datasetName AND namespace_name = :namespaceName")
+  @SqlQuery(
+      "SELECT * FROM datasets_view WHERE name = :datasetName AND namespace_name = :namespaceName")
  Optional<UUID> getUuid(String namespaceName, String datasetName);

  @SqlQuery(
      """
      WITH selected_datasets AS (
        SELECT d.*
-       FROM datasets d
+       FROM datasets_view d
        WHERE d.namespace_name = :namespaceName
-       AND d.is_deleted is false
        ORDER BY d.name
        LIMIT :limit OFFSET :offset
      ), dataset_runs AS (
@@ -187,10 +188,10 @@ SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS f
      ORDER BY d.name""")
  List<Dataset> findAll(String namespaceName, int limit, int offset);

-  @SqlQuery("SELECT count(*) FROM datasets")
+  @SqlQuery("SELECT count(*) FROM datasets_view")
  int count();

-  @SqlQuery("SELECT count(*) FROM datasets AS j WHERE j.namespace_name = :namespaceName")
+  @SqlQuery("SELECT count(*) FROM datasets_view AS j WHERE j.namespace_name = :namespaceName")
  int countFor(String namespaceName);

  default List<Dataset> findAllWithTags(String namespaceName, int limit, int offset) {
@@ -199,40 +200,45 @@ default List<Dataset> findAllWithTags(String namespaceName, int limit, int offse
  }

  @SqlQuery(
-      "INSERT INTO datasets ("
-          + "uuid, "
-          + "type, "
-          + "created_at, "
-          + "updated_at, "
-          + "namespace_uuid, "
-          + "namespace_name, "
-          + "source_uuid, "
-          + "source_name, "
-          + "name, "
-          + "physical_name, "
-          + "description, "
-          + "is_deleted "
-          + ") VALUES ( "
-          + ":uuid, "
-          + ":type, "
-          + ":now, "
-          + ":now, "
-          + ":namespaceUuid, "
-          + ":namespaceName, "
-          + ":sourceUuid, "
-          + ":sourceName, "
-          + ":name, "
-          + ":physicalName, "
-          + ":description, "
-          + ":isDeleted) "
-          + "ON CONFLICT (namespace_uuid, name) "
-          + "DO UPDATE SET "
-          + "type = EXCLUDED.type, "
-          + "updated_at = EXCLUDED.updated_at, "
-          + "physical_name = EXCLUDED.physical_name, "
-          + "description = EXCLUDED.description, "
-          + "is_deleted = EXCLUDED.is_deleted "
-          + "RETURNING *")
+      """
+      INSERT INTO datasets (
+        uuid,
+        type,
+        created_at,
+        updated_at,
+        namespace_uuid,
+        namespace_name,
+        source_uuid,
+        source_name,
+        name,
+        physical_name,
+        description,
+        is_deleted,
+        is_hidden
+      ) VALUES (
+        :uuid,
+        :type,
+        :now,
+        :now,
+        :namespaceUuid,
+        :namespaceName,
+        :sourceUuid,
+        :sourceName,
+        :name,
+        :physicalName,
+        :description,
+        :isDeleted,
+        false
+      ) ON CONFLICT (namespace_uuid, name)
+      DO UPDATE SET
+        type = EXCLUDED.type,
+        updated_at = EXCLUDED.updated_at,
+        physical_name = EXCLUDED.physical_name,
+        description = EXCLUDED.description,
+        is_deleted = EXCLUDED.is_deleted,
+        is_hidden = EXCLUDED.is_hidden
+      RETURNING *
+      """)
  DatasetRow upsert(
      UUID uuid,
      DatasetType type,
@@ -289,12 +295,12 @@ DatasetRow upsert(
  @SqlQuery(
      """
      UPDATE datasets
-     SET is_deleted = true
+     SET is_hidden = true
      WHERE namespace_name = :namespaceName
      AND name = :name
      RETURNING *
      """)
-  Optional<DatasetRow> softDelete(String namespaceName, String name);
+  Optional<DatasetRow> delete(String namespaceName, String name);

  @Transaction
  default Dataset upsertDatasetMeta(
diff --git a/api/src/main/java/marquez/db/DatasetFieldDao.java b/api/src/main/java/marquez/db/DatasetFieldDao.java
index b76b89e94a..bc5fc86bfd 100644
--- a/api/src/main/java/marquez/db/DatasetFieldDao.java
+++ b/api/src/main/java/marquez/db/DatasetFieldDao.java
@@ -33,7 +33,7 @@ public interface DatasetFieldDao extends BaseDao {
  @SqlQuery(
      "SELECT EXISTS ("
          + "SELECT 1 FROM dataset_fields AS df "
-          + "INNER JOIN datasets AS d "
+          + "INNER JOIN datasets_view AS d "
+ " ON d.uuid = df.dataset_uuid AND d.name = :datasetName AND d.namespace_name = :namespaceName " + "WHERE df.name = :name)") boolean exists(String namespaceName, String datasetName, String name); diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index bb1900d2c7..3333ceaba0 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -155,87 +155,89 @@ default void updateDatasetVersionMetric( String SELECT = "SELECT dv.* " + "FROM dataset_versions dv "; @SqlQuery( - "WITH selected_dataset_versions AS (\n" - + " SELECT dv.*\n" - + " FROM dataset_versions dv\n" - + " WHERE dv.version = :version\n" - + "), selected_dataset_version_runs AS (\n" - + " SELECT uuid, dataset_uuid, namespace_name, dataset_name, version, created_at, run_uuid\n" - + " FROM selected_dataset_versions\n" - + " UNION\n" - + " SELECT DISTINCT dv.uuid, dv.dataset_uuid, dv.namespace_name, dv.dataset_name, dv.version, dv.created_at, rim.run_uuid\n" - + " FROM selected_dataset_versions dv\n" - + " LEFT JOIN runs_input_mapping rim\n" - + " ON rim.dataset_version_uuid = dv.uuid\n" - + "), selected_dataset_version_events AS (\n" - + " SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event\n" - + " FROM selected_dataset_version_runs dv\n" - + " LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n" - + ")\n" - + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n" - + " dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n" - + " t.tags, f.facets\n" - + "FROM selected_dataset_versions dv\n" - + "LEFT JOIN datasets d ON d.uuid = dv.dataset_uuid\n" - + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n" - + "LEFT JOIN (\n" - + " SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n" - + " FROM tags AS t\n" - + " INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n" - + " GROUP BY m.dataset_uuid\n" - + ") t ON t.dataset_uuid = dv.dataset_uuid\n" - + "LEFT JOIN (\n" - + " SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n" - + " FROM selected_dataset_version_events dve,\n" - + " jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds\n" - + " WHERE dve.run_uuid = dve.run_uuid\n" - + " AND ds -> 'facets' IS NOT NULL\n" - + " AND ds ->> 'name' = dve.dataset_name\n" - + " AND ds ->> 'namespace' = dve.namespace_name\n" - + " GROUP BY dve.uuid\n" - + ") f ON f.dataset_uuid = dv.uuid") + """ + WITH selected_dataset_versions AS ( + SELECT dv.* + FROM dataset_versions dv + WHERE dv.version = :version + ), selected_dataset_version_runs AS ( + SELECT uuid, dataset_uuid, namespace_name, dataset_name, version, created_at, run_uuid + FROM selected_dataset_versions + UNION + SELECT DISTINCT dv.uuid, dv.dataset_uuid, dv.namespace_name, dv.dataset_name, dv.version, dv.created_at, rim.run_uuid + FROM selected_dataset_versions dv + LEFT JOIN runs_input_mapping rim + ON rim.dataset_version_uuid = dv.uuid + ), selected_dataset_version_events AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event + FROM selected_dataset_version_runs dv + LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid + ) + SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s + 
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
+        t.tags, f.facets
+      FROM selected_dataset_versions dv
+      LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
+      LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
+      LEFT JOIN (
+        SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
+        FROM tags AS t
+        INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
+        GROUP BY m.dataset_uuid
+      ) t ON t.dataset_uuid = dv.dataset_uuid
+      LEFT JOIN (
+        SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
+        FROM selected_dataset_version_events dve,
+        jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
+        WHERE dve.run_uuid = dve.run_uuid
+        AND ds -> 'facets' IS NOT NULL
+        AND ds ->> 'name' = dve.dataset_name
+        AND ds ->> 'namespace' = dve.namespace_name
+        GROUP BY dve.uuid
+      ) f ON f.dataset_uuid = dv.uuid""")
  Optional<DatasetVersion> findBy(UUID version);

  @SqlQuery(
-      "WITH selected_dataset_versions AS (\n"
-          + "  SELECT dv.*\n"
-          + "  FROM dataset_versions dv\n"
-          + "  WHERE dv.uuid = :uuid\n"
-          + "), selected_dataset_version_runs AS (\n"
-          + "  SELECT uuid, dataset_uuid, namespace_name, dataset_name, version, created_at, run_uuid\n"
-          + "  FROM selected_dataset_versions\n"
-          + "  UNION\n"
-          + "  SELECT DISTINCT dv.uuid, dv.dataset_uuid, dv.namespace_name, dv.dataset_name, dv.version, dv.created_at, rim.run_uuid\n"
-          + "  FROM selected_dataset_versions dv\n"
-          + "  LEFT JOIN runs_input_mapping rim\n"
-          + "    ON rim.dataset_version_uuid = dv.uuid\n"
-          + "), selected_dataset_version_events AS (\n"
-          + "  SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event\n"
-          + "  FROM selected_dataset_version_runs dv\n"
-          + "  LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
-          + ")\n"
-          + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, \n"
-          + "  dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
-          + "  t.tags, f.facets\n"
-          + "FROM selected_dataset_versions dv\n"
-          + "LEFT JOIN datasets d ON d.uuid = dv.dataset_uuid\n"
-          + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
-          + "LEFT JOIN (\n"
-          + "  SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
-          + "  FROM tags AS t\n"
-          + "  INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
-          + "  GROUP BY m.dataset_uuid\n"
-          + ") t ON t.dataset_uuid = dv.dataset_uuid\n"
-          + "LEFT JOIN (\n"
-          + "  SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n"
-          + "  FROM selected_dataset_version_events dve,\n"
-          + "  jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds\n"
-          + "  WHERE dve.run_uuid = dve.run_uuid\n"
-          + "  AND ds -> 'facets' IS NOT NULL\n"
-          + "  AND ds ->> 'name' = dve.dataset_name\n"
-          + "  AND ds ->> 'namespace' = dve.namespace_name\n"
-          + "  GROUP BY dve.uuid\n"
-          + ") f ON f.dataset_uuid = dv.uuid")
+      """
+      WITH selected_dataset_versions AS (
+        SELECT dv.*
+        FROM dataset_versions dv
+        WHERE dv.uuid = :uuid
+      ), selected_dataset_version_runs AS (
+        SELECT uuid, dataset_uuid, namespace_name, dataset_name, version, created_at, run_uuid
+        FROM selected_dataset_versions
+        UNION
+        SELECT DISTINCT dv.uuid, dv.dataset_uuid, dv.namespace_name, dv.dataset_name, dv.version, dv.created_at, rim.run_uuid
+        FROM selected_dataset_versions dv
+        LEFT JOIN runs_input_mapping rim
+          ON rim.dataset_version_uuid = dv.uuid
+      ), selected_dataset_version_events AS (
+        SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
+        FROM selected_dataset_version_runs dv
+        LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
+      )
+      SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
+        dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
+        t.tags, f.facets
+      FROM selected_dataset_versions dv
+      LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
+      LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
+      LEFT JOIN (
+        SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
+        FROM tags AS t
+        INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
+        GROUP BY m.dataset_uuid
+      ) t ON t.dataset_uuid = dv.dataset_uuid
+      LEFT JOIN (
+        SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
+        FROM selected_dataset_version_events dve,
+        jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
+        WHERE dve.run_uuid = dve.run_uuid
+        AND ds -> 'facets' IS NOT NULL
+        AND ds ->> 'name' = dve.dataset_name
+        AND ds ->> 'namespace' = dve.namespace_name
+        GROUP BY dve.uuid
+      ) f ON f.dataset_uuid = dv.uuid""")
  Optional<DatasetVersion> findByUuid(UUID uuid);

  default Optional<DatasetVersion> findByWithRun(UUID version) {
@@ -265,49 +267,50 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {
  List<ExtendedDatasetVersionRow> findOutputDatasetVersionsFor(@NonNull UUID runId);

  @SqlQuery(
-      "WITH selected_dataset_versions AS (\n"
-          + "  SELECT dv.*\n"
-          + "  FROM dataset_versions dv\n"
-          + "  WHERE dv.namespace_name = :namespaceName\n"
-          + "  AND dv.dataset_name = :datasetName\n"
-          + "  ORDER BY dv.created_at DESC\n"
-          + "  LIMIT :limit OFFSET :offset\n"
-          + "), selected_dataset_version_runs AS (\n"
-          + "  SELECT uuid, dataset_uuid, namespace_name, dataset_name, version, created_at, run_uuid\n"
-          + "  FROM selected_dataset_versions\n"
-          + "  UNION\n"
-          + "  SELECT DISTINCT dv.uuid, dv.dataset_uuid, dv.namespace_name, dv.dataset_name, dv.version, dv.created_at, rim.run_uuid\n"
-          + "  FROM selected_dataset_versions dv\n"
-          + "  LEFT JOIN runs_input_mapping rim\n"
-          + "    ON rim.dataset_version_uuid = dv.uuid\n"
-          + "), selected_dataset_version_events AS (\n"
-          + "  SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event\n"
-          + "  FROM selected_dataset_version_runs dv\n"
-          + "  LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid\n"
-          + ")\n"
-          + "SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\n"
-          + "  dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,\n"
-          + "  t.tags, f.facets\n"
-          + "FROM selected_dataset_versions dv\n"
-          + "LEFT JOIN datasets d ON d.uuid = dv.dataset_uuid\n"
-          + "LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid\n"
-          + "LEFT JOIN (\n"
-          + "  SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid\n"
-          + "  FROM tags AS t\n"
-          + "  INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid\n"
-          + "  GROUP BY m.dataset_uuid\n"
-          + ") t ON t.dataset_uuid = dv.dataset_uuid\n"
-          + "LEFT JOIN (\n"
-          + "  SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets\n"
-          + "  FROM selected_dataset_version_events dve,\n"
-          + "  jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds\n"
-          + "  WHERE dve.run_uuid = dve.run_uuid\n"
-          + "  AND ds -> 'facets' IS NOT NULL\n"
-          + "  AND ds ->> 'name' = dve.dataset_name\n"
-          + "  AND ds ->> 'namespace' = dve.namespace_name\n"
-          + "  GROUP BY dve.uuid\n"
-          + ") f ON f.dataset_uuid = dv.uuid\n"
-          + "ORDER BY dv.created_at DESC")
+      """
+      WITH selected_dataset_versions AS (
+        SELECT dv.*
+        FROM dataset_versions dv
+        WHERE dv.namespace_name = :namespaceName
+        AND dv.dataset_name = :datasetName
+        ORDER BY dv.created_at DESC
+        LIMIT :limit OFFSET :offset
+      ), selected_dataset_version_runs AS (
+        SELECT uuid, dataset_uuid, namespace_name, dataset_name, version, created_at, run_uuid
+        FROM selected_dataset_versions
+        UNION
+        SELECT DISTINCT dv.uuid, dv.dataset_uuid, dv.namespace_name, dv.dataset_name, dv.version, dv.created_at, rim.run_uuid
+        FROM selected_dataset_versions dv
+        LEFT JOIN runs_input_mapping rim
+          ON rim.dataset_version_uuid = dv.uuid
+      ), selected_dataset_version_events AS (
+        SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
+        FROM selected_dataset_version_runs dv
+        LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
+      )
+      SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
+        dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
+        t.tags, f.facets
+      FROM selected_dataset_versions dv
+      LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
+      LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
+      LEFT JOIN (
+        SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
+        FROM tags AS t
+        INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
+        GROUP BY m.dataset_uuid
+      ) t ON t.dataset_uuid = dv.dataset_uuid
+      LEFT JOIN (
+        SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
+        FROM selected_dataset_version_events dve,
+        jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
+        WHERE dve.run_uuid = dve.run_uuid
+        AND ds -> 'facets' IS NOT NULL
+        AND ds ->> 'name' = dve.dataset_name
+        AND ds ->> 'namespace' = dve.namespace_name
+        GROUP BY dve.uuid
+      ) f ON f.dataset_uuid = dv.uuid
+      ORDER BY dv.created_at DESC""")
  List<DatasetVersion> findAll(String namespaceName, String datasetName, int limit, int offset);

  default List<DatasetVersion> findAllWithRun(
diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java
index 63ba36581c..d0e4981353 100644
--- a/api/src/main/java/marquez/db/JobDao.java
+++ b/api/src/main/java/marquez/db/JobDao.java
@@ -74,6 +74,15 @@ SELECT run_uuid, JSON_AGG(e.facets) AS facets
      """)
  Optional<Job> findJobByName(String namespaceName, String jobName);

+  @SqlUpdate(
+      """
+      UPDATE jobs
+      SET is_hidden = true
+      WHERE namespace_name = :namespaceName
+      AND name = :name
+      """)
+  void delete(String namespaceName, String name);
+
  default Optional<Job> findWithRun(String namespaceName, String jobName) {
    Optional<Job> job = findJobByName(namespaceName, jobName);
    job.ifPresent(
diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java
index ffe25708f6..dc79c34ef6 100644
--- a/api/src/main/java/marquez/db/JobVersionDao.java
+++ b/api/src/main/java/marquez/db/JobVersionDao.java
@@ -52,79 +52,81 @@ enum IoType {
   * Output datasets are constructed as JSON strings that can be deserialized into DatasetIds.
*/ String BASE_SELECT_ON_JOB_VERSIONS = - "WITH job_version_io AS (\n" - + " SELECT io.job_version_uuid,\n" - + " JSON_AGG(json_build_object('namespace', ds.namespace_name,\n" - + " 'name', ds.name))\n" - + " FILTER (WHERE io.io_type = 'INPUT') AS input_datasets,\n" - + " JSON_AGG(json_build_object('namespace', ds.namespace_name,\n" - + " 'name', ds.name))\n" - + " FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets\n" - + " FROM job_versions_io_mapping io\n" - + " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n" - + " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n" - + " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n" - + " WHERE j.namespace_name = :namespaceName\n" - + " AND j.name = :jobName\n" - + " GROUP BY io.job_version_uuid\n" - + "), relevant_job_versions AS (\n" - + " SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, \n" - + " jv.location, jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, \n" - + " j.namespace_name, j.name AS job_name, jc.context\n" - + " FROM job_versions jv\n" - + " LEFT OUTER JOIN job_contexts AS jc ON jc.uuid = jv.job_context_uuid\n" - + " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n" - + " WHERE j.name = :jobName AND j.namespace_name=:namespaceName\n" - + " ORDER BY jv.created_at DESC\n" - + ")\n" - + "SELECT jv.*,\n" - + " dsio.input_datasets,\n" - + " dsio.output_datasets,\n" - + " r.uuid AS run_uuid,\n" - + " r.created_at AS run_created_at,\n" - + " r.updated_at AS run_updated_at,\n" - + " r.nominal_start_time AS run_nominal_start_time,\n" - + " r.nominal_end_time AS run_nominal_end_time,\n" - + " r.current_run_state AS run_current_run_state,\n" - + " r.started_at AS run_started_at,\n" - + " r.ended_at AS run_ended_at,\n" - + " r.namespace_name AS run_namespace_name,\n" - + " r.job_name AS run_job_name,\n" - + " jv.version AS run_job_version,\n" - + " r.location AS run_location,\n" - + " ra.args AS run_args,\n" - + " jv.context AS run_context,\n" - + " f.facets AS run_facets,\n" - + " ri.input_versions AS run_input_versions,\n" - + " ro.output_versions AS run_output_versions\n" - + "FROM relevant_job_versions AS jv\n" - + "LEFT JOIN job_version_io dsio ON dsio.job_version_uuid = jv.uuid\n" - + "LEFT OUTER JOIN runs r ON r.uuid = jv.latest_run_uuid\n" - + "LEFT JOIN LATERAL (\n" - + " SELECT le.run_uuid, JSON_AGG(event -> 'run' -> 'facets') AS facets\n" - + " FROM lineage_events le\n" - + " WHERE le.run_uuid=jv.latest_run_uuid\n" - + " GROUP BY le.run_uuid\n" - + ") AS f ON r.uuid = f.run_uuid\n" - + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" - + "LEFT JOIN LATERAL (\n" - + " SELECT im.run_uuid,\n" - + " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" - + " 'name', dv.dataset_name,\n" - + " 'version', dv.version)) AS input_versions\n" - + " FROM runs_input_mapping im\n" - + " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n" - + " WHERE im.run_uuid=jv.latest_run_uuid\n" - + " GROUP BY im.run_uuid\n" - + ") ri ON ri.run_uuid = r.uuid\n" - + "LEFT OUTER JOIN (\n" - + " SELECT run_uuid,\n" - + " JSON_AGG(json_build_object('namespace', namespace_name,\n" - + " 'name', dataset_name,\n" - + " 'version', version)) AS output_versions\n" - + " FROM dataset_versions\n" - + " GROUP BY run_uuid\n" - + ") ro ON ro.run_uuid = r.uuid\n"; + """ + WITH job_version_io AS ( + SELECT io.job_version_uuid, + JSON_AGG(json_build_object('namespace', ds.namespace_name, + 'name', ds.name)) + FILTER (WHERE io.io_type = 'INPUT') AS input_datasets, + 
JSON_AGG(json_build_object('namespace', ds.namespace_name,
+                                   'name', ds.name))
+                 FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets
+        FROM job_versions_io_mapping io
+        INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid
+        INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
+        INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
+        WHERE j.namespace_name = :namespaceName
+        AND j.name = :jobName
+        GROUP BY io.job_version_uuid
+      ), relevant_job_versions AS (
+        SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version,\s
+          jv.location, jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid,\s
+          j.namespace_name, j.name AS job_name, jc.context
+        FROM job_versions jv
+        LEFT OUTER JOIN job_contexts AS jc ON jc.uuid = jv.job_context_uuid
+        INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
+        WHERE j.name = :jobName AND j.namespace_name=:namespaceName
+        ORDER BY jv.created_at DESC
+      )
+      SELECT jv.*,
+        dsio.input_datasets,
+        dsio.output_datasets,
+        r.uuid AS run_uuid,
+        r.created_at AS run_created_at,
+        r.updated_at AS run_updated_at,
+        r.nominal_start_time AS run_nominal_start_time,
+        r.nominal_end_time AS run_nominal_end_time,
+        r.current_run_state AS run_current_run_state,
+        r.started_at AS run_started_at,
+        r.ended_at AS run_ended_at,
+        r.namespace_name AS run_namespace_name,
+        r.job_name AS run_job_name,
+        jv.version AS run_job_version,
+        r.location AS run_location,
+        ra.args AS run_args,
+        jv.context AS run_context,
+        f.facets AS run_facets,
+        ri.input_versions AS run_input_versions,
+        ro.output_versions AS run_output_versions
+      FROM relevant_job_versions AS jv
+      LEFT JOIN job_version_io dsio ON dsio.job_version_uuid = jv.uuid
+      LEFT OUTER JOIN runs r ON r.uuid = jv.latest_run_uuid
+      LEFT JOIN LATERAL (
+        SELECT le.run_uuid, JSON_AGG(event -> 'run' -> 'facets') AS facets
+        FROM lineage_events le
+        WHERE le.run_uuid=jv.latest_run_uuid
+        GROUP BY le.run_uuid
+      ) AS f ON r.uuid = f.run_uuid
+      LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
+      LEFT JOIN LATERAL (
+        SELECT im.run_uuid,
+               JSON_AGG(json_build_object('namespace', dv.namespace_name,
+                                          'name', dv.dataset_name,
+                                          'version', dv.version)) AS input_versions
+        FROM runs_input_mapping im
+        INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
+        WHERE im.run_uuid=jv.latest_run_uuid
+        GROUP BY im.run_uuid
+      ) ri ON ri.run_uuid = r.uuid
+      LEFT OUTER JOIN (
+        SELECT run_uuid,
+               JSON_AGG(json_build_object('namespace', namespace_name,
+                                          'name', dataset_name,
+                                          'version', version)) AS output_versions
+        FROM dataset_versions
+        GROUP BY run_uuid
+      ) ro ON ro.run_uuid = r.uuid
+      """;

  @SqlQuery(BASE_SELECT_ON_JOB_VERSIONS + "WHERE jv.version = :jobVersionUuid")
  Optional<JobVersion> findJobVersion(String namespaceName, String jobName, UUID jobVersionUuid);
diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index 18b1fa1b27..d48e9f394e 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -74,22 +74,21 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids,

  @SqlQuery(
      """
-      SELECT ds.*, dv.fields, dv.lifecycle_state
-      FROM datasets ds
-      LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
-      WHERE ds.uuid IN (<dsUuids>)
-      AND ds.is_deleted is false
-      """)
-  Set<DatasetData> getNonDeletedDatasetData(@BindList Set<UUID> dsUuids);
+      SELECT ds.*, dv.fields, dv.lifecycle_state
+      FROM datasets_view ds
+      LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
+      WHERE ds.uuid IN (<dsUuids>)""")
+  Set<DatasetData> getDatasetData(@BindList Set<UUID> dsUuids);

  @SqlQuery(
-      "select j.uuid from jobs j\n"
-          + "inner join job_versions jv on jv.job_uuid = j.uuid\n"
-          + "inner join job_versions_io_mapping io on io.job_version_uuid = jv.uuid\n"
-          + "inner join datasets ds on ds.uuid = io.dataset_uuid\n"
-          + "where ds.name = :datasetName and ds.namespace_name = :namespaceName\n"
-          + "order by io_type DESC, jv.created_at DESC\n"
-          + "limit 1")
+      """
+      SELECT j.uuid FROM jobs j
+      INNER JOIN job_versions jv ON jv.job_uuid = j.uuid
+      INNER JOIN job_versions_io_mapping io ON io.job_version_uuid = jv.uuid
+      INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
+      WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName
+      ORDER BY io_type DESC, jv.created_at DESC
+      LIMIT 1""")
  Optional<UUID> getJobFromInputOrOutput(String datasetName, String namespaceName);

  @SqlQuery(
diff --git a/api/src/main/java/marquez/db/SearchDao.java b/api/src/main/java/marquez/db/SearchDao.java
index ddd58a57b4..46a089209a 100644
--- a/api/src/main/java/marquez/db/SearchDao.java
+++ b/api/src/main/java/marquez/db/SearchDao.java
@@ -27,22 +27,23 @@ public interface SearchDao {
   * @return A {@link SearchResult} object.
   */
  @SqlQuery(
-      "SELECT type, name, updated_at, namespace_name\n"
-          + "FROM (\n"
-          + "  SELECT 'DATASET' AS type, d.name, d.updated_at, d.namespace_name\n"
-          + "  FROM datasets AS d\n"
-          + "  WHERE d.name ilike '%' || :query || '%'\n"
-          + "  UNION\n"
-          + "  SELECT DISTINCT ON (j.namespace_name, j.name) \n"
-          + "    'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
-          + "  FROM (SELECT namespace_name, name, unnest(COALESCE(aliases, Array[NULL]::varchar[])) AS alias, updated_at \n"
-          + "    FROM jobs_view WHERE symlink_target_uuid IS NULL\n"
-          + "    ORDER BY updated_at DESC) AS j\n"
-          + "  WHERE j.name ilike '%' || :query || '%'\n"
-          + "  OR j.alias ilike '%' || :query || '%'\n"
-          + ") AS results\n"
-          + "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n"
-          + "ORDER BY :sort\n"
-          + "LIMIT :limit")
+      """
+      SELECT type, name, updated_at, namespace_name
+      FROM (
+        SELECT 'DATASET' AS type, d.name, d.updated_at, d.namespace_name
+        FROM datasets_view AS d
+        WHERE d.name ilike '%' || :query || '%'
+        UNION
+        SELECT DISTINCT ON (j.namespace_name, j.name)\s
+          'JOB' AS type, j.name, j.updated_at, j.namespace_name
+        FROM (SELECT namespace_name, name, unnest(COALESCE(aliases, Array[NULL]::varchar[])) AS alias, updated_at\s
+          FROM jobs_view WHERE symlink_target_uuid IS NULL
+          ORDER BY updated_at DESC) AS j
+        WHERE j.name ilike '%' || :query || '%'
+        OR j.alias ilike '%' || :query || '%'
+      ) AS results
+      WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL
+      ORDER BY :sort
+      LIMIT :limit""")
  List<SearchResult> search(String query, SearchFilter filter, SearchSort sort, int limit);
}
diff --git a/api/src/main/java/marquez/graphql/GraphqlDaos.java b/api/src/main/java/marquez/graphql/GraphqlDaos.java
index ffee5c62d7..fac3d87bb6 100644
--- a/api/src/main/java/marquez/graphql/GraphqlDaos.java
+++ b/api/src/main/java/marquez/graphql/GraphqlDaos.java
@@ -23,20 +23,20 @@ public interface GraphqlDaos extends SqlObject {
   * Note: You must use a non-map type for returning single entries because a type of Map is already
   * registered to jdbi.
   */
-  @SqlQuery("SELECT * FROM datasets WHERE uuid = :uuid ORDER BY updated_at")
+  @SqlQuery("SELECT * FROM datasets_view WHERE uuid = :uuid ORDER BY updated_at")
  RowMap<String, Object> getDataset(UUID uuid);

-  @SqlQuery("SELECT * FROM datasets")
+  @SqlQuery("SELECT * FROM datasets_view")
  List<RowMap<String, Object>> getDatasets();

  @SqlQuery(
-      "SELECT * FROM datasets where namespace_name = :namespaceName and datasets.name = :name")
+      "SELECT * FROM datasets_view where namespace_name = :namespaceName and datasets_view.name = :name")
  RowMap<String, Object> getDatasetByNamespaceAndName(String namespaceName, String name);

  @SqlQuery("SELECT * FROM jobs_view where namespace_name = :namespaceName and name = :name")
  RowMap<String, Object> getJobByNamespaceAndName(String namespaceName, String name);

-  @SqlQuery("SELECT * FROM datasets where namespace_name = :namespaceName and name = :name")
+  @SqlQuery("SELECT * FROM datasets_view where namespace_name = :namespaceName and name = :name")
  RowMap<String, Object> getDatasetsByNamespaceAndName(String namespaceName, String name);

  @SqlQuery("SELECT * FROM jobs_view")
@@ -56,10 +56,10 @@ public interface GraphqlDaos extends SqlObject {
  List<RowMap<String, Object>> getDatasetFieldsByTagUuid(UUID tagUuid);

  @SqlQuery(
-      "SELECT d.* FROM datasets d inner join datasets_tag_mapping m on m.dataset_uuid = d.uuid where tag_uuid = :uuid")
+      "SELECT d.* FROM datasets_view d inner join datasets_tag_mapping m on m.dataset_uuid = d.uuid where tag_uuid = :uuid")
  List<RowMap<String, Object>> getDatasetsByTagUuid(UUID tagUuid);

-  @SqlQuery("SELECT d.* from datasets d where source_uuid = :sourceUuid")
+  @SqlQuery("SELECT d.* from datasets_view d where source_uuid = :sourceUuid")
  List<RowMap<String, Object>> getDatasetsBySource(UUID sourceUuid);

  @SqlQuery("SELECT * from runs_view where uuid = :uuid")
@@ -108,11 +108,11 @@ List<RowMap<String, Object>> getDistinctJobVersionsByDatasetVersionOutput(
  @SqlQuery("SELECT * from job_contexts where uuid = :uuid")
  RowMap<String, Object> getJobContext(UUID uuid);

-  @SqlQuery("SELECT * from datasets where namespace_uuid = :namespaceUuid")
+  @SqlQuery("SELECT * from datasets_view where namespace_uuid = :namespaceUuid")
  List<RowMap<String, Object>> getDatasetsByNamespace(UUID namespaceUuid);

  @SqlQuery(
-      "SELECT d.* from datasets d inner join job_versions_io_mapping m on m.dataset_uuid = d.uuid where m.job_version_uuid = :jobVersionUuid and io_type = :ioType")
+      "SELECT d.* from datasets_view d inner join job_versions_io_mapping m on m.dataset_uuid = d.uuid where m.job_version_uuid = :jobVersionUuid and io_type = :ioType")
  List<RowMap<String, Object>> getIOMappingByJobVersion(UUID jobVersionUuid, IoType ioType);

  @SqlQuery(
      """
@@ -218,7 +218,7 @@ WITH RECURSIVE search_graph(job_name, namespace_name, depth, path, cycle) AS (
      left outer join (
        select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as "inEdges", i.in_agg as "outEdges") AS x)) as agg
        from job_versions_io_mapping io_out
-       inner join datasets ds_in on ds_in.uuid = io_out.dataset_uuid
+       inner join datasets_view ds_in on ds_in.uuid = io_out.dataset_uuid
        -- output jobs for each input dataset
        left outer join (
          select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg
@@ -244,7 +244,7 @@ select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j
      left outer join(
        select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as "inEdges", i.in_agg as "outEdges") AS x)) as agg
        from job_versions_io_mapping io_out
-       inner join datasets ds_in on ds_in.uuid = io_out.dataset_uuid
+       inner join datasets_view ds_in on ds_in.uuid = io_out.dataset_uuid
        -- output jobs for each output dataset
        left outer join (
          select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index 77878e14d9..32b0efb6b0 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -75,7 +75,7 @@ public Lineage lineage(NodeId nodeId, int depth) {
            .collect(Collectors.toSet());
    Set<DatasetData> datasets = new HashSet<>();
    if (!datasetIds.isEmpty()) {
-      datasets.addAll(this.getNonDeletedDatasetData(datasetIds));
+      datasets.addAll(this.getDatasetData(datasetIds));
    }

    return toLineage(jobData, datasets);
@@ -97,9 +97,15 @@ private Lineage toLineage(Set<JobData> jobData, Set<DatasetData> datasets) {
        continue;
      }
      Set<DatasetData> inputs =
-          data.getInputUuids().stream().map(datasetById::get).collect(Collectors.toSet());
+          data.getInputUuids().stream()
+              .map(datasetById::get)
+              .filter(Objects::nonNull)
+              .collect(Collectors.toSet());
      Set<DatasetData> outputs =
-          data.getOutputUuids().stream().map(datasetById::get).collect(Collectors.toSet());
+          data.getOutputUuids().stream()
+              .map(datasetById::get)
+              .filter(Objects::nonNull)
+              .collect(Collectors.toSet());

      data.setInputs(buildDatasetId(inputs));
      data.setOutputs(buildDatasetId(outputs));
diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql
index 1da2122655..69a56c51b0 100644
--- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql
+++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql
@@ -20,7 +20,8 @@ SELECT f.uuid,
       f.aliases
FROM jobs_fqn f,
     jobs j
-WHERE j.uuid = f.uuid;
+WHERE j.uuid = f.uuid
+AND j.is_hidden IS FALSE;

CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table()
    RETURNS TRIGGER AS
diff --git a/api/src/main/resources/marquez/db/migration/R__3_Datasets_view.sql b/api/src/main/resources/marquez/db/migration/R__3_Datasets_view.sql
new file mode 100644
index 0000000000..d33bd843b3
--- /dev/null
+++ b/api/src/main/resources/marquez/db/migration/R__3_Datasets_view.sql
@@ -0,0 +1,18 @@
+CREATE OR REPLACE VIEW datasets_view
+AS
+SELECT uuid,
+       type,
+       created_at,
+       updated_at,
+       namespace_uuid,
+       source_uuid,
+       name,
+       physical_name,
+       description,
+       current_version_uuid,
+       last_modified_at,
+       namespace_name,
+       source_name,
+       is_deleted
+FROM datasets
+WHERE is_hidden IS FALSE;
\ No newline at end of file
diff --git a/api/src/main/resources/marquez/db/migration/V46__add_hidden_column_to_jobs_and_datasets.sql b/api/src/main/resources/marquez/db/migration/V46__add_hidden_column_to_jobs_and_datasets.sql
new file mode 100644
index 0000000000..0311279044
--- /dev/null
+++ b/api/src/main/resources/marquez/db/migration/V46__add_hidden_column_to_jobs_and_datasets.sql
@@ -0,0 +1,3 @@
+ALTER TABLE datasets ADD COLUMN is_hidden BOOLEAN DEFAULT FALSE;
+
+ALTER TABLE jobs ADD COLUMN is_hidden BOOLEAN DEFAULT FALSE;
\ No newline at end of file
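Taken together, the two migrations above give every read path one place to filter hidden rows. A quick sketch of the intended semantics, in the same SQL as the migrations (table contents and the `food_delivery` names are hypothetical):

```sql
-- What DatasetDao.delete() executes under the hood:
UPDATE datasets
SET is_hidden = true
WHERE namespace_name = 'food_delivery' AND name = 'public.delivery_7_days';

-- The row disappears from the view that the DAOs now read from...
SELECT count(*) FROM datasets_view
WHERE namespace_name = 'food_delivery' AND name = 'public.delivery_7_days';  -- 0

-- ...but remains in the base table, so the upsert's
-- "is_hidden = EXCLUDED.is_hidden" (always false) can resurface it
-- when a new version of the dataset arrives:
SELECT count(*) FROM datasets
WHERE namespace_name = 'food_delivery' AND name = 'public.delivery_7_days';  -- 1
```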
diff --git a/api/src/test/java/marquez/BaseIntegrationTest.java b/api/src/test/java/marquez/BaseIntegrationTest.java
index 00611cf067..4c13672620 100644
--- a/api/src/test/java/marquez/BaseIntegrationTest.java
+++ b/api/src/test/java/marquez/BaseIntegrationTest.java
@@ -51,6 +51,7 @@ import marquez.client.models.Tag;
import marquez.common.models.SourceType;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.junit.jupiter.Container;
@@ -268,4 +269,15 @@ protected static Field newFieldWith(final String tag) {
  protected static Field newFieldWith(final ImmutableSet<String> tags) {
    return new Field(newFieldName().getValue(), newFieldType(), tags, newDescription());
  }
+
+  protected <T> CompletableFuture<Integer> sendEvent(T event) {
+    return this.sendLineage(Utils.toJson(event))
+        .thenApply(HttpResponse::statusCode)
+        .whenComplete(
+            (val, error) -> {
+              if (error != null) {
+                Assertions.fail("Could not complete request");
+              }
+            });
+  }
}
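The new `sendEvent` helper factors out the send-and-check boilerplate that tests like the one below still inline. A minimal sketch of the intended call site, assuming a test class extending `BaseIntegrationTest` and an already-built `event`:

```java
// Hypothetical usage inside a subclass of BaseIntegrationTest: serialize the
// event, POST it to the lineage endpoint, and fail fast on transport errors.
CompletableFuture<Integer> resp = sendEvent(event);

// The server is expected to accept the event with 201 CREATED.
assertThat(resp.join()).isEqualTo(201);
```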
diff --git a/api/src/test/java/marquez/DatasetIntegrationTest.java b/api/src/test/java/marquez/DatasetIntegrationTest.java
index 11abd5708d..e8603c78b3 100644
--- a/api/src/test/java/marquez/DatasetIntegrationTest.java
+++ b/api/src/test/java/marquez/DatasetIntegrationTest.java
@@ -399,24 +399,24 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep
    String namespace = "namespace";
    String name = "anotherTable";
    LineageEvent event =
-        new LineageEvent(
-            "COMPLETE",
-            Instant.now().atZone(ZoneId.systemDefault()),
-            new LineageEvent.Run(UUID.randomUUID().toString(), null),
-            new LineageEvent.Job("namespace", "job_name", null),
-            List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())),
-            Collections.emptyList(),
-            "the_producer");
+        new LineageEvent(
+            "COMPLETE",
+            Instant.now().atZone(ZoneId.systemDefault()),
+            new LineageEvent.Run(UUID.randomUUID().toString(), null),
+            new LineageEvent.Job("namespace", "job_name", null),
+            List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())),
+            Collections.emptyList(),
+            "the_producer");

    CompletableFuture<Integer> resp =
-        this.sendLineage(Utils.toJson(event))
-            .thenApply(HttpResponse::statusCode)
-            .whenComplete(
-                (val, error) -> {
-                  if (error != null) {
-                    Assertions.fail("Could not complete request");
-                  }
-                });
+        this.sendLineage(Utils.toJson(event))
+            .thenApply(HttpResponse::statusCode)
+            .whenComplete(
+                (val, error) -> {
+                  if (error != null) {
+                    Assertions.fail("Could not complete request");
+                  }
+                });

    // Ensure the event was accepted and a proper response code returned.
    assertThat(resp.join()).isEqualTo(201);
@@ -425,19 +425,19 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep
    List<Dataset> datasets = client.listDatasets(namespace);
    assertThat(datasets).hasSize(0);

-    resp = this.sendLineage(Utils.toJson(event))
-        .thenApply(HttpResponse::statusCode)
-        .whenComplete(
-            (val, error) -> {
-              if (error != null) {
-                Assertions.fail("Could not complete request");
-              }
-            });
+    resp =
+        this.sendLineage(Utils.toJson(event))
+            .thenApply(HttpResponse::statusCode)
+            .whenComplete(
+                (val, error) -> {
+                  if (error != null) {
+                    Assertions.fail("Could not complete request");
+                  }
+                });
    assertThat(resp.join()).isEqualTo(201);
    datasets = client.listDatasets(namespace);
    assertThat(datasets).hasSize(1);
  }
-
}
diff --git a/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java b/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java
index 7abbe3ae17..bb3b910f03 100644
--- a/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java
+++ b/api/src/test/java/marquez/common/api/JobResourceIntegrationTest.java
@@ -240,6 +240,19 @@ public void testApp_getJobVersion() {
        .isEqualTo(runId);
  }

+  @Test
+  public void testApp_deleteJob() {
+    client.createJob(NAMESPACE_NAME, JOB_NAME, JOB_META);
+
+    List<Job> jobs = client.listJobs(NAMESPACE_NAME);
+    assertThat(jobs).hasSizeGreaterThan(0);
+
+    client.deleteJob(NAMESPACE_NAME, JOB_NAME);
+
+    jobs = client.listJobs(NAMESPACE_NAME);
+    assertThat(jobs).isEmpty();
+  }
+
  @Test
  public void testApp_getJobVersionWithInputsAndOutputs() {
    createSource(STREAM_SOURCE_NAME);
diff --git a/api/src/test/java/marquez/db/DatasetDaoTest.java b/api/src/test/java/marquez/db/DatasetDaoTest.java
index ba6be1911d..c1ef2f7bfa 100644
--- a/api/src/test/java/marquez/db/DatasetDaoTest.java
+++ b/api/src/test/java/marquez/db/DatasetDaoTest.java
@@ -68,6 +68,7 @@ public void tearDown(Jdbi jdbi) {
    handle.execute("DELETE FROM lineage_events");
    handle.execute("DELETE FROM runs_input_mapping");
    handle.execute("DELETE FROM dataset_versions_field_mapping");
+    handle.execute("DELETE FROM stream_versions");
    handle.execute("DELETE FROM dataset_versions");
    handle.execute("UPDATE runs SET start_run_state_uuid=NULL, end_run_state_uuid=NULL");
    handle.execute("DELETE FROM run_states");
@@ -78,6 +79,7 @@ public void tearDown(Jdbi jdbi) {
    handle.execute("DELETE FROM jobs");
    handle.execute("DELETE FROM dataset_fields_tag_mapping");
    handle.execute("DELETE FROM dataset_fields");
+    handle.execute("DELETE FROM datasets_tag_mapping");
    handle.execute("DELETE FROM datasets");
    handle.execute("DELETE FROM sources");
    handle.execute("DELETE FROM namespaces");
@@ -323,7 +325,7 @@ public void testGetDatasets() {
    List<Dataset> datasets = datasetDao.findAll(NAMESPACE, 5, 0);
    assertThat(datasets).hasSize(3);

-    datasetDao.softDelete(NAMESPACE, deletedDatasetName);
+    datasetDao.delete(NAMESPACE, deletedDatasetName);

    datasets = datasetDao.findAll(NAMESPACE, 5, 0);
    assertThat(datasets).hasSize(2);
diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java
index 8e370ff002..07b2b9ee7c 100644
--- a/api/src/test/java/marquez/db/LineageDaoTest.java
+++ b/api/src/test/java/marquez/db/LineageDaoTest.java
@@ -666,7 +666,7 @@ public void testGetDatasetData() {
            jobFacet,
            dataset);
    Set<DatasetData> datasetData =
-        lineageDao.getNonDeletedDatasetData(
+        lineageDao.getDatasetData(
            newRows.stream()
                .map(j -> j.getOutput().get().getDatasetRow().getUuid())
                .collect(Collectors.toSet()));
@@ -697,7 +697,7 @@ public void testGetDatasetDatalifecycleStateReturned() {
        Arrays.asList(dataset));

    Set<DatasetData> datasetData =
-        lineageDao.getNonDeletedDatasetData(
+        lineageDao.getDatasetData(
            Collections.singleton(row.getOutputs().get().get(0).getDatasetRow().getUuid()));

    assertThat(datasetData)
@@ -736,7 +736,7 @@ public void testGetDatasetDataDoesNotReturnDeletedDataset() {
        Arrays.asList(dataset, toDelete));

    Set<DatasetData> datasetData =
-        lineageDao.getNonDeletedDatasetData(
+        lineageDao.getDatasetData(
            Set.of(
                row.getOutputs().get().get(0).getDatasetRow().getUuid(),
                row.getOutputs().get().get(1).getDatasetRow().getUuid()));
@@ -746,10 +746,10 @@ public void testGetDatasetDataDoesNotReturnDeletedDataset() {
        .extracting(ds -> ds.getName().getValue())
        .anyMatch(str -> str.contains(deleteName));

-    datasetDao.softDelete(NAMESPACE, deleteName);
+    datasetDao.delete(NAMESPACE, deleteName);

    datasetData =
-        lineageDao.getNonDeletedDatasetData(
+        lineageDao.getDatasetData(
            Set.of(
                row.getOutputs().get().get(0).getDatasetRow().getUuid(),
                row.getOutputs().get().get(1).getDatasetRow().getUuid()));
diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java
index 9b23dff6ef..86c6a4252c 100644
--- a/api/src/test/java/marquez/service/LineageServiceTest.java
+++ b/api/src/test/java/marquez/service/LineageServiceTest.java
@@ -18,6 +18,7 @@ import marquez.common.models.DatasetVersionId;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
+import marquez.db.DatasetDao;
import marquez.db.JobDao;
import marquez.db.LineageDao;
import marquez.db.LineageTestUtils;
@@ -52,6 +53,10 @@ public class LineageServiceTest {
  private static LineageDao lineageDao;
  private static LineageService lineageService;
  private static OpenLineageDao openLineageDao;
+
+  private static DatasetDao datasetDao;
+  private static JobDao jobDao;
+
  private final Dataset dataset =
      new Dataset(
          NAMESPACE,
@@ -70,6 +75,8 @@ public static void setUpOnce(Jdbi jdbi) {
    lineageDao = jdbi.onDemand(LineageDao.class);
    lineageService = new LineageService(lineageDao, jdbi.onDemand(JobDao.class));
    openLineageDao = jdbi.onDemand(OpenLineageDao.class);
+    datasetDao = jdbi.onDemand(DatasetDao.class);
+    jobDao = jdbi.onDemand(JobDao.class);
  }

  @AfterEach
@@ -203,6 +210,119 @@ && jobNameEquals(n, "downstreamJob0<-outputData<-readJob0<-commonDataset"))
                new DatasetName("outputData<-readJob0<-commonDataset")));
  }

+  @Test
+  public void testLineageWithDeletedDataset() {
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(dataset));
+    List<JobLineage> jobRows =
+        writeDownstreamLineage(
+            openLineageDao,
+            new LinkedList<>(
+                Arrays.asList(
+                    new DatasetConsumerJob("readJob", 20, Optional.of("outputData")),
+                    new DatasetConsumerJob("downstreamJob", 1, Optional.of("outputData2")),
+                    new DatasetConsumerJob("finalConsumer", 1, Optional.empty()))),
+            jobFacet,
+            dataset);
+    UpdateLineageRow secondRun =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(dataset));
+    writeDownstreamLineage(
+        openLineageDao,
+        new LinkedList<>(
+            Arrays.asList(
+                new DatasetConsumerJob("newReadJob", 5, Optional.of("outputData3")),
+                new DatasetConsumerJob("newDownstreamJob", 1, Optional.empty()))),
+        jobFacet,
+        dataset);
+
+    datasetDao.delete(NAMESPACE, "commonDataset");
+
+    String jobName = writeJob.getJob().getName();
+    Lineage lineage =
+        lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
+
+    // 1 writeJob + 0 commonDataset is hidden
+    // 20 readJob + 20 outputData
+    // 20 downstreamJob + 20 outputData2
+    // 5 newReadJob + 5 outputData3
+    // 5 newDownstreamJob + 0
+    assertThat(lineage.getGraph())
+        .hasSize(96) // 51 jobs + 45 datasets - one is hidden
+        .areExactly(51, new Condition<>(n -> n.getType().equals(NodeType.JOB), "job"))
+        .areExactly(45, new Condition<>(n -> n.getType().equals(NodeType.DATASET), "dataset"))
+        // finalConsumer job is out of the depth range
+        .filteredOn(
+            node ->
+                node.getType().equals(NodeType.JOB)
+                    && node.getId().asJobId().getName().getValue().contains("finalConsumer"))
+        .isEmpty();
+
+    // assert the second run of writeJob is returned
+    AbstractObjectAssert<?, Run> runAssert =
+        assertThat(lineage.getGraph())
+            .filteredOn(
+                node -> node.getType().equals(NodeType.JOB) && jobNameEquals(node, "writeJob"))
+            .hasSize(1)
+            .first()
+            .extracting(
+                n -> ((JobData) n.getData()).getLatestRun(),
+                InstanceOfAssertFactories.optional(Run.class))
+            .isPresent()
+            .get();
+    runAssert.extracting(r -> r.getId().getValue()).isEqualTo(secondRun.getRun().getUuid());
+    runAssert
+        .extracting(Run::getInputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
+        .hasSize(0);
+    runAssert
+        .extracting(Run::getOutputVersions, InstanceOfAssertFactories.list(DatasetVersionId.class))
+        .hasSize(1);
+
+    // check the output edges for the commonDataset node
+    assertThat(lineage.getGraph())
+        .filteredOn(
+            node ->
+                node.getType().equals(NodeType.DATASET)
+                    && node.getId().asDatasetId().getName().getValue().equals("commonDataset"))
+        .isEmpty();
+
+    jobDao.delete(NAMESPACE, "downstreamJob0<-outputData<-readJob0<-commonDataset");
+
+    lineage =
+        lineageService.lineage(NodeId.of(new NamespaceName(NAMESPACE), new JobName(jobName)), 2);
+
+    // 1 writeJob + 0 commonDataset is hidden
+    // 20 readJob + 20 outputData
+    // 20 downstreamJob + 20 outputData2
+    // 5 newReadJob + 5 outputData3
+    // 5 newDownstreamJob + 0
+    assertThat(lineage.getGraph())
+        .hasSize(
+            94) // 51 jobs + 45 datasets - one dataset is hidden + one job that produces dataset is
+        // hidden
+        .areExactly(50, new Condition<>(n -> n.getType().equals(NodeType.JOB), "job"))
+        .areExactly(44, new Condition<>(n -> n.getType().equals(NodeType.DATASET), "dataset"));
+
+    // assert that the deleted downstreamJob0 is hidden
+    assertThat(lineage.getGraph())
+        .filteredOn(
+            n ->
+                n.getType().equals(NodeType.JOB)
+                    && jobNameEquals(n, "downstreamJob0<-outputData<-readJob0<-commonDataset"))
+        .isEmpty();
+  }
+
  @Test
  public void testLineageWithNoDatasets() {
    UpdateLineageRow writeJob =
diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java
index 3acd192408..9059c56963 100644
--- a/clients/java/src/main/java/marquez/client/MarquezClient.java
+++ b/clients/java/src/main/java/marquez/client/MarquezClient.java
@@ -215,6 +215,11 @@ public Job getJob(@NonNull String namespaceName, @NonNull String jobName) {
    return Job.fromJson(bodyAsJson);
  }

+  public Job deleteJob(@NonNull String namespaceName, @NonNull String jobName) {
+    final String bodyAsJson = http.delete(url.toJobUrl(namespaceName, jobName));
+    return Job.fromJson(bodyAsJson);
+  }
+
  public JobVersion getJobVersion(
      @NonNull String namespaceName, @NonNull String jobName, String version) {
    final String bodyAsJson = http.get(url.toJobVersionUrl(namespaceName, jobName, version));
diff --git a/docker-compose.yml
b/docker-compose.yml index 307e6bc057..b21923a5b4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,7 +34,7 @@ services: image: postgres:12.1 container_name: marquez-db ports: - - "5432" + - "5432:5432" environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=password diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 5320c72cc9..3e279e97cf 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -12,8 +12,14 @@ if [[ -z "${MARQUEZ_CONFIG}" ]]; then echo "WARNING 'MARQUEZ_CONFIG' not set, using development configuration." fi +if [[ -z "${MARQUEZ_VERSION}" ]]; then + MARQUEZ_VERSION='*' + echo "WARNING 'MARQUEZ_VERSION' not set. Running could fail if directory contains multiple jar versions." +fi + + # Adjust java options for the http server JAVA_OPTS="${JAVA_OPTS} -Duser.timezone=UTC -Dlog4j2.formatMsgNoLookups=true" # Start http server with java options and configuration -java ${JAVA_OPTS} -jar marquez-*.jar server ${MARQUEZ_CONFIG} +java ${JAVA_OPTS} -jar marquez-${MARQUEZ_VERSION}.jar server ${MARQUEZ_CONFIG}
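For completeness, a sketch of how the new job deletion is exercised through the Java client added in this PR (the server URL and names are placeholders, and the same soft-delete flow applies to datasets through the reworked `DatasetResource.delete`):

```java
// Sketch only: soft-delete a job against a local Marquez instance.
// The DELETE endpoint returns the Job that was just hidden.
MarquezClient client =
    MarquezClient.builder().baseUrl("http://localhost:5000").build();

Job deleted = client.deleteJob("food_delivery", "etl_orders_7_days");

// The job no longer shows up in listings, because jobs_view now filters
// on is_hidden; the row itself stays in the jobs table and is resurfaced
// if a new OpenLineage event for the job arrives.
assertThat(client.listJobs("food_delivery")).isEmpty();
```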