Skip to content

Commit

Permalink
Initial backfill script for Airflow runs
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed May 6, 2022
1 parent d2ba056 commit 620a18c
Show file tree
Hide file tree
Showing 12 changed files with 828 additions and 46 deletions.
78 changes: 78 additions & 0 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
Expand All @@ -37,6 +39,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -50,6 +53,7 @@
import marquez.service.models.DatasetMeta;
import marquez.service.models.DbTableMeta;
import marquez.service.models.LineageEvent;
import marquez.service.models.LineageEvent.ParentRunFacet;
import marquez.service.models.StreamMeta;
import org.apache.commons.lang3.tuple.Triple;

Expand All @@ -61,6 +65,16 @@ private Utils() {}
public static final String VERSION_DELIM = ":";
public static final Joiner VERSION_JOINER = Joiner.on(VERSION_DELIM).skipNulls();

/**
* pre-defined NAMESPACE_URL defined in RFC4122. This is the namespace used by the OpenLineage
* Airflow integration for constructing some run IDs as UUIDs. We use the same namespace to
* construct the same UUIDs when absolutely necessary (e.g., backfills, backward compatibility)
*
* @see https://datatracker.ietf.org/doc/html/rfc4122#appendix-C
*/
public static final UUID NAMESPACE_URL_UUID =
UUID.fromString("6ba7b811-9dad-11d1-80b4-00c04fd430c8");

private static final ObjectMapper MAPPER = newObjectMapper();

private static final int UUID_LENGTH = 36;
Expand Down Expand Up @@ -141,6 +155,70 @@ public static UUID toUuid(@NonNull final String uuidString) {
return UUID.fromString(uuidString);
}

/**
* Construct a name-based {@link UUID} based on the {@link #NAMESPACE_URL_UUID} namespace. Name
* parts are separated by a dot (.) character.
*
* @see https://datatracker.ietf.org/doc/html/rfc4122#page-13
* @param nameParts
* @return
*/
public static UUID toNameBasedUuid(String... nameParts) {
String constructedName = String.join(".", nameParts);

final byte[] nameBytes = constructedName.getBytes(StandardCharsets.UTF_8);

ByteBuffer buffer = ByteBuffer.allocate(nameBytes.length + 16);
buffer.putLong(NAMESPACE_URL_UUID.getMostSignificantBits());
buffer.putLong(NAMESPACE_URL_UUID.getLeastSignificantBits());
buffer.put(nameBytes);

return UUID.nameUUIDFromBytes(buffer.array());
}

/**
* Construct a UUID from a {@link ParentRunFacet} - if the {@link
* marquez.service.models.LineageEvent.RunLink#runId} field is a valid {@link UUID}, use it.
* Otherwise, compute a {@link UUID} from the job name and the reported runId. If the job name
* contains a dot (.), only return the portion up to the last dot in the name (this attempts to
* address airflow tasks, which always report the job name as <dag_name>.<task_name>
*
* @param parent
* @return
*/
public static UUID findParentRunUuid(ParentRunFacet parent) {
String jobName = parent.getJob().getName();
String parentRunId = parent.getRun().getRunId();
return findParentRunUuid(jobName, parentRunId);
}

public static UUID findParentRunUuid(String parentJobName, String parentRunId) {
String dagName = parseParentJobName(parentJobName);
return toUuid(parentRunId, dagName);
}

public static String parseParentJobName(String parentJobName) {
return parentJobName.contains(".")
? parentJobName.substring(0, parentJobName.lastIndexOf('.'))
: parentJobName;
}

/**
* Compute a UUID from a RunId and a jobName
*
* @see Utils#toNameBasedUuid(String...) for details on the UUID construction.
* @param runId
* @param jobName
* @return
*/
public static UUID toUuid(@NotNull String runId, String jobName) {
try {
return UUID.fromString(runId);
} catch (IllegalArgumentException e) {
return Utils.toNameBasedUuid(jobName, runId);
}
}

public static Instant toInstant(@Nullable final String asIso) {
return (asIso == null) ? null : Instant.from(ISO_INSTANT.parse(asIso));
}
Expand Down
70 changes: 63 additions & 7 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,17 @@ public interface JobDao extends BaseDao {
@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, jc.context, f.facets
FROM jobs j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
FROM jobs_view j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
Expand Down Expand Up @@ -261,7 +261,7 @@ INSERT INTO jobs AS j (
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid) DO
) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
Expand All @@ -285,4 +285,60 @@ JobRow upsertJob(
String location,
UUID symlinkTargetId,
PGobject inputs);

@SqlQuery(
"""
INSERT INTO jobs AS j (
uuid,
parent_job_uuid,
type,
created_at,
updated_at,
namespace_uuid,
namespace_name,
name,
description,
current_job_context_uuid,
current_location,
current_inputs,
symlink_target_uuid
) VALUES (
:uuid,
:parentJobUuid,
:type,
:now,
:now,
:namespaceUuid,
:namespaceName,
:name,
:description,
:jobContextUuid,
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *
""")
JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;

/**
* This migration is dependent on the migration found in the SQL script for V42. This updates the
* This migration is dependent on the migration found in the SQL script for V43. This updates the
* runs table to include the <code>job_uuid</code> value for each record. We update the table in
* batches to avoid table-level locks so that concurrent reads and writes can continue to take
* place. Auto-commit is enabled, so it is entirely possible that this migration will fail partway
* through and some records will retain the <code>job_uuid</code> value while others will not. This
* is intentional as no harm will come from leaving these values in place in case of rollback.
*/
@Slf4j
public class V43__UpdateRunsWithJobUUID extends BaseJavaMigration {
public class V43_1__UpdateRunsWithJobUUID implements JavaMigration {

@Override
public MigrationVersion getVersion() {
return MigrationVersion.fromVersion("43.1");
}

// don't execute in a transaction so each batch can be committed immediately
@Override
Expand All @@ -41,7 +48,7 @@ public void migrate(Context context) throws Exception {
String uuid = resultSet.getString("uuid");
String jobName = resultSet.getString("name");
String namespace = resultSet.getString("namespace_name");
updatePs.setString(1, uuid);
updatePs.setObject(1, UUID.fromString(uuid));
updatePs.setString(2, jobName);
updatePs.setString(3, namespace);
if (!updatePs.execute()) {
Expand All @@ -53,4 +60,24 @@ public void migrate(Context context) throws Exception {
}
}
}

@Override
public String getDescription() {
return "UpdateRunsWithJobUUID";
}

@Override
public Integer getChecksum() {
return null;
}

@Override
public boolean isUndo() {
return false;
}

@Override
public boolean isBaselineMigration() {
return false;
}
}
Loading

0 comments on commit 620a18c

Please sign in to comment.