From 4d0cdbd3bcfd7cb82a61bce094b27c6e4cd7e160 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Fri, 6 May 2022 16:49:56 -0700 Subject: [PATCH 1/5] Add migrations to support job parent relationship storage Signed-off-by: Michael Collado --- .../V43__alter_jobs_add_job_parent_id.sql | 44 ++++++++++++++++++ .../V44__runs_job_versions_add_job_uuid.sql | 46 +++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql create mode 100644 api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql diff --git a/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql b/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql new file mode 100644 index 0000000000..8450f72922 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql @@ -0,0 +1,44 @@ +ALTER TABLE jobs ADD COLUMN IF NOT EXISTS parent_job_uuid uuid CONSTRAINT jobs_parent_fk_jobs REFERENCES jobs (uuid); +ALTER TABLE runs ADD COLUMN IF NOT EXISTS parent_run_uuid uuid CONSTRAINT runs_parent_fk_runs REFERENCES runs (uuid); + +DROP INDEX IF EXISTS jobs_name_index; +ALTER TABLE jobs DROP CONSTRAINT jobs_namespace_uuid_name_key; +DROP INDEX IF EXISTS jobs_namespace_uuid_name_key; + +CREATE UNIQUE INDEX IF NOT EXISTS jobs_name_parent ON jobs (name, namespace_name, parent_job_uuid); +CREATE UNIQUE INDEX IF NOT EXISTS jobs_namespace_uuid_name_parent ON jobs (name, namespace_uuid, parent_job_uuid); +CREATE UNIQUE INDEX IF NOT EXISTS jobs_namespace_uuid_name_null_parent ON jobs (name, namespace_uuid) WHERE parent_job_uuid IS NULL; +ALTER TABLE jobs ADD CONSTRAINT unique_jobs_namespace_uuid_name_parent UNIQUE USING INDEX jobs_namespace_uuid_name_parent; + +CREATE OR REPLACE VIEW jobs_view +AS + with recursive + job_fqn AS ( + SELECT uuid, name, namespace_name, parent_job_uuid + FROM jobs + UNION ALL + SELECT 
j1.uuid, + CASE WHEN j2.name IS NOT NULL THEN j2.name || '.' || j1.name ELSE j1.name END AS name, + CASE WHEN j2.namespace_name IS NOT NULL THEN j2.namespace_name ELSE j1.namespace_name END AS namespace_name, + j2.parent_job_uuid + FROM jobs j1 + INNER JOIN job_fqn j2 ON j2.uuid=j1.parent_job_uuid + ) + SELECT f.uuid, + f.name, + f.namespace_name, + j.name AS simple_name, + f.parent_job_uuid, + j.type, + j.created_at, + j.updated_at, + j.namespace_uuid, + j.description, + j.current_version_uuid, + j.current_job_context_uuid, + j.current_location, + j.current_inputs, + j.symlink_target_uuid + FROM job_fqn f, jobs j + WHERE j.uuid=f.uuid + AND f.parent_job_uuid IS NULL diff --git a/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql b/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql new file mode 100644 index 0000000000..389f943609 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql @@ -0,0 +1,46 @@ +ALTER TABLE runs ADD COLUMN IF NOT EXISTS job_uuid uuid; + +CREATE INDEX IF NOT EXISTS runs_job_uuid ON runs(job_uuid, transitioned_at DESC); + +-- Trigger updates for records being written by the still-running version of the application CREATE OR REPLACE FUNCTION write_run_job_uuid() + RETURNS trigger + LANGUAGE plpgsql AS +$func$ +BEGIN + -- BEFORE INSERT trigger: OLD is not assigned for INSERTs, so the job lookup must read NEW + NEW.job_uuid := (SELECT uuid FROM jobs j WHERE j.name=NEW.job_name AND j.namespace_name=NEW.namespace_name); + RETURN NEW; +END +$func$; + +DROP TRIGGER IF EXISTS runs_insert_job_uuid ON runs; + +CREATE TRIGGER runs_insert_job_uuid + BEFORE INSERT ON runs + FOR EACH ROW + WHEN (NEW.job_uuid IS NULL AND NEW.job_name IS NOT NULL AND NEW.namespace_name IS NOT NULL) +EXECUTE PROCEDURE write_run_job_uuid(); + +CREATE OR REPLACE VIEW runs_view +AS +SELECT r.uuid, + r.created_at, + r.updated_at, + job_version_uuid, + run_args_uuid, + nominal_start_time, + nominal_end_time, + current_run_state, + start_run_state_uuid, + 
end_run_state_uuid, + external_id, + location, + transitioned_at, + started_at, + ended_at, + job_context_uuid, + job_uuid, + j.name AS job_name, + j.namespace_name +FROM runs r +INNER JOIN jobs_view j ON j.uuid = r.job_uuid; From fbda91b7d6b9fff369428855724820ed2ea39752 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Mon, 4 Apr 2022 14:23:46 -0700 Subject: [PATCH 2/5] Update all job and run queries to reference jobs_view and runs_view Signed-off-by: Michael Collado --- api/src/main/java/marquez/MarquezContext.java | 2 +- .../main/java/marquez/api/JobResource.java | 18 +- api/src/main/java/marquez/db/JobDao.java | 84 +++++++- .../main/java/marquez/db/JobVersionDao.java | 16 +- api/src/main/java/marquez/db/LineageDao.java | 7 +- .../main/java/marquez/db/OpenLineageDao.java | 2 + api/src/main/java/marquez/db/RunDao.java | 19 +- api/src/main/java/marquez/db/SearchDao.java | 2 +- .../java/marquez/graphql/GraphqlDaos.java | 204 ++++++++++-------- .../java/marquez/service/LineageService.java | 11 +- .../main/java/marquez/service/RunService.java | 7 +- api/src/test/java/marquez/db/DbTestUtils.java | 6 +- .../java/marquez/db/JobVersionDaoTest.java | 7 +- api/src/test/java/marquez/db/RunDaoTest.java | 8 +- .../marquez/service/LineageServiceTest.java | 3 +- 15 files changed, 254 insertions(+), 142 deletions(-) diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index a2709d7717..0af4d28c7b 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -127,7 +127,7 @@ private MarquezContext( this.tagService = new TagService(baseDao); this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); - this.lineageService = new LineageService(lineageDao); + this.lineageService = new LineageService(lineageDao, jobDao); this.jdbiException = new JdbiExceptionExceptionMapper(); final ServiceFactory serviceFactory = ServiceFactory.builder() 
diff --git a/api/src/main/java/marquez/api/JobResource.java b/api/src/main/java/marquez/api/JobResource.java index 272b8ce544..28483a81b5 100644 --- a/api/src/main/java/marquez/api/JobResource.java +++ b/api/src/main/java/marquez/api/JobResource.java @@ -38,6 +38,7 @@ import marquez.common.models.RunId; import marquez.common.models.Version; import marquez.db.JobVersionDao; +import marquez.db.models.JobRow; import marquez.service.ServiceFactory; import marquez.service.models.Job; import marquez.service.models.JobMeta; @@ -171,8 +172,21 @@ public Response createRun( throwIfNotExists(namespaceName); throwIfNotExists(namespaceName, jobName); throwIfExists(namespaceName, jobName, runMeta.getId().orElse(null)); - - final Run run = runService.createRun(namespaceName, jobName, runMeta); + JobRow job = + jobService + .findJobByNameAsRow(namespaceName.getValue(), jobName.getValue()) + .or( + () -> + jobService.findJobBySimpleNameAsRow( + namespaceName.getValue(), jobName.getValue())) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + "No such job with namespace %s and name %s", + namespaceName.getValue(), jobName.getValue()))); + + final Run run = runService.createRun(namespaceName, job, runMeta); final URI runLocation = locationFor(uriInfo, run); return Response.created(runLocation).entity(run).build(); } diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index 21a3272377..d501ce2d93 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -35,7 +35,7 @@ @RegisterRowMapper(JobMapper.class) public interface JobDao extends BaseDao { @SqlQuery( - "SELECT EXISTS (SELECT 1 FROM jobs AS j " + "SELECT EXISTS (SELECT 1 FROM jobs_view AS j " + "WHERE j.namespace_name= :namespaceName AND " + " j.name = :jobName)") boolean exists(String namespaceName, String jobName); @@ -50,17 +50,17 @@ public interface JobDao extends BaseDao { @SqlQuery( """ WITH RECURSIVE job_ids AS 
( - SELECT uuid, symlink_target_uuid - FROM jobs j + SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid + FROM jobs_view j WHERE j.namespace_name=:namespaceName AND j.name=:jobName UNION - SELECT j.uuid, j.symlink_target_uuid - FROM jobs j + SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid + FROM jobs_view j INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid ) SELECT j.*, jc.context, f.facets - FROM jobs j - INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL + FROM jobs_view j + INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid LEFT OUTER JOIN ( @@ -106,9 +106,17 @@ WITH RECURSIVE job_ids AS ( """) Optional findJobByNameAsRow(String namespaceName, String jobName); + @SqlQuery( + """ + SELECT j.* FROM jobs AS j + WHERE j.namespace_name = :namespaceName AND + j.name = :jobName + """) + Optional findJobBySimpleNameAsRow(String namespaceName, String jobName); + @SqlQuery( "SELECT j.*, jc.context, f.facets\n" - + " FROM jobs AS j\n" + + " FROM jobs_view AS j\n" + " LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid\n" + " LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid\n" + "LEFT OUTER JOIN (\n" @@ -117,7 +125,7 @@ WITH RECURSIVE job_ids AS ( + " SELECT run_uuid, event->'job'->'facets' AS facets\n" + " FROM lineage_events AS le\n" + " INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid\n" - + " INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid\n" + + " INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid\n" + " WHERE j2.namespace_name=:namespaceName\n" + " ORDER BY event_time ASC\n" + " ) e\n" @@ -253,7 +261,62 @@ INSERT INTO jobs AS j ( :location, :inputs, :symlinkTargetId - ) ON CONFLICT (name, namespace_uuid) DO + ) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid 
IS NULL DO + UPDATE SET + updated_at = EXCLUDED.updated_at, + type = EXCLUDED.type, + description = EXCLUDED.description, + current_job_context_uuid = EXCLUDED.current_job_context_uuid, + current_location = EXCLUDED.current_location, + current_inputs = EXCLUDED.current_inputs, + -- update the symlink target if not null. otherwise, keep the old value + symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid) + RETURNING * + """) + JobRow upsertJob( + UUID uuid, + JobType type, + Instant now, + UUID namespaceUuid, + String namespaceName, + String name, + String description, + UUID jobContextUuid, + String location, + UUID symlinkTargetId, + PGobject inputs); + + @SqlQuery( + """ + INSERT INTO jobs AS j ( + uuid, + parent_job_uuid, + type, + created_at, + updated_at, + namespace_uuid, + namespace_name, + name, + description, + current_job_context_uuid, + current_location, + current_inputs, + symlink_target_uuid + ) VALUES ( + :uuid, + :parentJobUuid, + :type, + :now, + :now, + :namespaceUuid, + :namespaceName, + :name, + :description, + :jobContextUuid, + :location, + :inputs, + :symlinkTargetId + ) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO UPDATE SET updated_at = EXCLUDED.updated_at, type = EXCLUDED.type, @@ -267,6 +330,7 @@ INSERT INTO jobs AS j ( """) JobRow upsertJob( UUID uuid, + UUID parentJobUuid, JobType type, Instant now, UUID namespaceUuid, diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index b58378dd56..022803859a 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -59,16 +59,20 @@ enum IoType { + " 'name', ds.name))\n" + " FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets\n" + " FROM job_versions_io_mapping io\n" - + " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n" - + " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n" - + " WHERE jv.namespace_name = 
:namespaceName\n" - + " AND jv.job_name = :jobName\n" + + " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n" + + " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n" + + " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n" + + " WHERE j.namespace_name = :namespaceName\n" + + " AND j.name = :jobName\n" + " GROUP BY io.job_version_uuid\n" + "), relevant_job_versions AS (\n" - + " SELECT jv.*, jc.context\n" + + " SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, \n" + + " jv.location, jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, \n" + + " j.namespace_name, j.name AS job_name, jc.context\n" + " FROM job_versions jv\n" + " LEFT OUTER JOIN job_contexts AS jc ON jc.uuid = jv.job_context_uuid\n" - + " WHERE job_name = :jobName AND namespace_name=:namespaceName\n" + + " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n" + + " WHERE j.name = :jobName AND j.namespace_name=:namespaceName\n" + " ORDER BY jv.created_at DESC\n" + ")\n" + "SELECT jv.*,\n" diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 0fd25c2643..62b082c8a4 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -59,13 +59,10 @@ public interface LineageDao { + " )\n" + "SELECT DISTINCT ON (l2.job_uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n" + "FROM lineage l2\n" - + "INNER JOIN jobs j ON j.uuid=l2.job_uuid\n" + + "INNER JOIN jobs_view j ON j.uuid=l2.job_uuid\n" + "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid") Set getLineage(@BindList Set jobIds, int depth); - @SqlQuery("SELECT uuid from jobs where name = :jobName and namespace_name = :namespace") - Optional getJobUuid(String jobName, String namespace); - @SqlQuery( "SELECT ds.*, dv.fields, dv.lifecycle_state\n" + "FROM datasets ds\n" @@ -86,7 +83,7 @@ public interface LineageDao { @SqlQuery( "WITH latest_runs AS (\n" + " SELECT DISTINCT on(r.job_name, 
r.namespace_name) r.*, jv.version\n" - + " FROM runs r\n" + + " FROM runs_view r\n" + " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n" + " WHERE jv.job_uuid in ()\n" + " ORDER BY r.job_name, r.namespace_name, created_at DESC\n" diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index e83d0b9ff1..11391056cc 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -176,6 +176,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper runUuid, event.getRun().getRunId(), now, + job.getUuid(), null, runArgs.getUuid(), nominalStartTime, @@ -192,6 +193,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper runUuid, event.getRun().getRunId(), now, + job.getUuid(), null, runArgs.getUuid(), nominalStartTime, diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 7cfb24f1e0..f03fbb8f36 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -15,7 +15,6 @@ import marquez.common.Utils; import marquez.common.models.DatasetId; import marquez.common.models.Field; -import marquez.common.models.JobName; import marquez.common.models.NamespaceName; import marquez.common.models.RunId; import marquez.common.models.RunState; @@ -69,9 +68,9 @@ public interface RunDao extends BaseDao { String BASE_FIND_RUN_SQL = "SELECT r.*, ra.args, ctx.context, f.facets,\n" - + "jv.namespace_name, jv.job_name, jv.version AS job_version,\n" + + "jv.version AS job_version,\n" + "ri.input_versions, ro.output_versions\n" - + "FROM runs AS r\n" + + "FROM runs_view AS r\n" + "LEFT OUTER JOIN\n" + "(\n" + " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n" @@ -156,6 +155,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + "external_id, " + "created_at, " + "updated_at, " + + 
"job_uuid, " + "job_version_uuid, " + "run_args_uuid, " + "nominal_start_time, " @@ -171,6 +171,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + ":externalId, " + ":now, " + ":now, " + + ":jobUuid," + ":jobVersionUuid, " + ":runArgsUuid, " + ":nominalStartTime, " @@ -194,6 +195,7 @@ ExtendedRunRow upsert( UUID runUuid, String externalId, Instant now, + UUID jobUuid, UUID jobVersionUuid, UUID runArgsUuid, Instant nominalStartTime, @@ -211,6 +213,7 @@ ExtendedRunRow upsert( + "external_id, " + "created_at, " + "updated_at, " + + "job_uuid, " + "job_version_uuid, " + "run_args_uuid, " + "nominal_start_time, " @@ -224,6 +227,7 @@ ExtendedRunRow upsert( + ":externalId, " + ":now, " + ":now, " + + ":jobUuid, " + ":jobVersionUuid, " + ":runArgsUuid, " + ":nominalStartTime, " @@ -243,6 +247,7 @@ ExtendedRunRow upsert( UUID runUuid, String externalId, Instant now, + UUID jobUuid, UUID jobVersionUuid, UUID runArgsUuid, Instant nominalStartTime, @@ -347,7 +352,7 @@ default void updateInputDatasetMapping(Set inputs, UUID runUuid) { /** Insert from run creates a run but does not associate any datasets. 
*/ @Transaction default RunRow upsertRunMeta( - NamespaceName namespaceName, JobName jobName, RunMeta runMeta, RunState currentState) { + NamespaceName namespaceName, JobRow jobRow, RunMeta runMeta, RunState currentState) { Instant now = Instant.now(); NamespaceRow namespaceRow = @@ -363,9 +368,6 @@ default RunRow upsertRunMeta( Utils.toJson(runMeta.getArgs()), Utils.checksumFor(runMeta.getArgs())); - JobRow jobRow = - createJobDao().findJobByNameAsRow(namespaceName.getValue(), jobName.getValue()).get(); - UUID uuid = runMeta.getId().map(RunId::getValue).orElse(UUID.randomUUID()); RunRow runRow = @@ -373,6 +375,7 @@ default RunRow upsertRunMeta( uuid, null, now, + jobRow.getUuid(), null, runArgsRow.getUuid(), runMeta.getNominalStartTime().orElse(null), @@ -380,7 +383,7 @@ default RunRow upsertRunMeta( currentState, now, namespaceRow.getName(), - jobName.getValue(), + jobRow.getName(), jobRow.getLocation(), jobRow.getJobContextUuid().orElse(null)); diff --git a/api/src/main/java/marquez/db/SearchDao.java b/api/src/main/java/marquez/db/SearchDao.java index b089a07634..7557131928 100644 --- a/api/src/main/java/marquez/db/SearchDao.java +++ b/api/src/main/java/marquez/db/SearchDao.java @@ -31,7 +31,7 @@ public interface SearchDao { + " WHERE d.name ilike '%' || :query || '%'\n" + " UNION\n" + " SELECT 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n" - + " FROM jobs AS j\n" + + " FROM jobs_view AS j\n" + " WHERE j.name ilike '%' || :query || '%'\n" + ") AS results\n" + "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n" diff --git a/api/src/main/java/marquez/graphql/GraphqlDaos.java b/api/src/main/java/marquez/graphql/GraphqlDaos.java index a8c5b67287..a681c7ac7e 100644 --- a/api/src/main/java/marquez/graphql/GraphqlDaos.java +++ b/api/src/main/java/marquez/graphql/GraphqlDaos.java @@ -30,13 +30,13 @@ public interface GraphqlDaos extends SqlObject { "SELECT * FROM datasets where namespace_name = :namespaceName and datasets.name = :name") RowMap 
getDatasetByNamespaceAndName(String namespaceName, String name); - @SqlQuery("SELECT * FROM jobs where namespace_name = :namespaceName and name = :name") + @SqlQuery("SELECT * FROM jobs_view where namespace_name = :namespaceName and name = :name") RowMap getJobByNamespaceAndName(String namespaceName, String name); @SqlQuery("SELECT * FROM datasets where namespace_name = :namespaceName and name = :name") RowMap getDatasetsByNamespaceAndName(String namespaceName, String name); - @SqlQuery("SELECT * FROM jobs") + @SqlQuery("SELECT * FROM jobs_view") List> getJobs(); @SqlQuery("SELECT * FROM sources where uuid = :uuid") @@ -59,10 +59,10 @@ public interface GraphqlDaos extends SqlObject { @SqlQuery("SELECT d.* from datasets d where source_uuid = :sourceUuid") List> getDatasetsBySource(UUID sourceUuid); - @SqlQuery("SELECT * from runs where uuid = :uuid") + @SqlQuery("SELECT * from runs_view where uuid = :uuid") RowMap getRun(UUID uuid); - @SqlQuery("SELECT * from runs where run_args_uuid = :runArgsUuid") + @SqlQuery("SELECT * from runs_view where run_args_uuid = :runArgsUuid") List> getRunsByRunArgs(UUID runArgsUuid); @SqlQuery("SELECT * FROM dataset_versions where uuid = :uuid") @@ -73,11 +73,11 @@ public interface GraphqlDaos extends SqlObject { List> getDatasetVersionInputsByRun(UUID runUuid); @SqlQuery( - "SELECT r.* from runs r inner join runs_input_mapping m on m.run_uuid = r.uuid where m.dataset_version_uuid = :datasetVersionUuid") + "SELECT r.* from runs_view r inner join runs_input_mapping m on m.run_uuid = r.uuid where m.dataset_version_uuid = :datasetVersionUuid") List> getRunsByDatasetVersion(UUID datasetVersionUuid); @SqlQuery( - "SELECT distinct jv.* from runs r inner join runs_input_mapping m on m.run_uuid = r.uuid inner join job_versions jv on jv.uuid = r.job_version_uuid where m.dataset_version_uuid = :datasetVersionUuid") + "SELECT distinct jv.* from runs_view r inner join runs_input_mapping m on m.run_uuid = r.uuid inner join job_versions jv on 
jv.uuid = r.job_version_uuid where m.dataset_version_uuid = :datasetVersionUuid") List> getDistinctJobVersionsByDatasetVersion(UUID datasetVersionUuid); @SqlQuery( @@ -113,17 +113,32 @@ List> getDistinctJobVersionsByDatasetVersionOutput( List> getIOMappingByJobVersion(UUID jobVersionUuid, IoType ioType); @SqlQuery( - "SELECT jv.* " + "SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, " + + " jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, j.namespace_name, " + + " j.name AS job_name " + " FROM job_versions_io_mapping m " + " inner join job_versions jv " + " on m.dataset_uuid = jv.uuid" + + " inner join jobs_view j ON j.uuid=jv.job_uuid " + " where m.dataset_uuid = :datasetUuid AND m.io_type = :ioType") List> getJobVersionsByIoMapping(UUID datasetUuid, IoType ioType); - @SqlQuery("SELECT * from job_versions where job_uuid = :jobUuid") + @SqlQuery( + "SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, " + + " jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, j.namespace_name, " + + " j.name AS job_name " + + " from job_versions jv " + + " inner join jobs_view j ON j.uuid=jv.job_uuid " + + " where job_uuid = :jobUuid") List> getJobVersionByJob(UUID jobUuid); - @SqlQuery("SELECT * from job_versions where uuid = :uuid") + @SqlQuery( + "SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, " + + " jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, j.namespace_name, " + + " j.name AS job_name " + + " from job_versions jv " + + " inner join jobs_view j ON j.uuid=jv.job_uuid " + + " where jv.uuid = :uuid") RowMap getJobVersion(UUID uuid); @SqlQuery("SELECT * from dataset_fields where dataset_uuid = :datasetVersionUuid") @@ -139,10 +154,10 @@ List> getDistinctJobVersionsByDatasetVersionOutput( @SqlQuery("SELECT * FROM namespaces where name = :name") RowMap getNamespaceByName(String name); - @SqlQuery("SELECT * from jobs where namespace_uuid = 
:namespaceUuid") + @SqlQuery("SELECT * from jobs_view where namespace_uuid = :namespaceUuid") List> getJobsByNamespace(UUID namespaceUuid); - @SqlQuery("SELECT * from jobs where uuid = :uuid") + @SqlQuery("SELECT * from jobs_view where uuid = :uuid") RowMap getJob(UUID uuid); @SqlQuery("SELECT * from run_states where run_uuid = :runUuid order by transitioned_at desc") @@ -163,87 +178,90 @@ List> getDistinctJobVersionsByDatasetVersionOutput( List> getTagsByDatasetField(UUID datasetFieldUuid); @SqlQuery( - "select distinct on (lineage.job_name, " - + " lineage.namespace_name) " - + " lineage.job_name as name, " - + " lineage.namespace_name as namespace, " - + " d_in.agg as \"inEdges\", " - + " d_out.agg as \"outEdges\" " - + "from ( " - + "WITH RECURSIVE search_graph(job_name, namespace_name, depth, path, cycle) AS ( " - + "select j.name, j.namespace_name, 1, ARRAY[j.name], false " - + "from jobs j " - + "where name = :jobName and j.namespace_name = :namespaceName " - + "UNION ALL " - + "select l.job_name, l.namespace_name, depth+1, (path || l.job_name), l.job_name = ANY(path) " - + "from search_graph sg, " - + "( " - + "select jv.job_name, jv.namespace_name, j.name as jx " - + "from jobs j " - + "inner join job_versions_io_mapping io_in on io_in.job_version_uuid = j.current_version_uuid and io_in.io_type = 'INPUT' " - + "inner join job_versions_io_mapping io_out on io_out.dataset_uuid = io_in.dataset_uuid and io_out.io_type = 'OUTPUT' " - + "inner join job_versions jv on jv.uuid = io_out.job_version_uuid " - + "UNION ALL " - + "select jv.job_name, jv.namespace_name, j.name as jx " - + "from jobs j " - + "inner join job_versions_io_mapping io_out on io_out.job_version_uuid = j.current_version_uuid and io_out.io_type = 'OUTPUT' " - + "inner join job_versions_io_mapping io_in on io_in.dataset_uuid = io_out.dataset_uuid and io_in.io_type = 'INPUT' " - + "inner join job_versions jv on jv.uuid = io_in.job_version_uuid " - + ") l where l.jx = sg.job_name and NOT cycle) " - 
+ "SELECT * FROM search_graph where NOT cycle and depth <= :depth) lineage " - // Construct the dataset edges: - + "inner join jobs j on lineage.job_name = j.name and lineage.namespace_name = j.namespace_name " - // input datasets - + "left outer join " - + " (select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as \"inEdges\", i.in_agg as \"outEdges\") AS x)) as agg " - + " from job_versions_io_mapping io_out " - + " inner join datasets ds_in on ds_in.uuid = io_out.dataset_uuid " - // output jobs for each input dataset - + " left outer join ( " - + " select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg " - + " from jobs j_of_in " - + " left outer join job_versions_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid " - + " and io_of_in.io_type = 'INPUT' " - + " group by io_of_in.dataset_uuid " - + " ) i on i.dataset_uuid = io_out.dataset_uuid " - // input jobs for each input dataset - + " left outer join ( " - + " select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j_of_out.namespace_name as namespace) as x)) as out_agg " - + " from jobs j_of_out " - + " left outer join job_versions_io_mapping io_of_out " - + " on io_of_out.job_version_uuid = j_of_out.current_version_uuid " - + " and io_of_out.io_type = 'OUTPUT' " - + " group by io_of_out.dataset_uuid " - + " ) o on o.dataset_uuid = io_out.dataset_uuid " - + " WHERE io_out.io_type = 'OUTPUT' " - + " group by io_out.job_version_uuid " - + " " - + " ) d_in on d_in.job_version_uuid = j.current_version_uuid " - // output datasets - + "left outer join " - + " (select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as \"inEdges\", i.in_agg as \"outEdges\") AS x)) as agg " - + " from job_versions_io_mapping io_out " - + " inner join datasets ds_in on 
ds_in.uuid = io_out.dataset_uuid " - // output jobs for each output dataset - + " left outer join ( " - + " select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg " - + " from jobs j_of_in " - + " left outer join job_versions_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid " - + " and io_of_in.io_type = 'INPUT' " - + " group by io_of_in.dataset_uuid " - + " ) i on i.dataset_uuid = io_out.dataset_uuid " - // input jobs for each output dataset - + " left outer join ( " - + " select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j_of_out.namespace_name as namespace) as x)) as out_agg " - + " from jobs j_of_out " - + " left outer join job_versions_io_mapping io_of_out " - + " on io_of_out.job_version_uuid = j_of_out.current_version_uuid " - + " and io_of_out.io_type = 'OUTPUT' " - + " group by io_of_out.dataset_uuid " - + " ) o on o.dataset_uuid = io_out.dataset_uuid " - + " WHERE io_out.io_type = 'INPUT' " - + " group by io_out.job_version_uuid " - + " " - + " ) d_out on d_out.job_version_uuid = j.current_version_uuid") + """ + select distinct on (lineage.job_name, + lineage.namespace_name) + lineage.job_name as name, + lineage.namespace_name as namespace, + d_in.agg as "inEdges", + d_out.agg as "outEdges" + from ( + WITH RECURSIVE search_graph(job_name, namespace_name, depth, path, cycle) AS ( + select j.name, j.namespace_name, 1, ARRAY[j.name], false + from jobs_view j + where name = :jobName and j.namespace_name = :namespaceName + UNION ALL + select l.job_name, l.namespace_name, depth+1, (path || l.job_name), l.job_name = ANY(path) + from search_graph sg, + ( + select j.name AS job_name, j.namespace_name, j.name as jx + from jobs_view j + inner join job_versions_io_mapping io_in on io_in.job_version_uuid = j.current_version_uuid and io_in.io_type = 'INPUT' + inner join job_versions_io_mapping io_out on io_out.dataset_uuid = 
io_in.dataset_uuid and io_out.io_type = 'OUTPUT' + inner join job_versions jv on jv.uuid = io_out.job_version_uuid + UNION ALL + select j.name AS job_name, jv.namespace_name, j.name as jx + from jobs_view j + inner join job_versions_io_mapping io_out on io_out.job_version_uuid = j.current_version_uuid and io_out.io_type = 'OUTPUT' + inner join job_versions_io_mapping io_in on io_in.dataset_uuid = io_out.dataset_uuid and io_in.io_type = 'INPUT' + inner join job_versions jv on jv.uuid = io_in.job_version_uuid + ) l where l.jx = sg.job_name and NOT cycle + ) + SELECT * FROM search_graph where NOT cycle and depth <= :depth + ) lineage + -- Construct the dataset edges: + inner join jobs_view j on lineage.job_name = j.name and lineage.namespace_name = j.namespace_name + -- input datasets + left outer join ( + select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as "inEdges", i.in_agg as "outEdges") AS x)) as agg + from job_versions_io_mapping io_out + inner join datasets ds_in on ds_in.uuid = io_out.dataset_uuid + -- output jobs for each input dataset + left outer join ( + select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg + from jobs j_of_in + left outer join job_versions_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid + and io_of_in.io_type = 'INPUT' + group by io_of_in.dataset_uuid + ) i on i.dataset_uuid = io_out.dataset_uuid + -- input jobs for each input dataset + left outer join ( + select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j_of_out.namespace_name as namespace) as x)) as out_agg + from jobs_view j_of_out + left outer join job_versions_io_mapping io_of_out + on io_of_out.job_version_uuid = j_of_out.current_version_uuid + and io_of_out.io_type = 'OUTPUT' + group by io_of_out.dataset_uuid + ) o on o.dataset_uuid = io_out.dataset_uuid + WHERE 
io_out.io_type = 'OUTPUT' + group by io_out.job_version_uuid + + ) d_in on d_in.job_version_uuid = j.current_version_uuid + --output datasets + left outer join( + select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as "inEdges", i.in_agg as "outEdges") AS x)) as agg + from job_versions_io_mapping io_out + inner join datasets ds_in on ds_in.uuid = io_out.dataset_uuid + -- output jobs for each output dataset + left outer join ( + select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg + from jobs_view j_of_in + left outer join job_versions_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid + and io_of_in.io_type = 'INPUT' + group by io_of_in.dataset_uuid + ) i on i.dataset_uuid = io_out.dataset_uuid + -- input jobs for each output dataset + left outer join ( + select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j_of_out.namespace_name as namespace) as x)) as out_agg + from jobs_view j_of_out + left outer join job_versions_io_mapping io_of_out + on io_of_out.job_version_uuid = j_of_out.current_version_uuid + and io_of_out.io_type = 'OUTPUT' + group by io_of_out.dataset_uuid + ) o on o.dataset_uuid = io_out.dataset_uuid + WHERE io_out.io_type = 'INPUT' + group by io_out.job_version_uuid + ) d_out on d_out.job_version_uuid = j.current_version_uuid + """) List getLineage(String jobName, String namespaceName, Integer depth); } diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index c2d5b35b6a..940fba7e17 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -20,9 +20,11 @@ import lombok.extern.slf4j.Slf4j; import marquez.common.models.DatasetId; import marquez.common.models.JobId; +import marquez.db.JobDao; import 
marquez.db.LineageDao; import marquez.db.models.DatasetData; import marquez.db.models.JobData; +import marquez.db.models.JobRow; import marquez.service.DelegatingDaos.DelegatingLineageDao; import marquez.service.models.Edge; import marquez.service.models.Graph; @@ -34,8 +36,11 @@ @Slf4j public class LineageService extends DelegatingLineageDao { - public LineageService(LineageDao delegate) { + private final JobDao jobDao; + + public LineageService(LineageDao delegate, JobDao jobDao) { super(delegate); + this.jobDao = jobDao; } public Lineage lineage(NodeId nodeId, int depth) { @@ -188,7 +193,9 @@ private NodeId buildEdge(JobData e) { public Optional getJobUuid(NodeId nodeId) { if (nodeId.isJobType()) { JobId jobId = nodeId.asJobId(); - return getJobUuid(jobId.getName().getValue(), jobId.getNamespace().getValue()); + return jobDao + .findJobByNameAsRow(jobId.getNamespace().getValue(), jobId.getName().getValue()) + .map(JobRow::getUuid); } else if (nodeId.isDatasetType()) { DatasetId datasetId = nodeId.asDatasetId(); return getJobFromInputOrOutput( diff --git a/api/src/main/java/marquez/service/RunService.java b/api/src/main/java/marquez/service/RunService.java index 8811e01907..26515053e8 100644 --- a/api/src/main/java/marquez/service/RunService.java +++ b/api/src/main/java/marquez/service/RunService.java @@ -29,6 +29,7 @@ import marquez.db.RunStateDao; import marquez.db.models.ExtendedDatasetVersionRow; import marquez.db.models.ExtendedRunRow; +import marquez.db.models.JobRow; import marquez.db.models.JobVersionRow; import marquez.db.models.RunRow; import marquez.service.RunTransitionListener.JobInputUpdate; @@ -59,9 +60,9 @@ public RunService( * removed in release {@code 0.25.0}. 
*/ public Run createRun( - @NonNull NamespaceName namespaceName, @NonNull JobName jobName, @NonNull RunMeta runMeta) { - log.info("Creating run for job '{}'...", jobName.getValue()); - RunRow runRow = upsertRunMeta(namespaceName, jobName, runMeta, NEW); + @NonNull NamespaceName namespaceName, @NonNull JobRow job, @NonNull RunMeta runMeta) { + log.info("Creating run for job '{}'...", job.getName()); + RunRow runRow = upsertRunMeta(namespaceName, job, runMeta, NEW); notify(new RunTransition(RunId.of(runRow.getUuid()), null, NEW)); return findRunByUuid(runRow.getUuid()).get(); diff --git a/api/src/test/java/marquez/db/DbTestUtils.java b/api/src/test/java/marquez/db/DbTestUtils.java index 8f8bf152e3..f628e6c1a4 100644 --- a/api/src/test/java/marquez/db/DbTestUtils.java +++ b/api/src/test/java/marquez/db/DbTestUtils.java @@ -245,16 +245,17 @@ static RunArgsRow newRunArgs(final Jdbi jdbi) { } /** Adds a new {@link RunRow} object to the {@code runs} table. */ - static RunRow newRun(final Jdbi jdbi, final String namespaceName, final String jobName) { + static RunRow newRun(final Jdbi jdbi, JobRow jobRow) { final RunDao runDao = jdbi.onDemand(RunDao.class); final RunMeta runMeta = newRunMeta(); return runDao.upsertRunMeta( - NamespaceName.of(namespaceName), JobName.of(jobName), runMeta, RunState.NEW); + NamespaceName.of(jobRow.getNamespaceName()), jobRow, runMeta, RunState.NEW); } /** Adds a new {@link RunRow} object to the {@code runs} table. 
*/ static ExtendedRunRow newRun( final Jdbi jdbi, + final UUID jobUuid, final UUID jobVersionUuid, final UUID runArgsUuid, final UUID namespaceUuid, @@ -267,6 +268,7 @@ static ExtendedRunRow newRun( newRowUuid(), newExternalId(), newTimestamp(), + jobUuid, jobVersionUuid, runArgsUuid, newTimestamp(), diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index a1809f4863..bd227ef7ed 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -128,6 +128,7 @@ public void testUpdateLatestRunFor() { final ExtendedRunRow runRow = DbTestUtils.newRun( jdbiForTesting, + jobVersionRow.getJobUuid(), jobVersionRow.getUuid(), runArgsRow.getUuid(), namespaceRow.getUuid(), @@ -206,8 +207,7 @@ public void testGetJobVersions() { DbTestUtils.newJobWith( jdbiForTesting, namespaceRow.getName(), newJobName().getValue(), jobMeta); - final RunRow runRow = - DbTestUtils.newRun(jdbiForTesting, jobRow.getNamespaceName(), jobRow.getName()); + final RunRow runRow = DbTestUtils.newRun(jdbiForTesting, jobRow); final Run runCompleted = DbTestUtils.transitionRunWithOutputs( jdbiForTesting, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); @@ -247,8 +247,7 @@ public void testUpsertJobVersionOnRunTransition() { jdbiForTesting, namespaceRow.getName(), newJobName().getValue(), jobMeta); // (2) Add a new run; the input dataset versions will also be associated with the run. - final RunRow runRow = - DbTestUtils.newRun(jdbiForTesting, jobRow.getNamespaceName(), jobRow.getName()); + final RunRow runRow = DbTestUtils.newRun(jdbiForTesting, jobRow); // Ensure the input dataset versions have been associated with the run. 
final List inputDatasetVersions = diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index e8e05fe63e..aa5f49842c 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -89,7 +89,7 @@ public void getRun() { final JobRow jobRow = newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta); - final RunRow runRow = DbTestUtils.newRun(jdbi, jobRow.getNamespaceName(), jobRow.getName()); + final RunRow runRow = DbTestUtils.newRun(jdbi, jobRow); DbTestUtils.transitionRunWithOutputs( jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); @@ -203,8 +203,7 @@ private Stream createRunsForJob( return IntStream.range(0, count) .mapToObj( i -> { - final RunRow runRow = - DbTestUtils.newRun(jdbi, jobRow.getNamespaceName(), jobRow.getName()); + final RunRow runRow = DbTestUtils.newRun(jdbi, jobRow); DbTestUtils.transitionRunWithOutputs( jdbi, runRow.getUuid(), RunState.COMPLETED, outputs); @@ -226,13 +225,14 @@ public void updateRowWithNullNominalTimeDoesNotUpdateNominalTime() { final JobRow jobRow = newJobWith(jdbi, namespaceRow.getName(), newJobName().getValue(), jobMeta); - RunRow row = DbTestUtils.newRun(jdbi, namespaceRow.getName(), jobRow.getName()); + RunRow row = DbTestUtils.newRun(jdbi, jobRow); RunRow updatedRow = runDao.upsert( row.getUuid(), row.getUuid().toString(), row.getUpdatedAt(), + jobRow.getUuid(), null, row.getRunArgsUuid(), null, diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 898b8398d7..2da8d703c5 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -15,6 +15,7 @@ import marquez.common.models.DatasetVersionId; import marquez.common.models.JobName; import marquez.common.models.NamespaceName; +import marquez.db.JobDao; import marquez.db.LineageDao; 
import marquez.db.LineageTestUtils; import marquez.db.LineageTestUtils.DatasetConsumerJob; @@ -64,7 +65,7 @@ public class LineageServiceTest { public static void setUpOnce(Jdbi jdbi) { LineageServiceTest.jdbi = jdbi; lineageDao = jdbi.onDemand(LineageDao.class); - lineageService = new LineageService(lineageDao); + lineageService = new LineageService(lineageDao, jdbi.onDemand(JobDao.class)); openLineageDao = jdbi.onDemand(OpenLineageDao.class); } From a45885bbd588b5727d9e52539d833ebc30cd1784 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Fri, 6 May 2022 16:06:19 -0700 Subject: [PATCH 3/5] Remove references to simple_name as job redirects handle redirecting simple name to fqn added unit test to verify Signed-off-by: Michael Collado --- .../main/java/marquez/api/JobResource.java | 4 - api/src/main/java/marquez/db/JobDao.java | 18 ++-- .../marquez/MarquezAppIntegrationTest.java | 87 +++++++++++++++++++ 3 files changed, 92 insertions(+), 17 deletions(-) diff --git a/api/src/main/java/marquez/api/JobResource.java b/api/src/main/java/marquez/api/JobResource.java index 28483a81b5..4bd9faddce 100644 --- a/api/src/main/java/marquez/api/JobResource.java +++ b/api/src/main/java/marquez/api/JobResource.java @@ -175,10 +175,6 @@ public Response createRun( JobRow job = jobService .findJobByNameAsRow(namespaceName.getValue(), jobName.getValue()) - .or( - () -> - jobService.findJobBySimpleNameAsRow( - namespaceName.getValue(), jobName.getValue())) .orElseThrow( () -> new IllegalArgumentException( diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index d501ce2d93..5c96b702a5 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -91,29 +91,21 @@ default Optional findWithRun(String namespaceName, String jobName) { @SqlQuery( """ WITH RECURSIVE job_ids AS ( - SELECT uuid, symlink_target_uuid - FROM jobs j + SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid + FROM jobs_view j 
WHERE j.namespace_name=:namespaceName AND j.name=:jobName UNION - SELECT j.uuid, j.symlink_target_uuid - FROM jobs j + SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid + FROM jobs_view j INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid ) SELECT j.*, n.name AS namespace_name FROM jobs AS j - INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL + INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid """) Optional findJobByNameAsRow(String namespaceName, String jobName); - @SqlQuery( - """ - SELECT j.* FROM jobs AS j - WHERE j.namespace_name = :namespaceName AND - j.name = :jobName - """) - Optional findJobBySimpleNameAsRow(String namespaceName, String jobName); - @SqlQuery( "SELECT j.*, jc.context, f.facets\n" + " FROM jobs_view AS j\n" diff --git a/api/src/test/java/marquez/MarquezAppIntegrationTest.java b/api/src/test/java/marquez/MarquezAppIntegrationTest.java index 9679aa96bb..8df08de162 100644 --- a/api/src/test/java/marquez/MarquezAppIntegrationTest.java +++ b/api/src/test/java/marquez/MarquezAppIntegrationTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.UUID; import marquez.client.models.Dataset; import marquez.client.models.DatasetId; import marquez.client.models.DbTable; @@ -700,4 +701,90 @@ public void testApp_getJob() throws SQLException { .hasFieldOrPropertyWithValue("namespace", NAMESPACE_NAME) .hasFieldOrPropertyWithValue("name", targetJobName); } + + @Test + public void testApp_getJobWithFQNFromParent() throws SQLException { + Jdbi jdbi = + Jdbi.create(POSTGRES.getJdbcUrl(), POSTGRES.getUsername(), POSTGRES.getPassword()) + .installPlugin(new SqlObjectPlugin()) + .installPlugin(new PostgresPlugin()); + createNamespace(NAMESPACE_NAME); + + // Create job + String jobName = newJobName().getValue(); + final JobMeta jobMeta = + JobMeta.builder() + 
.type(JOB_TYPE) + .inputs(ImmutableSet.of()) + .outputs(ImmutableSet.of()) + .location(JOB_LOCATION) + .context(JOB_CONTEXT) + .description(JOB_DESCRIPTION) + .build(); + final Job originalJob = client.createJob(NAMESPACE_NAME, jobName, jobMeta); + + String parentJobName = newJobName().getValue(); + final JobMeta parentJobMeta = + JobMeta.builder() + .type(JOB_TYPE) + .inputs(ImmutableSet.of()) + .outputs(ImmutableSet.of()) + .location(JOB_LOCATION) + .context(JOB_CONTEXT) + .description(JOB_DESCRIPTION) + .build(); + final Job parentJob = client.createJob(NAMESPACE_NAME, parentJobName, parentJobMeta); + + JobDao jobDao = jdbi.onDemand(JobDao.class); + NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class); + Optional namespaceRow = namespaceDao.findNamespaceByName(NAMESPACE_NAME); + if (namespaceRow.isEmpty()) { + throw new AssertionError("Couldn't find expected namespace row"); + } + Optional originalJobRow = jobDao.findJobByNameAsRow(NAMESPACE_NAME, jobName); + if (originalJobRow.isEmpty()) { + throw new AssertionError("Couldn't find job row we just inserted"); + } + Optional parentJobRow = jobDao.findJobByNameAsRow(NAMESPACE_NAME, parentJobName); + if (parentJobRow.isEmpty()) { + throw new AssertionError("Couldn't find parent job we just inserted"); + } + PGobject inputs = new PGobject(); + inputs.setType("json"); + inputs.setValue("[]"); + JobRow jobRow = originalJobRow.get(); + JobRow targetJobRow = + jobDao.upsertJob( + UUID.randomUUID(), + parentJobRow.get().getUuid(), + JobType.valueOf(jobRow.getType()), + Instant.now(), + namespaceRow.get().getUuid(), + namespaceRow.get().getName(), + jobRow.getName(), + jobRow.getDescription().orElse(null), + jobRow.getJobContextUuid().orElse(null), + jobRow.getLocation(), + null, + inputs); + // symlink the old job to point to the new one that has a parent uuid + jobDao.upsertJob( + jobRow.getUuid(), + JobType.valueOf(JOB_TYPE.name()), + Instant.now(), + namespaceRow.get().getUuid(), + NAMESPACE_NAME, + jobName, 
+ JOB_DESCRIPTION, + jobRow.getJobContextUuid().orElse(null), + JOB_LOCATION.toString(), + targetJobRow.getUuid(), + inputs); + + Job job = client.getJob(NAMESPACE_NAME, jobName); + assertThat(job) + .isNotNull() + .hasFieldOrPropertyWithValue("namespace", NAMESPACE_NAME) + .hasFieldOrPropertyWithValue("name", parentJobName + "." + jobName); + } } From 1a68e560fedc62c60bf863787afc2775d78cd3d7 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Tue, 10 May 2022 10:41:36 -0700 Subject: [PATCH 4/5] Fix runs migration script Signed-off-by: Michael Collado --- .../marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql b/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql index 389f943609..8f7cc7edc3 100644 --- a/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql +++ b/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql @@ -26,6 +26,7 @@ AS SELECT r.uuid, r.created_at, r.updated_at, + r.parent_run_uuid, job_version_uuid, run_args_uuid, nominal_start_time, From 31e628eac64d4f53161359ab0dc2a34b303fb8a8 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Fri, 20 May 2022 14:28:27 -0700 Subject: [PATCH 5/5] Rename sql migration script Signed-off-by: Michael Collado --- ..._job_parent_id.sql => V43__alter_jobs_add_job_parent_uuid.sql} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename api/src/main/resources/marquez/db/migration/{V43__alter_jobs_add_job_parent_id.sql => V43__alter_jobs_add_job_parent_uuid.sql} (100%) diff --git a/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql b/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_uuid.sql similarity index 100% rename from api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql rename to 
api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_uuid.sql