Add OL facet tables #2152

Closed · 21 commits
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,13 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.29.0...HEAD)

### Added

* Split `lineage_events` table into `dataset_facets`, `run_facets`, and `job_facets` tables. [`2152`](https://github.com/MarquezProject/marquez/pull/2152)
  [@wslulciuc](https://github.com/wslulciuc), [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Performance improvement via a migration procedure that requires manual steps if the database has more than 100K lineage events.
* Please read the [migration notes](https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V55__readme.md) for more details on the database migration.
Member Author commented:
Minor: I would reword as follows:

Note: We highly encourage users to review our migration plan.


## [0.29.0](https://github.com/MarquezProject/marquez/compare/0.28.0...0.29.0) - 2022-12-19

### Added
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/MarquezApp.java
@@ -29,6 +29,7 @@
import marquez.api.filter.JobRedirectFilter;
import marquez.cli.MetadataCommand;
import marquez.cli.SeedCommand;
import marquez.cli.V55MigrationCommand;
import marquez.common.Utils;
import marquez.db.DbMigration;
import marquez.logging.LoggingMdcFilter;
@@ -149,6 +150,12 @@ public void registerResources(
}
}

@Override
protected void addDefaultCommands(Bootstrap<MarquezConfig> bootstrap) {
Member Author commented:
To avoid overriding addDefaultCommands(), we can just register the command within MarquezApp.initialize():

@Override
public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
  bootstrap.addCommand(new V55MigrationCommand());
  // continue initialization ...
}

Collaborator commented:

The existing commands are no-arg commands, while the migration command needs access to the database connection. That's why I included it here, so that it can receive the Application as a constructor param. Within initialize, commands are added although the application has not been created yet.

bootstrap.addCommand(new V55MigrationCommand(this));
super.addDefaultCommands(bootstrap);
}

private MarquezContext buildMarquezContext(
MarquezConfig config, Environment env, ManagedDataSource source) {
final JdbiFactory factory = new JdbiFactory();
90 changes: 90 additions & 0 deletions api/src/main/java/marquez/cli/V55MigrationCommand.java
@@ -0,0 +1,90 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.cli;

import io.dropwizard.Application;
import io.dropwizard.cli.EnvironmentCommand;
import io.dropwizard.db.DataSourceFactory;
import io.dropwizard.db.ManagedDataSource;
import io.dropwizard.jdbi3.JdbiFactory;
import io.dropwizard.setup.Environment;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import marquez.db.migrations.V55_5__BackfillFacets;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

/**
* A command to manually run the V55 database migration. The migration involves a heavy DB
* operation which can be performed asynchronously (with limited API downtime) via this
* separate migration command.
*
* <p>Please refer to {@code marquez/db/migration/V55__readme.md} for more details.
*/
@Slf4j
public class V55MigrationCommand<MarquezConfig> extends EnvironmentCommand<marquez.MarquezConfig> {
Member Author commented:
Is V55MigrationCommand too specific (but also vague)? We'll certainly have more migrations in the future and one-off commands for each will be hard to manage and not the best user experience. I suggest we name the class DbMigrationsCommand and define v55_migrate as a subcommand. This tutorial is a pretty good reference on how to define subcommands.
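The suggestion above is to fold one-off migration commands into a single parent command with named subcommands. A minimal, self-contained sketch of that dispatch idea in plain Java (the real implementation would wire subparsers through Dropwizard/argparse4j; the names `DbMigrationsCommand` and `v55_migrate` follow the suggestion and are not actual Marquez code):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntConsumer;

/**
 * Illustrative sketch of a consolidated migrations command: one parent
 * command, each manual migration registered under a stable subcommand name.
 */
public class DbMigrationsCommand {
  private final Map<String, IntConsumer> subcommands = new LinkedHashMap<>();

  public DbMigrationsCommand() {
    // Future migrations register here instead of adding new top-level commands.
    subcommands.put(
        "v55_migrate",
        chunkSize -> System.out.println("running V55 backfill, chunkSize=" + chunkSize));
  }

  /** Runs the named migration subcommand; returns false if it does not exist. */
  public boolean run(String name, int chunkSize) {
    IntConsumer migration = subcommands.get(name);
    if (migration == null) {
      return false;
    }
    migration.accept(chunkSize);
    return true;
  }

  public static void main(String[] args) {
    DbMigrationsCommand cmd = new DbMigrationsCommand();
    cmd.run("v55_migrate", 100_000);
  }
}
```

With Dropwizard, the same shape would be achieved by registering subparsers on the parent command's `Subparser` rather than a map.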


private static final String COMMAND_NAME = "v55_migrate";
private static final String COMMAND_DESCRIPTION =
"""
A command to manually run V55 database migration.
Please refer to https://github.com/MarquezProject/marquez/blob/main/api/src/main/resources/marquez/db/migration/V55__readme.md for more details.
""";

/**
* Creates a new environment command.
*
* @param application the application providing this command
*/
public V55MigrationCommand(Application<marquez.MarquezConfig> application) {
super(application, COMMAND_NAME, COMMAND_DESCRIPTION);
}

@Override
public void configure(Subparser subparser) {
subparser
.addArgument("--chunkSize")
.dest("chunkSize")
.type(Integer.class)
.required(false)
.setDefault(V55_5__BackfillFacets.DEFAULT_CHUNK_SIZE)
.help("number of lineage_events rows processed in a single SQL query and transaction.");
addFileArgument(subparser);
}

@Override
protected void run(
Environment environment, Namespace namespace, marquez.MarquezConfig configuration)
throws Exception {
log.info("Running v55 migration command");

final DataSourceFactory sourceFactory = configuration.getDataSourceFactory();
final DataSource source = sourceFactory.build(environment.metrics(), "MarquezApp-source");

final JdbiFactory factory = new JdbiFactory();
final Jdbi jdbi =
factory
.build(
environment,
configuration.getDataSourceFactory(),
(ManagedDataSource) source,
"postgresql-command")
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());

V55_5__BackfillFacets migration = new V55_5__BackfillFacets();
migration.setTriggeredByCommand(true);
migration.setJdbi(jdbi);
migration.setChunkSize(namespace.getInt("chunkSize"));
migration.migrate(null);

log.info("Migration finished successfully");
}
}
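The `--chunkSize` flag above bounds how many `lineage_events` rows one SQL query and transaction touches. A self-contained sketch of that chunked iteration (illustrative only; the actual work happens in `V55_5__BackfillFacets` against Postgres, and these names are hypothetical):

```java
/**
 * Sketch of chunked backfill iteration: process totalRows rows in batches of
 * at most chunkSize rows each, as the --chunkSize argument controls.
 * Returns the number of chunks (queries/transactions) issued.
 */
public class ChunkedBackfillSketch {
  public static int backfill(int totalRows, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    int chunks = 0;
    for (int offset = 0; offset < totalRows; offset += chunkSize) {
      // In the real migration: one SQL query + transaction per chunk,
      // copying facets out of lineage_events into the new facet tables.
      chunks++;
    }
    return chunks;
  }

  public static void main(String[] args) {
    // 250K events at a 100K chunk size means three chunks.
    System.out.println(backfill(250_000, 100_000));
  }
}
```

Smaller chunks mean shorter transactions (less lock time per statement) at the cost of more round trips, which is why the command exposes the knob instead of hard-coding it.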
9 changes: 9 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
@@ -56,4 +56,13 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
ColumnLineageDao createColumnLineageDao();

@CreateSqlObject
DatasetFacetsDao createDatasetFacetsDao();

@CreateSqlObject
JobFacetsDao createJobFacetsDao();

@CreateSqlObject
RunFacetsDao createRunFacetsDao();
}
14 changes: 14 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
@@ -25,6 +25,7 @@
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import org.postgresql.util.PGInterval;
import org.postgresql.util.PGobject;

@Slf4j
public final class Columns {
@@ -299,4 +300,17 @@ public static ImmutableMap<String, String> mapOrNull(final ResultSet results, fi
final String mapAsString = results.getString(column);
return Utils.fromJson(mapAsString, new TypeReference<>() {});
}

public static PGobject toPgObject(@NonNull final Object object) {
final PGobject jsonObject = new PGobject();
jsonObject.setType("jsonb");
final String json = Utils.toJson(object);
try {
jsonObject.setValue(json);
} catch (SQLException e) {
log.error("Error when ...", e);
return null;
}
return jsonObject;
}
}
27 changes: 12 additions & 15 deletions api/src/main/java/marquez/db/DatasetDao.java
@@ -76,22 +76,22 @@ WITH selected_datasets AS (
FROM datasets_view d
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
@@ -104,13 +104,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);