Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job parent hierarchy #1935

Merged
merged 6 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ private MarquezContext(
this.tagService = new TagService(baseDao);
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao);
this.lineageService = new LineageService(lineageDao, jobDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand Down
14 changes: 12 additions & 2 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import marquez.common.models.RunId;
import marquez.common.models.Version;
import marquez.db.JobVersionDao;
import marquez.db.models.JobRow;
import marquez.service.ServiceFactory;
import marquez.service.models.Job;
import marquez.service.models.JobMeta;
Expand Down Expand Up @@ -171,8 +172,17 @@ public Response createRun(
throwIfNotExists(namespaceName);
throwIfNotExists(namespaceName, jobName);
throwIfExists(namespaceName, jobName, runMeta.getId().orElse(null));

final Run run = runService.createRun(namespaceName, jobName, runMeta);
JobRow job =
jobService
.findJobByNameAsRow(namespaceName.getValue(), jobName.getValue())
.orElseThrow(
() ->
new IllegalArgumentException(
String.format(
"No such job with namespace %s and name %s",
namespaceName.getValue(), jobName.getValue())));

final Run run = runService.createRun(namespaceName, job, runMeta);
final URI runLocation = locationFor(uriInfo, run);
return Response.created(runLocation).entity(run).build();
}
Expand Down
86 changes: 71 additions & 15 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
@RegisterRowMapper(JobMapper.class)
public interface JobDao extends BaseDao {
@SqlQuery(
"SELECT EXISTS (SELECT 1 FROM jobs AS j "
"SELECT EXISTS (SELECT 1 FROM jobs_view AS j "
+ "WHERE j.namespace_name= :namespaceName AND "
+ " j.name = :jobName)")
boolean exists(String namespaceName, String jobName);
Expand All @@ -50,17 +50,17 @@ public interface JobDao extends BaseDao {
@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, jc.context, f.facets
FROM jobs j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
FROM jobs_view j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
Expand Down Expand Up @@ -91,24 +91,24 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
@SqlQuery(
"""
collado-mike marked this conversation as resolved.
Show resolved Hide resolved
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs AS j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);
collado-mike marked this conversation as resolved.
Show resolved Hide resolved

@SqlQuery(
"SELECT j.*, jc.context, f.facets\n"
+ " FROM jobs AS j\n"
+ " FROM jobs_view AS j\n"
+ " LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid\n"
+ " LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid\n"
+ "LEFT OUTER JOIN (\n"
Expand All @@ -117,7 +117,7 @@ WITH RECURSIVE job_ids AS (
+ " SELECT run_uuid, event->'job'->'facets' AS facets\n"
+ " FROM lineage_events AS le\n"
+ " INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid\n"
+ " INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid\n"
+ " INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid\n"
+ " WHERE j2.namespace_name=:namespaceName\n"
+ " ORDER BY event_time ASC\n"
+ " ) e\n"
Expand Down Expand Up @@ -253,7 +253,7 @@ INSERT INTO jobs AS j (
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid) DO
) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
Expand All @@ -277,4 +277,60 @@ JobRow upsertJob(
String location,
UUID symlinkTargetId,
PGobject inputs);

/**
 * Inserts a row into {@code jobs} that carries a {@code parent_job_uuid}, or updates the existing
 * row when the insert conflicts on {@code (name, namespace_uuid, parent_job_uuid)}.
 *
 * <p>On conflict the statement overwrites {@code updated_at}, {@code type}, {@code description},
 * {@code current_job_context_uuid}, {@code current_location} and {@code current_inputs} with the
 * incoming values. {@code symlink_target_uuid} is replaced only when the incoming value is
 * non-null; otherwise the stored value is kept. {@code uuid}, {@code parent_job_uuid},
 * {@code created_at}, {@code namespace_uuid}, {@code namespace_name} and {@code name} are not part
 * of the update list.
 *
 * <p>NOTE(review): PostgreSQL normally treats NULLs as distinct in unique indexes, so a null
 * {@code parentJobUuid} presumably never triggers this conflict target. The sibling overload
 * whose conflict clause is restricted to {@code parent_job_uuid IS NULL} looks like the intended
 * path for parentless jobs -- confirm against the schema migration.
 *
 * @param uuid value for the {@code uuid} column of a newly inserted row
 * @param parentJobUuid value for {@code parent_job_uuid}
 * @param type value for {@code type}
 * @param now bound to both {@code created_at} and {@code updated_at}
 * @param namespaceUuid value for {@code namespace_uuid}
 * @param namespaceName value for {@code namespace_name}
 * @param name value for {@code name}
 * @param description value for {@code description}
 * @param jobContextUuid value for {@code current_job_context_uuid}
 * @param location value for {@code current_location}
 * @param symlinkTargetId value for {@code symlink_target_uuid}; null keeps the stored target on
 *     update
 * @param inputs value for {@code current_inputs}
 * @return the inserted or updated row, as produced by {@code RETURNING *}
 */
@SqlQuery(
"""
INSERT INTO jobs AS j (
uuid,
parent_job_uuid,
type,
created_at,
updated_at,
namespace_uuid,
namespace_name,
name,
description,
current_job_context_uuid,
current_location,
current_inputs,
symlink_target_uuid
) VALUES (
:uuid,
:parentJobUuid,
:type,
:now,
:now,
:namespaceUuid,
:namespaceName,
:name,
:description,
:jobContextUuid,
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *
""")
JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
}
16 changes: 10 additions & 6 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,20 @@ enum IoType {
+ " 'name', ds.name))\n"
+ " FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets\n"
+ " FROM job_versions_io_mapping io\n"
+ " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n"
+ " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n"
+ " WHERE jv.namespace_name = :namespaceName\n"
+ " AND jv.job_name = :jobName\n"
+ " INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid\n"
+ " INNER JOIN datasets ds ON ds.uuid = io.dataset_uuid\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.namespace_name = :namespaceName\n"
+ " AND j.name = :jobName\n"
+ " GROUP BY io.job_version_uuid\n"
+ "), relevant_job_versions AS (\n"
+ " SELECT jv.*, jc.context\n"
+ " SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, \n"
+ " jv.location, jv.latest_run_uuid, jv.job_context_uuid, j.namespace_uuid, \n"
+ " j.namespace_name, j.name AS job_name, jc.context\n"
+ " FROM job_versions jv\n"
+ " LEFT OUTER JOIN job_contexts AS jc ON jc.uuid = jv.job_context_uuid\n"
+ " WHERE job_name = :jobName AND namespace_name=:namespaceName\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.name = :jobName AND j.namespace_name=:namespaceName\n"
+ " ORDER BY jv.created_at DESC\n"
+ ")\n"
+ "SELECT jv.*,\n"
Expand Down
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ public interface LineageDao {
+ " )\n"
+ "SELECT DISTINCT ON (l2.job_uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "FROM lineage l2\n"
+ "INNER JOIN jobs j ON j.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view j ON j.uuid=l2.job_uuid\n"
+ "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

/**
 * Selects the {@code uuid} column from the {@code jobs} table for the row whose {@code name} and
 * {@code namespace_name} equal the given values.
 *
 * @param jobName value bound to {@code :jobName}
 * @param namespace value bound to {@code :namespace}
 * @return the matching job uuid; presumably empty when no row matches -- confirm against Jdbi's
 *     {@code Optional} return handling
 */
@SqlQuery("SELECT uuid from jobs where name = :jobName and namespace_name = :namespace")
Optional<UUID> getJobUuid(String jobName, String namespace);

@SqlQuery(
"SELECT ds.*, dv.fields, dv.lifecycle_state\n"
+ "FROM datasets ds\n"
Expand All @@ -86,7 +83,7 @@ public interface LineageDao {
@SqlQuery(
"WITH latest_runs AS (\n"
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
+ " FROM runs r\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " WHERE jv.job_uuid in (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
Expand Down
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
event.getRun().getRunId(),
now,
job.getUuid(),
null,
runArgs.getUuid(),
nominalStartTime,
Expand All @@ -192,6 +193,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
runUuid,
event.getRun().getRunId(),
now,
job.getUuid(),
null,
runArgs.getUuid(),
nominalStartTime,
Expand Down
19 changes: 11 additions & 8 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.Field;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.RunState;
Expand Down Expand Up @@ -69,9 +68,9 @@ public interface RunDao extends BaseDao {

String BASE_FIND_RUN_SQL =
"SELECT r.*, ra.args, ctx.context, f.facets,\n"
+ "jv.namespace_name, jv.job_name, jv.version AS job_version,\n"
+ "jv.version AS job_version,\n"
+ "ri.input_versions, ro.output_versions\n"
+ "FROM runs AS r\n"
+ "FROM runs_view AS r\n"
+ "LEFT OUTER JOIN\n"
+ "(\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
Expand Down Expand Up @@ -156,6 +155,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+ "external_id, "
+ "created_at, "
+ "updated_at, "
+ "job_uuid, "
+ "job_version_uuid, "
+ "run_args_uuid, "
+ "nominal_start_time, "
Expand All @@ -171,6 +171,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+ ":externalId, "
+ ":now, "
+ ":now, "
+ ":jobUuid,"
+ ":jobVersionUuid, "
+ ":runArgsUuid, "
+ ":nominalStartTime, "
Expand All @@ -194,6 +195,7 @@ ExtendedRunRow upsert(
UUID runUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Expand All @@ -211,6 +213,7 @@ ExtendedRunRow upsert(
+ "external_id, "
+ "created_at, "
+ "updated_at, "
+ "job_uuid, "
+ "job_version_uuid, "
+ "run_args_uuid, "
+ "nominal_start_time, "
Expand All @@ -224,6 +227,7 @@ ExtendedRunRow upsert(
+ ":externalId, "
+ ":now, "
+ ":now, "
+ ":jobUuid, "
+ ":jobVersionUuid, "
+ ":runArgsUuid, "
+ ":nominalStartTime, "
Expand All @@ -243,6 +247,7 @@ ExtendedRunRow upsert(
UUID runUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Expand Down Expand Up @@ -347,7 +352,7 @@ default void updateInputDatasetMapping(Set<DatasetId> inputs, UUID runUuid) {
/** Insert from run creates a run but does not associate any datasets. */
@Transaction
default RunRow upsertRunMeta(
NamespaceName namespaceName, JobName jobName, RunMeta runMeta, RunState currentState) {
NamespaceName namespaceName, JobRow jobRow, RunMeta runMeta, RunState currentState) {
Instant now = Instant.now();

NamespaceRow namespaceRow =
Expand All @@ -363,24 +368,22 @@ default RunRow upsertRunMeta(
Utils.toJson(runMeta.getArgs()),
Utils.checksumFor(runMeta.getArgs()));

JobRow jobRow =
createJobDao().findJobByNameAsRow(namespaceName.getValue(), jobName.getValue()).get();

UUID uuid = runMeta.getId().map(RunId::getValue).orElse(UUID.randomUUID());

RunRow runRow =
upsert(
uuid,
null,
now,
jobRow.getUuid(),
null,
runArgsRow.getUuid(),
runMeta.getNominalStartTime().orElse(null),
runMeta.getNominalEndTime().orElse(null),
currentState,
now,
namespaceRow.getName(),
jobName.getValue(),
jobRow.getName(),
jobRow.getLocation(),
jobRow.getJobContextUuid().orElse(null));

Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/SearchDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface SearchDao {
+ " WHERE d.name ilike '%' || :query || '%'\n"
+ " UNION\n"
+ " SELECT 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+ " FROM jobs AS j\n"
+ " FROM jobs_view AS j\n"
+ " WHERE j.name ilike '%' || :query || '%'\n"
+ ") AS results\n"
+ "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n"
Expand Down
Loading