Skip to content

Commit

Permalink
Job parent hierarchy (#1935)
Browse files Browse the repository at this point in the history
* Add migrations to support job parent relationship storage

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update all job and run queries to reference jobs_view and runs_view

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Remove references to simple_name, since job redirects already handle redirecting a job's simple name to its fully qualified name (FQN);
added a unit test to verify

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Fix runs migration script

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Rename sql migration script

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike authored May 20, 2022
1 parent ac7a455 commit 0021a2b
Show file tree
Hide file tree
Showing 18 changed files with 425 additions and 147 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private MarquezContext(
this.tagService = new TagService(baseDao);
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao);
this.lineageService = new LineageService(lineageDao, jobDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand Down
14 changes: 12 additions & 2 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import marquez.common.models.RunId;
import marquez.common.models.Version;
import marquez.db.JobVersionDao;
import marquez.db.models.JobRow;
import marquez.service.ServiceFactory;
import marquez.service.models.Job;
import marquez.service.models.JobMeta;
Expand Down Expand Up @@ -171,8 +172,17 @@ public Response createRun(
throwIfNotExists(namespaceName);
throwIfNotExists(namespaceName, jobName);
throwIfExists(namespaceName, jobName, runMeta.getId().orElse(null));

final Run run = runService.createRun(namespaceName, jobName, runMeta);
JobRow job =
jobService
.findJobByNameAsRow(namespaceName.getValue(), jobName.getValue())
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"No such job with namespace %s and name %s",
namespaceName.getValue(), jobName.getValue())));

final Run run = runService.createRun(namespaceName, job, runMeta);
final URI runLocation = locationFor(uriInfo, run);
return Response.created(runLocation).entity(run).build();
}
Expand Down
86 changes: 71 additions & 15 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@RegisterRowMapper(JobMapper.class)
public interface JobDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS (SELECT 1 FROM jobs AS j "
"SELECT EXISTS (SELECT 1 FROM jobs_view AS j "
+ "WHERE j.namespace_name= :namespaceName AND "
+ " j.name = :jobName)")
boolean exists(String namespaceName, String jobName);
Expand All @@ -50,17 +50,17 @@ public interface JobDao extends BaseDao {
@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, jc.context, f.facets
FROM jobs j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
FROM jobs_view j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
Expand Down Expand Up @@ -91,24 +91,24 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs AS j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

@SqlQuery(
"SELECT j.*, jc.context, f.facets\n"
+ " FROM jobs AS j\n"
+ " FROM jobs_view AS j\n"
+ " LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid\n"
+ " LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid\n"
+ "LEFT OUTER JOIN (\n"
Expand All @@ -117,7 +117,7 @@ WITH RECURSIVE job_ids AS (
+ " SELECT run_uuid, event->'job'->'facets' AS facets\n"
+ " FROM lineage_events AS le\n"
+ " INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid\n"
+ " INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid\n"
+ " INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid\n"
+ " WHERE j2.namespace_name=:namespaceName\n"
+ " ORDER BY event_time ASC\n"
+ " ) e\n"
Expand Down Expand Up @@ -253,7 +253,7 @@ INSERT INTO jobs AS j (
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid) DO
) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
Expand All @@ -277,4 +277,60 @@ JobRow upsertJob(
String location,
UUID symlinkTargetId,
PGobject inputs);

/**
 * Inserts a job row that carries a parent job reference, or updates the existing row when one
 * already exists for the same {@code (name, namespace_uuid, parent_job_uuid)} combination.
 *
 * <p>On insert, {@code created_at} and {@code updated_at} are both set to {@code now}. On
 * conflict, {@code updated_at}, {@code type}, {@code description},
 * {@code current_job_context_uuid}, {@code current_location} and {@code current_inputs} are
 * overwritten with the incoming values, while {@code symlink_target_uuid} is only replaced when
 * the incoming value is non-null (see the {@code COALESCE} in the statement). The columns
 * {@code uuid}, {@code parent_job_uuid}, {@code created_at}, {@code namespace_uuid},
 * {@code namespace_name} and {@code name} are not touched by the update branch.
 *
 * <p>NOTE(review): PostgreSQL unique indexes normally treat NULLs as distinct, so a null
 * {@code parentJobUuid} would presumably never match this conflict target; confirm that
 * parentless jobs are always written through the overload without {@code parentJobUuid}.
 *
 * <p>NOTE(review): the Java parameter order ({@code symlinkTargetId} before {@code inputs})
 * differs from the column order in the statement; this is harmless only if parameters are bound
 * by name, which the {@code :name} placeholders suggest -- TODO confirm.
 *
 * @param uuid value for the {@code uuid} column of a newly inserted row
 * @param parentJobUuid value for the {@code parent_job_uuid} column; part of the conflict target
 * @param type value for the {@code type} column
 * @param now timestamp written to {@code created_at} and {@code updated_at} on insert, and to
 *     {@code updated_at} on update
 * @param namespaceUuid value for the {@code namespace_uuid} column; part of the conflict target
 * @param namespaceName value for the {@code namespace_name} column
 * @param name value for the {@code name} column; part of the conflict target
 * @param description value for the {@code description} column
 * @param jobContextUuid value for the {@code current_job_context_uuid} column
 * @param location value for the {@code current_location} column
 * @param symlinkTargetId value for the {@code symlink_target_uuid} column; when null on update,
 *     the previously stored value is kept
 * @param inputs value for the {@code current_inputs} column
 * @return the inserted or updated row, as produced by {@code RETURNING *}
 */
@SqlQuery(
"""
INSERT INTO jobs AS j (
uuid,
parent_job_uuid,
type,
created_at,
updated_at,
namespace_uuid,
namespace_name,
name,
description,
current_job_context_uuid,
current_location,
current_inputs,
symlink_target_uuid
) VALUES (
:uuid,
:parentJobUuid,
:type,
:now,
:now,
:namespaceUuid,
:namespaceName,
:name,
:description,
:jobContextUuid,
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *
""")
JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
}
16 changes: 10 additions & 6 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,20 @@ enum IoType {
+ " 'name', ds.name))\n"
+ " FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets\n"
+ " FROM job_versions_io_mapping io\n"
+ " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n"
+ " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n"
+ " WHERE jv.namespace_name = :namespaceName\n"
+ " AND jv.job_name = :jobName\n"
+ " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n"
+ " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.namespace_name = :namespaceName\n"
+ " AND j.name = :jobName\n"
+ " GROUP BY io.job_version_uuid\n"
+ "), relevant_job_versions AS (\n"
+ " SELECT jv.*, jc.context\n"
+ " SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, \n"
+ " jv.location, jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, \n"
+ " j.namespace_name, j.name AS job_name, jc.context\n"
+ " FROM job_versions jv\n"
+ " LEFT OUTER JOIN job_contexts AS jc ON jc.uuid = jv.job_context_uuid\n"
+ " WHERE job_name = :jobName AND namespace_name=:namespaceName\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.name = :jobName AND j.namespace_name=:namespaceName\n"
+ " ORDER BY jv.created_at DESC\n"
+ ")\n"
+ "SELECT jv.*,\n"
Expand Down
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ public interface LineageDao {
+ " )\n"
+ "SELECT DISTINCT ON (l2.job_uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "FROM lineage l2\n"
+ "INNER JOIN jobs j ON j.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view j ON j.uuid=l2.job_uuid\n"
+ "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

@SqlQuery("SELECT uuid from jobs where name = :jobName and namespace_name = :namespace")
Optional<UUID> getJobUuid(String jobName, String namespace);

@SqlQuery(
"SELECT ds.*, dv.fields, dv.lifecycle_state\n"
+ "FROM datasets ds\n"
Expand All @@ -86,7 +83,7 @@ public interface LineageDao {
@SqlQuery(
"WITH latest_runs AS (\n"
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
+ " FROM runs r\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " WHERE jv.job_uuid in (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
Expand Down
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
event.getRun().getRunId(),
now,
job.getUuid(),
null,
runArgs.getUuid(),
nominalStartTime,
Expand All @@ -192,6 +193,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
event.getRun().getRunId(),
now,
job.getUuid(),
null,
runArgs.getUuid(),
nominalStartTime,
Expand Down
19 changes: 11 additions & 8 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.Field;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.RunState;
Expand Down Expand Up @@ -69,9 +68,9 @@ public interface RunDao extends BaseDao {

String BASE_FIND_RUN_SQL =
"SELECT r.*, ra.args, ctx.context, f.facets,\n"
+ "jv.namespace_name, jv.job_name, jv.version AS job_version,\n"
+ "jv.version AS job_version,\n"
+ "ri.input_versions, ro.output_versions\n"
+ "FROM runs AS r\n"
+ "FROM runs_view AS r\n"
+ "LEFT OUTER JOIN\n"
+ "(\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
Expand Down Expand Up @@ -156,6 +155,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+ "external_id, "
+ "created_at, "
+ "updated_at, "
+ "job_uuid, "
+ "job_version_uuid, "
+ "run_args_uuid, "
+ "nominal_start_time, "
Expand All @@ -171,6 +171,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+ ":externalId, "
+ ":now, "
+ ":now, "
+ ":jobUuid,"
+ ":jobVersionUuid, "
+ ":runArgsUuid, "
+ ":nominalStartTime, "
Expand All @@ -194,6 +195,7 @@ ExtendedRunRow upsert(
UUID runUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Expand All @@ -211,6 +213,7 @@ ExtendedRunRow upsert(
+ "external_id, "
+ "created_at, "
+ "updated_at, "
+ "job_uuid, "
+ "job_version_uuid, "
+ "run_args_uuid, "
+ "nominal_start_time, "
Expand All @@ -224,6 +227,7 @@ ExtendedRunRow upsert(
+ ":externalId, "
+ ":now, "
+ ":now, "
+ ":jobUuid, "
+ ":jobVersionUuid, "
+ ":runArgsUuid, "
+ ":nominalStartTime, "
Expand All @@ -243,6 +247,7 @@ ExtendedRunRow upsert(
UUID runUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Expand Down Expand Up @@ -347,7 +352,7 @@ default void updateInputDatasetMapping(Set<DatasetId> inputs, UUID runUuid) {
/** Insert from run creates a run but does not associate any datasets. */
@Transaction
default RunRow upsertRunMeta(
NamespaceName namespaceName, JobName jobName, RunMeta runMeta, RunState currentState) {
NamespaceName namespaceName, JobRow jobRow, RunMeta runMeta, RunState currentState) {
Instant now = Instant.now();

NamespaceRow namespaceRow =
Expand All @@ -363,24 +368,22 @@ default RunRow upsertRunMeta(
Utils.toJson(runMeta.getArgs()),
Utils.checksumFor(runMeta.getArgs()));

JobRow jobRow =
createJobDao().findJobByNameAsRow(namespaceName.getValue(), jobName.getValue()).get();

UUID uuid = runMeta.getId().map(RunId::getValue).orElse(UUID.randomUUID());

RunRow runRow =
upsert(
uuid,
null,
now,
jobRow.getUuid(),
null,
runArgsRow.getUuid(),
runMeta.getNominalStartTime().orElse(null),
runMeta.getNominalEndTime().orElse(null),
currentState,
now,
namespaceRow.getName(),
jobName.getValue(),
jobRow.getName(),
jobRow.getLocation(),
jobRow.getJobContextUuid().orElse(null));

Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/SearchDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface SearchDao {
+ " WHERE d.name ilike '%' || :query || '%'\n"
+ " UNION\n"
+ " SELECT 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+ " FROM jobs AS j\n"
+ " FROM jobs_view AS j\n"
+ " WHERE j.name ilike '%' || :query || '%'\n"
+ ") AS results\n"
+ "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n"
Expand Down
Loading

0 comments on commit 0021a2b

Please sign in to comment.