diff --git a/api/src/main/java/marquez/common/Utils.java b/api/src/main/java/marquez/common/Utils.java
index e2f79ef900..ba5e7b77ad 100644
--- a/api/src/main/java/marquez/common/Utils.java
+++ b/api/src/main/java/marquez/common/Utils.java
@@ -26,6 +26,8 @@
 import java.io.UncheckedIOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.ZonedDateTime;
 import java.util.List;
@@ -37,6 +39,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.NonNull;
@@ -50,6 +53,7 @@
 import marquez.service.models.DatasetMeta;
 import marquez.service.models.DbTableMeta;
 import marquez.service.models.LineageEvent;
+import marquez.service.models.LineageEvent.ParentRunFacet;
 import marquez.service.models.StreamMeta;
 import org.apache.commons.lang3.tuple.Triple;
@@ -61,6 +65,16 @@ private Utils() {}
 
   public static final String VERSION_DELIM = ":";
   public static final Joiner VERSION_JOINER = Joiner.on(VERSION_DELIM).skipNulls();
 
+  /**
+   * The pre-defined NAMESPACE_URL UUID defined in RFC 4122. This is the namespace the OpenLineage
+   * Airflow integration uses when constructing some run IDs as name-based UUIDs. We use the same
+   * namespace so that we can construct identical UUIDs when absolutely necessary (e.g.,
+   * backfills, backward compatibility).
+   *
+   * @see <a href="https://datatracker.ietf.org/doc/html/rfc4122#appendix-C">RFC 4122, Appendix C</a>
+   */
+  public static final UUID NAMESPACE_URL_UUID =
+      UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8");
+
   private static final ObjectMapper MAPPER = newObjectMapper();
   private static final int UUID_LENGTH = 36;
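For reviewers unfamiliar with name-based UUIDs: the construction added below is deterministic because UUID.nameUUIDFromBytes computes a version-3 (MD5) UUID over the namespace bytes followed by the name bytes, so the same inputs always produce the same UUID. A minimal, self-contained sketch (the class name here is ours, not part of the PR):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

class NameBasedUuidSketch {
  static final UUID NAMESPACE_URL_UUID =
      UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8");

  static UUID toNameBasedUuid(String... nameParts) {
    // Join the parts, then hash namespace bytes + name bytes (RFC 4122, version 3).
    byte[] nameBytes = String.join(".", nameParts).getBytes(StandardCharsets.UTF_8);
    ByteBuffer buffer = ByteBuffer.allocate(nameBytes.length + 16);
    buffer.putLong(NAMESPACE_URL_UUID.getMostSignificantBits());
    buffer.putLong(NAMESPACE_URL_UUID.getLeastSignificantBits());
    buffer.put(nameBytes);
    return UUID.nameUUIDFromBytes(buffer.array());
  }

  public static void main(String[] args) {
    UUID first = toNameBasedUuid("my_dag", "scheduled__2022-03-14T01:40:10+00:00");
    UUID second = toNameBasedUuid("my_dag", "scheduled__2022-03-14T01:40:10+00:00");
    // Prints the same version-3 UUID twice: the mapping is stable across calls.
    System.out.println(first + " == " + second + " : " + first.equals(second));
  }
}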
@@ -141,6 +155,70 @@ public static UUID toUuid(@NonNull final String uuidString) {
     return UUID.fromString(uuidString);
   }
 
+  /**
+   * Construct a name-based {@link UUID} within the {@link #NAMESPACE_URL_UUID} namespace. Name
+   * parts are joined with a dot (.) character before hashing.
+   *
+   * @see <a href="https://datatracker.ietf.org/doc/html/rfc4122#page-13">RFC 4122, Section 4.3</a>
+   * @param nameParts the name parts to join and hash
+   * @return a deterministic, name-based {@link UUID}
+   */
+  public static UUID toNameBasedUuid(String... nameParts) {
+    String constructedName = String.join(".", nameParts);
+
+    final byte[] nameBytes = constructedName.getBytes(StandardCharsets.UTF_8);
+
+    ByteBuffer buffer = ByteBuffer.allocate(nameBytes.length + 16);
+    buffer.putLong(NAMESPACE_URL_UUID.getMostSignificantBits());
+    buffer.putLong(NAMESPACE_URL_UUID.getLeastSignificantBits());
+    buffer.put(nameBytes);
+
+    return UUID.nameUUIDFromBytes(buffer.array());
+  }
+
+  /**
+   * Construct a UUID from a {@link ParentRunFacet} - if the {@link
+   * marquez.service.models.LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
+   * Otherwise, compute a {@link UUID} from the job name and the reported runId. If the job name
+   * contains a dot (.), only the portion up to the last dot in the name is used (this attempts to
+   * address Airflow tasks, which always report the job name as {@code <dag_name>.<task_name>}).
+   *
+   * @param parent the parent run facet reported for a run
+   * @return the parent run's {@link UUID}
+   */
+  public static UUID findParentRunUuid(ParentRunFacet parent) {
+    String jobName = parent.getJob().getName();
+    String parentRunId = parent.getRun().getRunId();
+    return findParentRunUuid(jobName, parentRunId);
+  }
+
+  public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
+    String dagName = parseParentJobName(parentJobName);
+    return toUuid(parentRunId, dagName);
+  }
+
+  public static String parseParentJobName(String parentJobName) {
+    return parentJobName.contains(".")
+        ? parentJobName.substring(0, parentJobName.lastIndexOf('.'))
+        : parentJobName;
+  }
+
+  /**
+   * Compute a UUID from a runId and a jobName. If the runId is a valid {@link UUID}, it is
+   * returned as-is; otherwise, a name-based UUID is constructed from the jobName and runId.
+   *
+   * @see Utils#toNameBasedUuid(String...) for details on the UUID construction.
+   * @param runId the reported runId, which may or may not be a well-formed {@link UUID}
+   * @param jobName the name of the job
+   * @return a {@link UUID} that is stable for the given runId and jobName
+   */
+  public static UUID toUuid(@NotNull String runId, String jobName) {
+    try {
+      return UUID.fromString(runId);
+    } catch (IllegalArgumentException e) {
+      return Utils.toNameBasedUuid(jobName, runId);
+    }
+  }
+
   public static Instant toInstant(@Nullable final String asIso) {
     return (asIso == null) ? null : Instant.from(ISO_INSTANT.parse(asIso));
   }
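A brief usage sketch of the fallback semantics above (the input values are illustrative, not from the PR): a runId that is already a well-formed UUID passes through untouched, while an Airflow-style scheduled runId is hashed together with the DAG name extracted from the task's job name.

// Illustrative only; the literals below are made up.
UUID direct = Utils.toUuid("e0f5c49e-2d9a-4a6f-9a4b-1b9a3b6a7c8d", "any_dag");
// direct.toString() -> "e0f5c49e-2d9a-4a6f-9a4b-1b9a3b6a7c8d" (passed through unchanged)

UUID derived =
    Utils.findParentRunUuid("my_dag.extract_task", "scheduled__2022-03-14T01:40:10+00:00");
// parseParentJobName("my_dag.extract_task") -> "my_dag", so `derived` equals
// Utils.toNameBasedUuid("my_dag", "scheduled__2022-03-14T01:40:10+00:00") on every invocation.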
diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java
index 07dc57e47b..d501ce2d93 100644
--- a/api/src/main/java/marquez/db/JobDao.java
+++ b/api/src/main/java/marquez/db/JobDao.java
@@ -50,17 +50,17 @@ public interface JobDao extends BaseDao {
   @SqlQuery(
       """
       WITH RECURSIVE job_ids AS (
-        SELECT uuid, symlink_target_uuid
-        FROM jobs j
+        SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
+        FROM jobs_view j
         WHERE j.namespace_name=:namespaceName AND j.name=:jobName
         UNION
-        SELECT j.uuid, j.symlink_target_uuid
-        FROM jobs j
+        SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
+        FROM jobs_view j
         INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
       )
       SELECT j.*, jc.context, f.facets
-      FROM jobs j
-      INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
+      FROM jobs_view j
+      INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
       LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
       LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
       LEFT OUTER JOIN (
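The rewritten query above resolves symlink chains: it seeds the recursive CTE with the requested job, follows symlink_target_uuid links hop by hop, and the outer SELECT keeps only the terminal row (the one whose symlink_target_uuid IS NULL), carrying the resolved job through link_target_uuid. For intuition, a plain-Java sketch of the same walk (no database involved; the class name is ours):

import java.util.Map;
import java.util.UUID;

class SymlinkResolutionSketch {
  public static void main(String[] args) {
    UUID oldJob = UUID.randomUUID(); // pre-parent-run job row
    UUID newJob = UUID.randomUUID(); // job row created under its parent
    // The symlink_target_uuid column: oldJob points at newJob; newJob is terminal.
    Map<UUID, UUID> symlinkTarget = Map.of(oldJob, newJob);

    UUID current = oldJob;
    while (symlinkTarget.containsKey(current)) { // the UNION step of the CTE
      current = symlinkTarget.get(current);
    }
    // The final SELECT keeps only the row whose symlink_target_uuid IS NULL.
    System.out.println("resolved " + oldJob + " -> " + current);
  }
}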
@@ -261,7 +261,7 @@ INSERT INTO jobs AS j (
     :location,
     :inputs,
     :symlinkTargetId
-  ) ON CONFLICT (name, namespace_uuid) DO
+  ) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
   UPDATE SET
     updated_at = EXCLUDED.updated_at,
     type = EXCLUDED.type,
@@ -285,4 +285,60 @@ JobRow upsertJob(
       String location,
       UUID symlinkTargetId,
       PGobject inputs);
+
+  @SqlQuery(
+      """
+      INSERT INTO jobs AS j (
+        uuid,
+        parent_job_uuid,
+        type,
+        created_at,
+        updated_at,
+        namespace_uuid,
+        namespace_name,
+        name,
+        description,
+        current_job_context_uuid,
+        current_location,
+        current_inputs,
+        symlink_target_uuid
+      ) VALUES (
+        :uuid,
+        :parentJobUuid,
+        :type,
+        :now,
+        :now,
+        :namespaceUuid,
+        :namespaceName,
+        :name,
+        :description,
+        :jobContextUuid,
+        :location,
+        :inputs,
+        :symlinkTargetId
+      ) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
+      UPDATE SET
+        updated_at = EXCLUDED.updated_at,
+        type = EXCLUDED.type,
+        description = EXCLUDED.description,
+        current_job_context_uuid = EXCLUDED.current_job_context_uuid,
+        current_location = EXCLUDED.current_location,
+        current_inputs = EXCLUDED.current_inputs,
+        -- update the symlink target if not null. otherwise, keep the old value
+        symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
+      RETURNING *
+      """)
+  JobRow upsertJob(
+      UUID uuid,
+      UUID parentJobUuid,
+      JobType type,
+      Instant now,
+      UUID namespaceUuid,
+      String namespaceName,
+      String name,
+      String description,
+      UUID jobContextUuid,
+      String location,
+      UUID symlinkTargetId,
+      PGobject inputs);
 }
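Why two upserts with different conflict targets: Postgres unique indexes treat NULLs as distinct, so two rows with the same (name, namespace_uuid) and a NULL parent_job_uuid would never conflict under the three-column constraint. The first upsert above therefore targets the partial index added by the V43 script later in this diff (WHERE parent_job_uuid IS NULL), while the new overload targets the full three-column constraint. A condensed sketch of the two conflict targets, with column lists abbreviated:

// Abbreviated sketch; see V43__alter_jobs_add_job_parent_id.sql later in this
// diff for the matching indexes (jobs_namespace_uuid_name_null_parent and
// unique_jobs_namespace_uuid_name_parent).
String upsertParentless =
    """
    INSERT INTO jobs (name, namespace_uuid) VALUES (:name, :namespaceUuid)
    ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL
    DO UPDATE SET updated_at = now()
    """;
String upsertWithParent =
    """
    INSERT INTO jobs (name, namespace_uuid, parent_job_uuid)
    VALUES (:name, :namespaceUuid, :parentJobUuid)
    ON CONFLICT (name, namespace_uuid, parent_job_uuid)
    DO UPDATE SET updated_at = now()
    """;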
diff --git a/api/src/main/java/marquez/db/migrations/V43__UpdateRunsWithJobUUID.java b/api/src/main/java/marquez/db/migrations/V43_1__UpdateRunsWithJobUUID.java
similarity index 74%
rename from api/src/main/java/marquez/db/migrations/V43__UpdateRunsWithJobUUID.java
rename to api/src/main/java/marquez/db/migrations/V43_1__UpdateRunsWithJobUUID.java
index 9815928356..2670947cae 100644
--- a/api/src/main/java/marquez/db/migrations/V43__UpdateRunsWithJobUUID.java
+++ b/api/src/main/java/marquez/db/migrations/V43_1__UpdateRunsWithJobUUID.java
@@ -3,12 +3,14 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
-import org.flywaydb.core.api.migration.BaseJavaMigration;
+import org.flywaydb.core.api.MigrationVersion;
 import org.flywaydb.core.api.migration.Context;
+import org.flywaydb.core.api.migration.JavaMigration;
 
 /**
- * This migration is dependent on the migration found in the SQL script for V42. This updates the
+ * This migration is dependent on the migration found in the SQL script for V44. This updates the
  * runs table to include the job_uuid value for each record. We update the table in
  * batches to avoid table-level locks so that concurrent reads and writes can continue to take
  * place. Auto-commit is enabled, so it is entirely possible that this migration will fail partway
@@ -16,7 +18,12 @@
  * is intentional as no harm will come from leaving these values in place in case of rollback.
  */
 @Slf4j
-public class V43__UpdateRunsWithJobUUID extends BaseJavaMigration {
+public class V43_1__UpdateRunsWithJobUUID implements JavaMigration {
+
+  @Override
+  public MigrationVersion getVersion() {
+    return MigrationVersion.fromVersion("43.1");
+  }
 
   // don't execute in a transaction so each batch can be committed immediately
   @Override
@@ -41,7 +48,7 @@ public void migrate(Context context) throws Exception {
           String uuid = resultSet.getString("uuid");
           String jobName = resultSet.getString("name");
           String namespace = resultSet.getString("namespace_name");
-          updatePs.setString(1, uuid);
+          updatePs.setObject(1, UUID.fromString(uuid));
           updatePs.setString(2, jobName);
           updatePs.setString(3, namespace);
           if (!updatePs.execute()) {
@@ -53,4 +60,24 @@ public void migrate(Context context) throws Exception {
       }
     }
   }
+
+  @Override
+  public String getDescription() {
+    return "UpdateRunsWithJobUUID";
+  }
+
+  @Override
+  public Integer getChecksum() {
+    return null;
+  }
+
+  @Override
+  public boolean isUndo() {
+    return false;
+  }
+
+  @Override
+  public boolean isBaselineMigration() {
+    return false;
+  }
 }
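One detail worth calling out in the change above: runs.uuid is of Postgres type uuid, and pgjdbc binds a java.util.UUID passed through setObject directly as that type, whereas setString sends text and can fail type resolution against a uuid parameter. A generic illustration (the statement below is a placeholder, not the migration's actual SQL):

// Placeholder statement for illustration only.
PreparedStatement ps =
    connection.prepareStatement("UPDATE runs SET job_uuid = ? WHERE uuid = ?");
ps.setObject(1, UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8")); // bound as uuid
ps.setObject(2, someRunUuid); // `someRunUuid` is a hypothetical java.util.UUID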
diff --git a/api/src/main/java/marquez/db/migrations/V44_1__BackfillAirflowParentRuns.java b/api/src/main/java/marquez/db/migrations/V44_1__BackfillAirflowParentRuns.java
new file mode 100644
index 0000000000..695192bfb1
--- /dev/null
+++ b/api/src/main/java/marquez/db/migrations/V44_1__BackfillAirflowParentRuns.java
@@ -0,0 +1,193 @@
+package marquez.db.migrations;
+
+import java.util.List;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import marquez.common.Utils;
+import org.flywaydb.core.api.MigrationVersion;
+import org.flywaydb.core.api.migration.Context;
+import org.flywaydb.core.api.migration.JavaMigration;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.core.result.ResultProducers;
+
+/**
+ * This migration is dependent on the migration script in V44__runs_job_versions_add_job_uuid.sql.
+ * The version returned by this code is intentionally after V44, but before V45, so that casual
+ * contributors won't accidentally create a migration script that conflicts with the version of
+ * this one.
+ *
+ * <p>This migration backfills parent jobs and runs for Airflow DAGs that reported ParentRunFacets
+ * that used the task name as the job name and the scheduled run name as the runId. For example,
+ * many Airflow tasks report the ParentRunFacet as
+ *
+ * <pre>{@code
+ * "parentRun": {
+ *   "job": {
+ *     "name": "simple_shortrunning_dag.tasks__1_of_3",
+ *     "namespace": "abcdefg"
+ *   },
+ *   "run": {
+ *     "runId": "scheduled__2022-03-14T01:40:10+00:00"
+ *   }
+ * }
+ * }</pre>
+ *
+ * The runId can't be mapped to an existing Marquez run, and the job name is actually a
+ * concatenation of the DAG name and the task name (the latter being the name assigned to the job
+ * for the task itself).
+ *
+ * <p>This migration aims to create a real parent run, with a distinct run id, and a parent job
+ * for each such run by constructing a deterministic run id from the DAG name and its runId. From
+ * each run, a job is created with the name field set to the DAG name.
+ */
+@Slf4j
+public class V44_1__BackfillAirflowParentRuns implements JavaMigration {
+
+  /**
+   * A numeric version that is greater than 44 (so it executes after that one) but before 45.
+   */
+  public static final MigrationVersion MIGRATION_VERSION = MigrationVersion.fromVersion("44.1");
+
+  private static final String FIND_AIRFLOW_PARENT_RUNS_SQL =
+      """
+      SELECT DISTINCT(run_uuid) AS run_uuid,
+             e.parent_run_id,
+             e.parent_job_name,
+             e.parent_job_namespace
+      FROM runs r
+      LEFT JOIN LATERAL (
+        SELECT run_uuid,
+               event->'run'->'facets'->'parent'->'run'->>'runId' AS parent_run_id,
+               event->'run'->'facets'->'parent'->'job'->>'name' AS parent_job_name,
+               event->'run'->'facets'->'parent'->'job'->>'namespace' AS parent_job_namespace
+        FROM lineage_events le
+        WHERE le.run_uuid=r.uuid
+          AND event->'run'->'facets'->'parent'->'run'->>'runId' IS NOT NULL
+          AND event->'run'->'facets'->'airflow_version' IS NOT NULL
+      ) e ON e.run_uuid=r.uuid
+      WHERE e.parent_run_id IS NOT NULL
+      """;
+  public static final String INSERT_PARENT_JOB_QUERY =
+      """
+      INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, description,
+        namespace_name, current_location)
+      SELECT :uuid, type, created_at, updated_at, namespace_uuid, :name, description,
+        namespace_name, current_location
+      FROM jobs
+      WHERE namespace_name=:namespace AND name=:jobName
+      ON CONFLICT(name, namespace_uuid) WHERE parent_job_uuid IS NULL
+      DO UPDATE SET updated_at=now()
+      RETURNING uuid
+      """;
+  public static final String INSERT_PARENT_RUN_QUERY =
+      """
+      INSERT INTO runs (uuid, created_at, updated_at, current_run_state, external_id,
+        namespace_name, job_name, job_uuid, location, transitioned_at, started_at, ended_at)
+      SELECT :parentRunUuid, created_at, updated_at, current_run_state, :externalRunid,
+        :namespace, :jobName, :parentJobUuid, location, transitioned_at, started_at, ended_at
+      FROM runs
+      WHERE uuid=:runUuid
+      ON CONFLICT (uuid) DO NOTHING
+      """;
+
+  @Override
+  public void migrate(Context context) throws Exception {
+    Jdbi jdbi = Jdbi.create(context.getConnection());
+    List<ParentRun> parentRuns =
+        jdbi.withHandle(
+            h ->
+                h.createQuery(FIND_AIRFLOW_PARENT_RUNS_SQL)
+                    .map(
+                        rs -> {
+                          String parentRunId = rs.getColumn("parent_run_id", String.class);
+                          if (parentRunId == null) {
+                            return null;
+                          }
+                          String parentJobName = rs.getColumn("parent_job_name", String.class);
+                          String dagName =
+                              parentJobName.contains(".")
+                                  ? parentJobName.substring(0, parentJobName.lastIndexOf('.'))
+                                  : parentJobName;
+                          String parentJobNamespace =
+                              rs.getColumn("parent_job_namespace", String.class);
+                          String runUuid = rs.getColumn("run_uuid", String.class);
+                          log.info(
+                              "Found likely airflow run {} with parent {}.{} run {}",
+                              runUuid,
+                              parentJobNamespace,
+                              parentJobName,
+                              parentRunId);
+                          UUID parentRunUuid;
+                          try {
+                            parentRunUuid = UUID.fromString(parentRunId);
+                          } catch (IllegalArgumentException e) {
+                            parentRunUuid = Utils.toNameBasedUuid(dagName, parentRunId);
+                          }
+
+                          return new ParentRun(
+                              UUID.fromString(runUuid),
+                              dagName,
+                              parentJobName,
+                              parentJobNamespace,
+                              parentRunId,
+                              parentRunUuid);
+                        })
+                    .list());
+    parentRuns.forEach(
+        parent -> {
+          UUID parentJobUuid =
+              jdbi.withHandle(
+                  h ->
+                      h.createQuery(INSERT_PARENT_JOB_QUERY)
+                          .bind("uuid", UUID.randomUUID())
+                          .bind("name", parent.dagName())
+                          .bind("namespace", parent.namespace())
+                          .bind("jobName", parent.jobName())
+                          .execute(ResultProducers.returningGeneratedKeys())
+                          .map((rs, ctx) -> UUID.fromString(rs.getString(1)))
+                          .first());
+          jdbi.withHandle(
+              h ->
+                  h.createQuery(INSERT_PARENT_RUN_QUERY)
+                      .bind("parentRunUuid", parent.parentRunId())
+                      .bind("externalRunid", parent.externalParentRunId())
+                      .bind("namespace", parent.namespace())
+                      .bind("jobName", parent.jobName())
+                      .bind("parentJobUuid", parentJobUuid)
+                      .bind("runUuid", parent.runUuid())
+                      .execute(ResultProducers.returningUpdateCount()));
+        });
+  }
+
+  private record ParentRun(
+      UUID runUuid,
+      String dagName,
+      String jobName,
+      String namespace,
+      String externalParentRunId,
+      UUID parentRunId) {}
+
+  @Override
+  public MigrationVersion getVersion() {
+    return MIGRATION_VERSION;
+  }
+
+  @Override
+  public String getDescription() {
+    return "BackfillAirflowParentRuns";
+  }
+
+  @Override
+  public Integer getChecksum() {
+    return null;
+  }
+
+  @Override
+  public boolean isUndo() {
+    return false;
+  }
+
+  @Override
+  public boolean canExecuteInTransaction() {
+    return false;
+  }
+
+  @Override
+  public boolean isBaselineMigration() {
+    return false;
+  }
+}
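Tracing the class javadoc's sample facet through the migration above (a hypothetical walk-through; the values come from the javadoc, the logic mirrors migrate()): the DAG name is recovered by trimming the task suffix, and the parent run id falls back to a name-based UUID because the reported runId is not well-formed.

// Hypothetical trace of the sample ParentRunFacet shown in the class javadoc.
String parentJobName = "simple_shortrunning_dag.tasks__1_of_3";
String parentRunId = "scheduled__2022-03-14T01:40:10+00:00";

String dagName = parentJobName.substring(0, parentJobName.lastIndexOf('.'));
// dagName == "simple_shortrunning_dag"

UUID parentRunUuid;
try {
  parentRunUuid = UUID.fromString(parentRunId); // throws: not a well-formed UUID
} catch (IllegalArgumentException e) {
  parentRunUuid = Utils.toNameBasedUuid(dagName, parentRunId); // deterministic fallback
}
// INSERT_PARENT_JOB_QUERY then creates (or touches) job "simple_shortrunning_dag",
// and INSERT_PARENT_RUN_QUERY inserts a parent run with uuid parentRunUuid.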
diff --git a/api/src/main/java/marquez/db/migrations/V44_2_BackfillJobsWithParents.java b/api/src/main/java/marquez/db/migrations/V44_2_BackfillJobsWithParents.java
new file mode 100644
index 0000000000..16f0ac2038
--- /dev/null
+++ b/api/src/main/java/marquez/db/migrations/V44_2_BackfillJobsWithParents.java
@@ -0,0 +1,154 @@
+package marquez.db.migrations;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import marquez.common.Utils;
+import marquez.service.models.LineageEvent.ParentRunFacet;
+import org.flywaydb.core.api.MigrationVersion;
+import org.flywaydb.core.api.migration.Context;
+import org.flywaydb.core.api.migration.JavaMigration;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.core.result.ResultProducers;
+
+@Slf4j
+public class V44_2_BackfillJobsWithParents implements JavaMigration {
+
+  public static final String FIND_JOBS_WITH_PARENT_RUNS =
+      """
+      SELECT j.uuid AS job_uuid, p.parent
+      FROM jobs j
+      LEFT JOIN LATERAL(
+        SELECT uuid AS run_uuid FROM runs
+        WHERE job_name=j.name AND namespace_name=j.namespace_name
+        ORDER BY transitioned_at DESC
+        LIMIT 1
+      ) r ON true
+      LEFT JOIN LATERAL (
+        SELECT event->'run'->'facets'->'parent' parent FROM lineage_events
+        WHERE run_uuid=r.run_uuid
+          AND event->'run'->'facets'->'parent' IS NOT NULL
+        ORDER BY event_time DESC
+        LIMIT 1
+      ) p ON true
+      WHERE p.parent IS NOT NULL
+      """;
+
+  public static final String FIND_JOB_UUID_FOR_RUN =
+      """
+      SELECT job_uuid FROM runs WHERE uuid=:uuid AND job_uuid IS NOT NULL
+      """;
+  public static final String INSERT_NEW_JOB_WITH_PARENT =
+      """
+      INSERT INTO jobs(uuid, type, created_at, updated_at, namespace_uuid, name, description,
+        current_version_uuid, namespace_name, current_job_context_uuid, current_location,
+        current_inputs, parent_job_uuid)
+      SELECT :uuid, type, created_at, updated_at, namespace_uuid, name, description,
+        current_version_uuid, namespace_name, current_job_context_uuid, current_location,
+        current_inputs, :parent_job_uuid
+      FROM jobs
+      WHERE uuid=:job_uuid
+      ON CONFLICT (name, namespace_name, parent_job_uuid) DO NOTHING
+      RETURNING uuid
+      """;
+  public static final String SYMLINK_OLD_JOB_TO_NEW =
+      "UPDATE jobs SET symlink_target_uuid=:target_uuid WHERE uuid=:job_uuid";
+
+  @Override
+  public MigrationVersion getVersion() {
+    return MigrationVersion.fromVersion("44.2");
+  }
+
+  @Override
+  public void migrate(Context context) throws Exception {
+    Jdbi jdbi = Jdbi.create(context.getConnection());
+    List<JobParent> jobParents =
+        jdbi.withHandle(
+            h ->
+                h.createQuery(FIND_JOBS_WITH_PARENT_RUNS)
+                    .map(
+                        (rs, ctx) -> {
+                          Optional<UUID> parentRunUuid = findParentRunIdForJob(rs);
+                          UUID jobId = UUID.fromString(rs.getString("job_uuid"));
+                          return parentRunUuid.map(runId -> new JobParent(jobId, runId));
+                        })
+                    .stream()
+                    .flatMap(Optional::stream)
+                    .collect(Collectors.toList()));
+    jobParents.forEach(
+        jp -> {
+          Optional<UUID> jobUuid =
+              jdbi.withHandle(
+                      h -> h.createQuery(FIND_JOB_UUID_FOR_RUN).bind("uuid", jp.parentRunId))
+                  .map((rs, ctx) -> UUID.fromString(rs.getString("job_uuid")))
+                  .findFirst();
+          jobUuid
+              .flatMap(
+                  uuid ->
+                      jdbi.withHandle(
+                          h ->
+                              h.createQuery(INSERT_NEW_JOB_WITH_PARENT)
+                                  .bind("uuid", UUID.randomUUID())
+                                  .bind("parent_job_uuid", uuid)
+                                  .bind("job_uuid", jp.jobId)
+                                  .execute(ResultProducers.returningGeneratedKeys("uuid"))
+                                  .map(rs -> rs.getColumn("uuid", UUID.class))
+                                  .findFirst()))
+              .ifPresent(
+                  newTargetUuid -> {
+                    jdbi.withHandle(
+                            h ->
+                                h.createQuery(SYMLINK_OLD_JOB_TO_NEW)
+                                    .bind("job_uuid", jp.jobId)
+                                    .bind("target_uuid", newTargetUuid))
+                        .execute(ResultProducers.returningUpdateCount());
+                  });
+        });
+  }
+
+  private Optional<UUID> findParentRunIdForJob(ResultSet resultSet) throws SQLException {
+    String parentJson = resultSet.getString("parent");
+    try {
+      ParentRunFacet parentRunFacet =
+          Utils.getMapper().readValue(parentJson, ParentRunFacet.class);
+      return Optional.of(Utils.findParentRunUuid(parentRunFacet));
+    } catch (JsonProcessingException e) {
+      log.warn(
+          "Unable to process parent run facet from event for run {}: {}",
+          resultSet.getString("run_uuid"),
+          parentJson);
+    }
+    return Optional.empty();
+  }
+
+  private record JobParent(UUID jobId, UUID parentRunId) {}
+
+  @Override
+  public String getDescription() {
+    return "BackfillJobsWithParents";
+  }
+
+  @Override
+  public Integer getChecksum() {
+    return null;
+  }
+
+  @Override
+  public boolean isUndo() {
+    return false;
+  }
+
+  @Override
+  public boolean canExecuteInTransaction() {
+    return false;
+  }
+
+  @Override
+  public boolean isBaselineMigration() {
+    return false;
+  }
+}
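The backfill above leans on Jackson to rehydrate the raw parent facet JSON stored in lineage_events. A compact sketch of that step (the JSON below is invented, and we assume ParentRunFacet deserializes from this shape, including the _producer/_schemaURL fields the OpenLineage spec carries):

// Sketch of the JSON -> UUID step performed by findParentRunIdForJob above.
String parentJson =
    """
    {"_producer": "https://example.com/producer",
     "_schemaURL": "https://example.com/schema",
     "job": {"name": "my_dag.my_task", "namespace": "my_ns"},
     "run": {"runId": "scheduled__2022-03-14T01:40:10+00:00"}}
    """;
ParentRunFacet facet = Utils.getMapper().readValue(parentJson, ParentRunFacet.class);
UUID parentRunUuid = Utils.findParentRunUuid(facet);
// jobName "my_dag.my_task" -> dagName "my_dag"; the runId is not a UUID, so the
// result is Utils.toNameBasedUuid("my_dag", "scheduled__2022-03-14T01:40:10+00:00").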
diff --git a/api/src/main/resources/marquez/db/migration/V41__alter_jobs_add_job_parent_id.sql b/api/src/main/resources/marquez/db/migration/V41__alter_jobs_add_job_parent_id.sql
deleted file mode 100644
index 38b1e0fe24..0000000000
--- a/api/src/main/resources/marquez/db/migration/V41__alter_jobs_add_job_parent_id.sql
+++ /dev/null
@@ -1,33 +0,0 @@
-ALTER TABLE jobs ADD COLUMN IF NOT EXISTS parent_job_id uuid CONSTRAINT jobs_parent_fk_jobs REFERENCES jobs (uuid);
-ALTER TABLE runs ADD COLUMN IF NOT EXISTS parent_run_id uuid CONSTRAINT runs_parent_fk_runs REFERENCES runs (uuid);
-DROP INDEX IF EXISTS jobs_name_index;
-CREATE UNIQUE INDEX IF NOT EXISTS jobs_name_parent ON jobs (name, namespace_name, parent_job_id) INCLUDE (uuid, name, namespace_name, parent_job_id);
-
-CREATE OR REPLACE VIEW jobs_view
-AS
-  with recursive job_fqn AS (
-    SELECT uuid, name, namespace_name, parent_job_id
-    FROM jobs
-    UNION ALL
-    SELECT j1.uuid,
-           CASE WHEN j2.name IS NOT NULL THEN j2.name || '.' || j1.name ELSE j1.name END AS name,
-           CASE WHEN j2.namespace_name IS NOT NULL THEN j2.namespace_name ELSE j1.namespace_name END AS namespace_name,
-           j2.parent_job_id
-    FROM jobs j1
-    INNER JOIN job_fqn j2 ON j2.uuid=j1.parent_job_id
-  )
-  SELECT f.uuid,
-         f.name,
-         f.namespace_name,
-         j.name AS simple_name,
-         j.parent_job_id,
-         j.type,
-         j.created_at,
-         j.updated_at,
-         j.namespace_uuid,
-         j.description,
-         j.current_version_uuid,
-         j.current_job_context_uuid,
-         j.current_location,
-         j.current_inputs
-  FROM job_fqn f, jobs j WHERE j.uuid=f.uuid
diff --git a/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql b/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql
new file mode 100644
index 0000000000..8450f72922
--- /dev/null
+++ b/api/src/main/resources/marquez/db/migration/V43__alter_jobs_add_job_parent_id.sql
@@ -0,0 +1,44 @@
+ALTER TABLE jobs ADD COLUMN IF NOT EXISTS parent_job_uuid uuid CONSTRAINT jobs_parent_fk_jobs REFERENCES jobs (uuid);
+ALTER TABLE runs ADD COLUMN IF NOT EXISTS parent_run_uuid uuid CONSTRAINT runs_parent_fk_runs REFERENCES runs (uuid);
+
+DROP INDEX IF EXISTS jobs_name_index;
+ALTER TABLE jobs DROP CONSTRAINT jobs_namespace_uuid_name_key;
+DROP INDEX IF EXISTS jobs_namespace_uuid_name_key;
+
+CREATE UNIQUE INDEX IF NOT EXISTS jobs_name_parent ON jobs (name, namespace_name, parent_job_uuid);
+CREATE UNIQUE INDEX IF NOT EXISTS jobs_namespace_uuid_name_parent ON jobs (name, namespace_uuid, parent_job_uuid);
+CREATE UNIQUE INDEX IF NOT EXISTS jobs_namespace_uuid_name_null_parent ON jobs (name, namespace_uuid) WHERE parent_job_uuid IS NULL;
+ALTER TABLE jobs ADD CONSTRAINT unique_jobs_namespace_uuid_name_parent UNIQUE USING INDEX jobs_namespace_uuid_name_parent;
+
+CREATE OR REPLACE VIEW jobs_view
+AS
+  with recursive
+    job_fqn AS (
+      SELECT uuid, name, namespace_name, parent_job_uuid
+      FROM jobs
+      UNION ALL
+      SELECT j1.uuid,
+             CASE WHEN j2.name IS NOT NULL THEN j2.name || '.' || j1.name ELSE j1.name END AS name,
+             CASE WHEN j2.namespace_name IS NOT NULL THEN j2.namespace_name ELSE j1.namespace_name END AS namespace_name,
+             j2.parent_job_uuid
+      FROM jobs j1
+      INNER JOIN job_fqn j2 ON j2.uuid=j1.parent_job_uuid
+    )
+  SELECT f.uuid,
+         f.name,
+         f.namespace_name,
+         j.name AS simple_name,
+         f.parent_job_uuid,
+         j.type,
+         j.created_at,
+         j.updated_at,
+         j.namespace_uuid,
+         j.description,
+         j.current_version_uuid,
+         j.current_job_context_uuid,
+         j.current_location,
+         j.current_inputs,
+         j.symlink_target_uuid
+  FROM job_fqn f, jobs j
+  WHERE j.uuid=f.uuid
+  AND f.parent_job_uuid IS NULL
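To make the view's recursion concrete, here is a sketch (names invented) of what jobs_view yields for a two-level hierarchy: the recursive step prepends each ancestor's name, and the final filter (f.parent_job_uuid IS NULL) keeps only the fully-resolved row per job, so a child surfaces with its fully-qualified name while simple_name keeps the leaf name.

// Hypothetical rows after inserting a parent and a child job:
//   jobs: ("my_dag", parent_job_uuid = NULL), ("task1", parent_job_uuid = <my_dag's uuid>)
// jobs_view then exposes (among other columns):
//   name            simple_name   parent_job_uuid
//   "my_dag"        "my_dag"      NULL
//   "my_dag.task1"  "task1"       NULL   -- task1's base row (parent not yet resolved) is filtered out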
diff --git a/api/src/main/resources/marquez/db/migration/V42__runs_job_versions_add_job_uuid.sql b/api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql
similarity index 100%
rename from api/src/main/resources/marquez/db/migration/V42__runs_job_versions_add_job_uuid.sql
rename to api/src/main/resources/marquez/db/migration/V44__runs_job_versions_add_job_uuid.sql
diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java
index 1b00fb1b13..a1b64f8543 100644
--- a/api/src/test/java/marquez/db/LineageTestUtils.java
+++ b/api/src/test/java/marquez/db/LineageTestUtils.java
@@ -16,6 +16,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import javax.validation.Valid;
 import lombok.Value;
 import marquez.common.Utils;
 import marquez.db.models.UpdateLineageRow;
@@ -61,6 +62,55 @@ public static UpdateLineageRow createLineageRow(
       JobFacet jobFacet,
       List<Dataset> inputs,
       List<Dataset> outputs) {
+    return createLineageRow(dao, jobName, status, jobFacet, inputs, outputs, null);
+  }
+
+  /**
+   * Create an {@link UpdateLineageRow} from the input job details and datasets.
+   *
+   * @param dao the {@link OpenLineageDao} used to write the lineage event
+   * @param jobName the name of the job
+   * @param status the run state reported by the event
+   * @param jobFacet the job facets to attach
+   * @param inputs the input datasets
+   * @param outputs the output datasets
+   * @param parentRunFacet the parent run facet, or null if the run has no parent
+   * @return the {@link UpdateLineageRow} written by the DAO
+   */
+  public static UpdateLineageRow createLineageRow(
+      OpenLineageDao dao,
+      String jobName,
+      String status,
+      JobFacet jobFacet,
+      List<Dataset> inputs,
+      List<Dataset> outputs,
+      @Valid LineageEvent.ParentRunFacet parentRunFacet) {
+    return createLineageRow(
+        dao, jobName, status, jobFacet, inputs, outputs, parentRunFacet, ImmutableMap.of());
+  }
+
+  /**
+   * Create an {@link UpdateLineageRow} from the input job details and datasets.
+   *
+   * @param dao the {@link OpenLineageDao} used to write the lineage event
+   * @param jobName the name of the job
+   * @param status the run state reported by the event
+   * @param jobFacet the job facets to attach
+   * @param inputs the input datasets
+   * @param outputs the output datasets
+   * @param parentRunFacet the parent run facet, or null if the run has no parent
+   * @param runFacets any additional run facets (e.g., an airflow_version facet)
+   * @return the {@link UpdateLineageRow} written by the DAO
+   */
+  public static UpdateLineageRow createLineageRow(
+      OpenLineageDao dao,
+      String jobName,
+      String status,
+      JobFacet jobFacet,
+      List<Dataset> inputs,
+      List<Dataset> outputs,
+      @Valid LineageEvent.ParentRunFacet parentRunFacet,
+      ImmutableMap<String, Object> runFacets) {
     NominalTimeRunFacet nominalTimeRunFacet = new NominalTimeRunFacet();
     nominalTimeRunFacet.setNominalStartTime(
         Instant.now().atZone(LOCAL_ZONE).truncatedTo(ChronoUnit.HOURS));
@@ -72,7 +122,7 @@ public static UpdateLineageRow createLineageRow(
         new LineageEvent(
             status,
             Instant.now().atZone(LOCAL_ZONE),
-            new Run(runId.toString(), new RunFacet(nominalTimeRunFacet, null, ImmutableMap.of())),
+            new Run(runId.toString(), new RunFacet(nominalTimeRunFacet, parentRunFacet, runFacets)),
             new Job(NAMESPACE, jobName, jobFacet),
             inputs,
             outputs,
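The extra runFacets parameter is what lets the migration tests below mark runs as Airflow-emitted: V44_1's SQL only considers events whose run facets contain an airflow_version key. A typical call, mirroring the tests that follow (the job name and version string are illustrative):

// Mirrors the test usage below; PRODUCER_URL/SCHEMA_URL come from LineageTestUtils.
createLineageRow(
    openLineageDao,
    "my_dag.my_task",
    "COMPLETE",
    new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
    Collections.emptyList(),
    Collections.emptyList(),
    new ParentRunFacet(
        LineageTestUtils.PRODUCER_URL,
        LineageTestUtils.SCHEMA_URL,
        new RunLink("scheduled__2022-03-14T01:40:10+00:00"),
        new JobLink(LineageTestUtils.NAMESPACE, "my_dag.my_task")),
    ImmutableMap.of("airflow_version", ImmutableMap.of("version", "2.2.0")));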
diff --git a/api/src/test/java/marquez/db/migrations/V44_1__BackfillAirflowParentRunsTest.java b/api/src/test/java/marquez/db/migrations/V44_1__BackfillAirflowParentRunsTest.java
new file mode 100644
index 0000000000..342641b941
--- /dev/null
+++ b/api/src/test/java/marquez/db/migrations/V44_1__BackfillAirflowParentRunsTest.java
@@ -0,0 +1,103 @@
+package marquez.db.migrations;
+
+import static marquez.db.LineageTestUtils.NAMESPACE;
+import static marquez.db.LineageTestUtils.createLineageRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.Optional;
+import marquez.db.JobDao;
+import marquez.db.LineageTestUtils;
+import marquez.db.OpenLineageDao;
+import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import marquez.service.models.Job;
+import marquez.service.models.LineageEvent.JobFacet;
+import marquez.service.models.LineageEvent.JobLink;
+import marquez.service.models.LineageEvent.ParentRunFacet;
+import marquez.service.models.LineageEvent.RunLink;
+import org.flywaydb.core.api.configuration.Configuration;
+import org.flywaydb.core.api.migration.Context;
+import org.jdbi.v3.core.Jdbi;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
+class V44_1__BackfillAirflowParentRunsTest {
+
+  static Jdbi jdbi;
+  private static OpenLineageDao openLineageDao;
+
+  @BeforeAll
+  public static void setUpOnce(Jdbi jdbi) {
+    V44_1__BackfillAirflowParentRunsTest.jdbi = jdbi;
+    openLineageDao = jdbi.onDemand(OpenLineageDao.class);
+  }
+
+  @Test
+  public void testMigrateAirflowTasks() {
+    String dagName = "airflowDag";
+    String task1Name = dagName + ".task1";
+    createLineageRow(
+        openLineageDao,
+        task1Name,
+        "COMPLETE",
+        new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        new ParentRunFacet(
+            LineageTestUtils.PRODUCER_URL,
+            LineageTestUtils.SCHEMA_URL,
+            new RunLink("schedule:00:00:00"),
+            new JobLink(NAMESPACE, task1Name)),
+        ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")));
+
+    createLineageRow(
+        openLineageDao,
+        "airflowDag.task2",
+        "COMPLETE",
+        new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        new ParentRunFacet(
+            LineageTestUtils.PRODUCER_URL,
+            LineageTestUtils.SCHEMA_URL,
+            new RunLink("schedule:00:00:00"),
+            new JobLink(NAMESPACE, task1Name)),
+        ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")));
+
+    createLineageRow(
+        openLineageDao,
+        "a_non_airflow_task",
+        "COMPLETE",
+        new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
+        Collections.emptyList(),
+        Collections.emptyList());
+
+    jdbi.useHandle(
+        handle -> {
+          try {
+            new V44_1__BackfillAirflowParentRuns()
+                .migrate(
+                    new Context() {
+                      @Override
+                      public Configuration getConfiguration() {
+                        return null;
+                      }
+
+                      @Override
+                      public Connection getConnection() {
+                        return handle.getConnection();
+                      }
+                    });
+          } catch (Exception e) {
+            throw new AssertionError("Unable to execute migration", e);
+          }
+        });
+    JobDao jobDao = jdbi.onDemand(JobDao.class);
+    Optional<Job> jobByName = jobDao.findJobByName(NAMESPACE, dagName);
+    assertThat(jobByName).isPresent();
+  }
+}
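A note on the harness above: outside of tests, Flyway constructs the migration Context itself. The test substitutes an anonymous Context whose getConnection() hands back the JDBI handle's connection, so the JavaMigration runs directly against the already-migrated test database without involving Flyway's schema-history bookkeeping. The second test below uses the same pattern to apply V43_1 and V44_2 in order.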
diff --git a/api/src/test/java/marquez/db/migrations/V44_2_BackfillJobsWithParentsTest.java b/api/src/test/java/marquez/db/migrations/V44_2_BackfillJobsWithParentsTest.java
new file mode 100644
index 0000000000..6fa998f69a
--- /dev/null
+++ b/api/src/test/java/marquez/db/migrations/V44_2_BackfillJobsWithParentsTest.java
@@ -0,0 +1,109 @@
+package marquez.db.migrations;
+
+import static marquez.db.LineageTestUtils.NAMESPACE;
+import static marquez.db.LineageTestUtils.createLineageRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.sql.Connection;
+import java.util.Collections;
+import java.util.Optional;
+import marquez.common.models.JobName;
+import marquez.db.JobDao;
+import marquez.db.LineageTestUtils;
+import marquez.db.OpenLineageDao;
+import marquez.db.models.UpdateLineageRow;
+import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import marquez.service.models.Job;
+import marquez.service.models.LineageEvent.JobFacet;
+import marquez.service.models.LineageEvent.JobLink;
+import marquez.service.models.LineageEvent.ParentRunFacet;
+import marquez.service.models.LineageEvent.RunLink;
+import org.flywaydb.core.api.configuration.Configuration;
+import org.flywaydb.core.api.migration.Context;
+import org.jdbi.v3.core.Jdbi;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
+class V44_2_BackfillJobsWithParentsTest {
+
+  static Jdbi jdbi;
+  private static OpenLineageDao openLineageDao;
+
+  @BeforeAll
+  public static void setUpOnce(Jdbi jdbi) {
+    V44_2_BackfillJobsWithParentsTest.jdbi = jdbi;
+    openLineageDao = jdbi.onDemand(OpenLineageDao.class);
+  }
+
+  @Test
+  public void testBackfill() {
+    String parentName = "parentJob";
+    UpdateLineageRow parentJob =
+        createLineageRow(
+            openLineageDao,
+            parentName,
+            "COMPLETE",
+            new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP),
+            Collections.emptyList(),
+            Collections.emptyList());
+
+    String task1Name = "task1";
+    ParentRunFacet parentFacet =
+        new ParentRunFacet(
+            LineageTestUtils.PRODUCER_URL,
+            LineageTestUtils.SCHEMA_URL,
+            new RunLink(parentJob.getRun().getUuid().toString()),
+            new JobLink(NAMESPACE, parentName));
RunLink("schedule:00:00:00"), + new JobLink(NAMESPACE, task1Name)), + ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))); + + createLineageRow( + openLineageDao, + "a_non_airflow_task", + "COMPLETE", + new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + Collections.emptyList(), + Collections.emptyList()); + + jdbi.useHandle( + handle -> { + try { + new V44_1__BackfillAirflowParentRuns() + .migrate( + new Context() { + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Connection getConnection() { + return handle.getConnection(); + } + }); + } catch (Exception e) { + throw new AssertionError("Unable to execute migration", e); + } + }); + JobDao jobDao = jdbi.onDemand(JobDao.class); + Optional jobByName = jobDao.findJobByName(NAMESPACE, dagName); + assertThat(jobByName).isPresent(); + } +} diff --git a/api/src/test/java/marquez/db/migrations/V44_2_BackfillJobsWithParentsTest.java b/api/src/test/java/marquez/db/migrations/V44_2_BackfillJobsWithParentsTest.java new file mode 100644 index 0000000000..6fa998f69a --- /dev/null +++ b/api/src/test/java/marquez/db/migrations/V44_2_BackfillJobsWithParentsTest.java @@ -0,0 +1,109 @@ +package marquez.db.migrations; + +import static marquez.db.LineageTestUtils.NAMESPACE; +import static marquez.db.LineageTestUtils.createLineageRow; +import static org.assertj.core.api.Assertions.assertThat; + +import java.sql.Connection; +import java.util.Collections; +import java.util.Optional; +import marquez.common.models.JobName; +import marquez.db.JobDao; +import marquez.db.LineageTestUtils; +import marquez.db.OpenLineageDao; +import marquez.db.models.UpdateLineageRow; +import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.models.Job; +import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobLink; +import marquez.service.models.LineageEvent.ParentRunFacet; +import marquez.service.models.LineageEvent.RunLink; +import org.flywaydb.core.api.configuration.Configuration; +import org.flywaydb.core.api.migration.Context; +import org.jdbi.v3.core.Jdbi; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; + +@ExtendWith(MarquezJdbiExternalPostgresExtension.class) +class V44_2_BackfillJobsWithParentsTest { + + static Jdbi jdbi; + private static OpenLineageDao openLineageDao; + + @BeforeAll + public static void setUpOnce(Jdbi jdbi) { + V44_2_BackfillJobsWithParentsTest.jdbi = jdbi; + openLineageDao = jdbi.onDemand(OpenLineageDao.class); + } + + @Test + public void testBackfill() { + String parentName = "parentJob"; + UpdateLineageRow parentJob = + createLineageRow( + openLineageDao, + parentName, + "COMPLETE", + new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + Collections.emptyList(), + Collections.emptyList()); + + String task1Name = "task1"; + ParentRunFacet parentFacet = + new ParentRunFacet( + LineageTestUtils.PRODUCER_URL, + LineageTestUtils.SCHEMA_URL, + new RunLink(parentJob.getRun().getUuid().toString()), + new JobLink(NAMESPACE, parentName)); + createLineageRow( + openLineageDao, + task1Name, + "COMPLETE", + new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + Collections.emptyList(), + Collections.emptyList(), + parentFacet, + ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))); + + createLineageRow( + openLineageDao, + "task2", + 
"COMPLETE", + new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + Collections.emptyList(), + Collections.emptyList(), + parentFacet, + ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))); + jdbi.useHandle( + handle -> { + try { + Context context = + new Context() { + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Connection getConnection() { + return handle.getConnection(); + } + }; + // apply migrations in order + new V43_1__UpdateRunsWithJobUUID().migrate(context); + new V44_2_BackfillJobsWithParents().migrate(context); + } catch (Exception e) { + throw new AssertionError("Unable to execute migration", e); + } + }); + + JobDao jobDao = jdbi.onDemand(JobDao.class); + Optional jobByName = jobDao.findJobByName(NAMESPACE, task1Name); + assertThat(jobByName) + .isPresent() + .get() + .hasFieldOrPropertyWithValue("name", new JobName(parentName + "." + task1Name)); + } +}