From 61e34fc7057d8be34f3c49eb63f53a51cbc2e9c7 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Mon, 26 Feb 2024 14:46:37 -0800 Subject: [PATCH] move weird migrators to new framework --- .../jdbc/AbstractJdbcDestination.java | 29 +++++++---- .../typing_deduping/JdbcV1V2Migrator.java | 12 +++-- .../jdbc/AbstractJdbcDestinationTest.java | 19 ++++++-- .../BaseDestinationV1V2Migrator.java | 48 +++++++++++++++---- .../typing_deduping/DefaultTyperDeduper.java | 24 ---------- .../DestinationV1V2Migrator.java | 25 ---------- .../NoOpDestinationV1V2Migrator.java | 17 ------- .../NoOpTyperDeduperWithV1V2Migrations.java | 14 ------ .../typing_deduping/NoopV2TableMigrator.java | 14 ------ .../typing_deduping/TyperDeduperUtil.kt | 39 --------------- .../typing_deduping/V2TableMigrator.java | 15 ------ .../DefaultTyperDeduperTest.java | 8 +--- .../DestinationV1V2MigratorTest.java | 5 +- 13 files changed, 87 insertions(+), 182 deletions(-) delete mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java delete mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java delete mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java delete mode 100644 airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index 86935bbe2937..5bc36c6bfcec 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -23,7 +23,6 @@ import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; -import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage; import io.airbyte.commons.exceptions.ConnectionErrorException; @@ -34,9 +33,10 @@ import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations; import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper; -import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator; import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -54,7 +54,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination { +public abstract class AbstractJdbcDestination + extends JdbcConnector implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class); @@ -254,10 +255,20 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map getDestinationHandler(final String databaseName, + protected abstract JdbcDestinationHandler getDestinationHandler(final String databaseName, final JdbcDatabase database, final String rawTableSchema); + /** + * Provide any migrations that the destination needs to run. Most destinations will need to provide an instande of + * {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum. + */ + protected abstract List> getMigrations( + final JdbcDatabase database, + final String databaseName, + final SqlGenerator sqlGenerator, + final DestinationHandler destinationHandler); + /** * "database" key at root of the config json, for any other variants in config, override this * method. @@ -319,17 +330,15 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi .orElse(new CatalogParser(sqlGenerator)) .parseCatalog(catalog); final String databaseName = getDatabaseName(config); - final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName); - final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); - final DestinationHandler destinationHandler = + final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE)); + final List> migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); final TyperDeduper typerDeduper; if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of()); + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrations); } else { - typerDeduper = - new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of()); + typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrations); } return typerDeduper; } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java index b398f202fc06..ca0a33bb9d0b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcV1V2Migrator.java @@ -10,7 +10,9 @@ import io.airbyte.commons.exceptions.SQLRuntimeException; import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator; import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; @@ -21,13 +23,18 @@ * Largely based on * {@link io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator}. */ -public class JdbcV1V2Migrator extends BaseDestinationV1V2Migrator { +public abstract class JdbcV1V2Migrator + extends BaseDestinationV1V2Migrator { private final NamingConventionTransformer namingConventionTransformer; private final JdbcDatabase database; private final String databaseName; - public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer, final JdbcDatabase database, final String databaseName) { + public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer, + final SqlGenerator sqlGenerator, + final JdbcDatabase database, + final String databaseName) { + super(sqlGenerator); this.namingConventionTransformer = namingConventionTransformer; this.database = database; this.databaseName = databaseName; @@ -72,5 +79,4 @@ protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()), tableName); } - } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java index f92102c79119..272799f07ccc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/test/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestinationTest.java @@ -16,7 +16,13 @@ import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; @@ -112,7 +118,7 @@ void testInvalidExtraParam() { () -> new TestJdbcDestination().getConnectionProperties(buildConfigWithExtraJdbcParameters(extraParam))); } - static class TestJdbcDestination extends AbstractJdbcDestination { + static class TestJdbcDestination extends AbstractJdbcDestination { private final Map defaultProperties; @@ -137,15 +143,22 @@ public JsonNode toJdbcConfig(final JsonNode config) { @Override protected JdbcSqlGenerator getSqlGenerator() { - // TODO do we need to populate this? return null; } @Override - protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) { + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) { return null; } + @Override + protected List> getMigrations(JdbcDatabase database, + String databaseName, + SqlGenerator sqlGenerator, + DestinationHandler destinationHandler) { + return Collections.emptyList(); + } + } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java index 1f33b9074952..4407e5dd1459 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseDestinationV1V2Migrator.java @@ -7,29 +7,53 @@ import static io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration; +import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.util.Collection; import java.util.Optional; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BaseDestinationV1V2Migrator implements DestinationV1V2Migrator { +public abstract class BaseDestinationV1V2Migrator + implements Migration { protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class); + private final SqlGenerator sqlGenerator; + + /** + * Should never be called. Exists so that we can mock this object, because Mockito + * requires a no-args constructor. + */ + protected BaseDestinationV1V2Migrator() { + this(null); + } + + protected BaseDestinationV1V2Migrator(SqlGenerator sqlGenerator) { + this.sqlGenerator = sqlGenerator; + } + + protected abstract DestinationState setV1V2MigrationDone(DestinationState state); + + @NotNull @Override - public void migrateIfNecessary( - final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, - final StreamConfig streamConfig) - throws Exception { + public MigrationResult migrateIfNecessary( + @NotNull DestinationHandler destinationHandler, + @NotNull StreamConfig streamConfig, + @NotNull DestinationInitialState state) { LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName()); if (shouldMigrate(streamConfig)) { LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName()); migrate(sqlGenerator, destinationHandler, streamConfig); LOGGER.info("V2 Migration completed successfully for stream {}", streamConfig.id().finalName()); + final DestinationState updatedState = setV1V2MigrationDone(state.destinationState()); + // The v2 raw table now exists. We should refetch the initial state. + return new MigrationResult<>(updatedState, true); } else { LOGGER.info("No Migration Required for stream: {}", streamConfig.id().finalName()); + return new MigrationResult<>(state.destinationState(), false); } } @@ -40,12 +64,18 @@ public void migrateIfNecessary( * @param streamConfig the stream in question * @return whether to migrate the stream */ - protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exception { + protected boolean shouldMigrate(final StreamConfig streamConfig) { final var v1RawTable = convertToV1RawName(streamConfig); LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace()); final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode()); - final var noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig); - final var aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName()); + final boolean noValidV2RawTableExists; + final boolean aValidV1RawTableExists; + try { + noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig); + aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName()); + } catch (Exception e) { + throw new RuntimeException(e); + } LOGGER.info("Migration Info: Required for Sync mode: {}, No existing v2 raw tables: {}, A v1 raw table exists: {}", syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists); return syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists; diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index 2403564b4a95..0015e1f58368 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -57,9 +57,6 @@ public class DefaultTyperDeduper destinationHandler; - - private final DestinationV1V2Migrator v1V2Migrator; - private final V2TableMigrator v2TableMigrator; private final List> migrations; private final ParsedCatalog parsedCatalog; private Set overwriteStreamsWithTmpTable; @@ -82,14 +79,10 @@ public class DefaultTyperDeduper destinationHandler, final ParsedCatalog parsedCatalog, - final DestinationV1V2Migrator v1V2Migrator, - final V2TableMigrator v2TableMigrator, final List> migrations) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; this.parsedCatalog = parsedCatalog; - this.v1V2Migrator = v1V2Migrator; - this.v2TableMigrator = v2TableMigrator; this.migrations = migrations; this.initialRawTableStateByStream = new ConcurrentHashMap<>(); this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size()); @@ -99,15 +92,6 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator, new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build()); } - public DefaultTyperDeduper( - final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, - final ParsedCatalog parsedCatalog, - final DestinationV1V2Migrator v1V2Migrator, - final List> migrations) { - this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), migrations); - } - @Override public void prepareSchemasAndRawTables() throws Exception { // Technically kind of weird to call this here, but it's the best place we have. @@ -115,14 +99,6 @@ public void prepareSchemasAndRawTables() throws Exception { // until prepareFinalTables... but it doesn't really matter. TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog); - TyperDeduperUtil.executeWeirdMigrations( - executorService, - sqlGenerator, - destinationHandler, - v1V2Migrator, - v2TableMigrator, - parsedCatalog); - destinationInitialStates = TyperDeduperUtil.executeRawTableMigrations( executorService, destinationHandler, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java deleted file mode 100644 index 1a31d04b9a70..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2Migrator.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.base.destination.typing_deduping; - -public interface DestinationV1V2Migrator { - - /** - * This is the primary entrypoint to this interface - *

- * Determine whether a migration is necessary for a given stream and if so, migrate the raw table - * and rebuild the final table with a soft reset - * - * @param sqlGenerator the class to use to generate sql - * @param destinationHandler the handler to execute the sql statements - * @param streamConfig the stream to assess migration needs - */ - void migrateIfNecessary( - final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, - final StreamConfig streamConfig) - throws TableNotMigratedException, UnexpectedSchemaException, Exception; - -} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java deleted file mode 100644 index a32f214cec49..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpDestinationV1V2Migrator.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.base.destination.typing_deduping; - -public class NoOpDestinationV1V2Migrator implements DestinationV1V2Migrator { - - @Override - public void migrateIfNecessary(final SqlGenerator sqlGenerator, - final DestinationHandler destinationHandler, - final StreamConfig streamConfig) - throws TableNotMigratedException, UnexpectedSchemaException { - // Do nothing - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java index 8a0f9b8c6beb..0f6fb18a456e 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java @@ -28,8 +28,6 @@ @Slf4j public class NoOpTyperDeduperWithV1V2Migrations implements TyperDeduper { - private final DestinationV1V2Migrator v1V2Migrator; - private final V2TableMigrator v2TableMigrator; private final List> migrations; private final ExecutorService executorService; private final ParsedCatalog parsedCatalog; @@ -39,14 +37,10 @@ public class NoOpTyperDeduperWithV1V2Migrations destinationHandler, final ParsedCatalog parsedCatalog, - final DestinationV1V2Migrator v1V2Migrator, - final V2TableMigrator v2TableMigrator, final List> migrations) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; this.parsedCatalog = parsedCatalog; - this.v1V2Migrator = v1V2Migrator; - this.v2TableMigrator = v2TableMigrator; this.migrations = migrations; this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(), new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build()); @@ -56,14 +50,6 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator, public void prepareSchemasAndRawTables() throws Exception { TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog); - TyperDeduperUtil.executeWeirdMigrations( - executorService, - sqlGenerator, - destinationHandler, - v1V2Migrator, - v2TableMigrator, - parsedCatalog); - List> destinationInitialStates = TyperDeduperUtil.executeRawTableMigrations( executorService, destinationHandler, diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java deleted file mode 100644 index f2f2a9c41497..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.base.destination.typing_deduping; - -public class NoopV2TableMigrator implements V2TableMigrator { - - @Override - public void migrateIfNecessary(final StreamConfig streamConfig) { - // do nothing - } - -} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt index 203b92e1916d..96f3962c6dce 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt @@ -24,12 +24,6 @@ class TyperDeduperUtil { migrations: List>, initialStates: List> ): List> { - // TODO: Either the migrations run the soft reset and create v2 tables or the actual prepare tables. - // unify the logic - // with current state of raw tables & final tables. This is done first before gather initial state - // to avoid recreating - // final tables later again. - // Run migrations in lockstep. Some migrations may require us to refetch the initial state. // We want to be able to batch those calls together across streams. // If a migration runs on one stream, it's likely to also run on other streams. @@ -78,39 +72,6 @@ class TyperDeduperUtil { return currentStates } - /** - * The legacy-style migrations (V1V2Migrator, V2TableMigrator) need to run before we gather - * initial state, because they're dumb and weird. - * (specifically: SnowflakeV2TableMigrator inspects the final tables and triggers a soft reset - * directly within the migration). - * TODO: Migrate these migrations to the new migration system. - * This will also reduce the number of times we need to query DB metadata, since (a) we can rely - * on the gatherInitialState values, and (b) we can add a DestinationState field for these migrations. - * It also enables us to not trigger multiple soft resets in a single sync. - */ - @JvmStatic - fun executeWeirdMigrations( - executorService: ExecutorService, - sqlGenerator: SqlGenerator, - destinationHandler: DestinationHandler, - v1V2Migrator: DestinationV1V2Migrator, - v2TableMigrator: V2TableMigrator, - parsedCatalog: ParsedCatalog - ) { - val futures = parsedCatalog.streams.map { - CompletableFuture.supplyAsync( - { - v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, it) - v2TableMigrator.migrateIfNecessary(it) - }, - executorService - ) - } - getResultsOrLogAndThrowFirst( - "The following exceptions were thrown attempting to run migrations:\n", - CompletableFutures.allOf(futures.toList()).toCompletableFuture().join()) - } - /** * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they * exist in the Destination Database. diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java deleted file mode 100644 index ecc2d4ddd74c..000000000000 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.base.destination.typing_deduping; - -/** - * Prefer {@link io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration} - * instead. - */ -public interface V2TableMigrator { - - void migrateIfNecessary(final StreamConfig streamConfig) throws Exception; - -} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java index 1a5444e0e09e..fde2c6b199b3 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduperTest.java @@ -79,7 +79,6 @@ public MockState withSoftReset(boolean needsSoftReset) { private List> initialStates; private Map updatedStates; - private DestinationV1V2Migrator migrator; private TyperDeduper typerDeduper; private final Migration MIGRATION_REQUIRING_SOFT_RESET = new Migration<>() { @@ -168,14 +167,12 @@ void setup() throws Exception { updatedStates.put(APPEND_STREAM_CONFIG.id(), new MockState(false, false, true)); updatedStates.put(DEDUPE_STREAM_CONFIG.id(), new MockState(false, false, true)); - migrator = new NoOpDestinationV1V2Migrator(); - parsedCatalog = new ParsedCatalog(List.of( OVERWRITE_STREAM_CONFIG, APPEND_STREAM_CONFIG, DEDUPE_STREAM_CONFIG)); - typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, Collections.emptyList()); + typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, Collections.emptyList()); } /** @@ -436,7 +433,6 @@ void multipleSoftResets() throws Exception { sqlGenerator, destinationHandler, parsedCatalog, - migrator, List.of(MIGRATION_REQUIRING_SOFT_RESET)); // Notably: isSchemaMismatch = true, @@ -512,7 +508,6 @@ void noopMigrations() throws Exception { sqlGenerator, destinationHandler, parsedCatalog, - migrator, List.of(MIGRATION_REQUIRING_SOFT_RESET, MIGRATION_NOOP)); when(destinationHandler.gatherInitialState(anyList())) @@ -574,7 +569,6 @@ void migrationsMixedResults() throws Exception { sqlGenerator, destinationHandler, parsedCatalog, - migrator, List.of(MIGRATION_REQUIRING_SOFT_RESET, MIGRATION_NOT_REQUIRING_SOFT_RESET)); when(destinationHandler.gatherInitialState(anyList())) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java index 2f582274438b..6dbedf3d3745 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/test/java/io/airbyte/integrations/base/destination/typing_deduping/DestinationV1V2MigratorTest.java @@ -64,9 +64,10 @@ public void testShouldMigrate(final DestinationSyncMode destinationSyncMode, fin public void testMismatchedSchemaThrowsException() throws Exception { final StreamConfig config = new StreamConfig(STREAM_ID, null, DestinationSyncMode.APPEND_DEDUP, null, null, null); final var migrator = makeMockMigrator(true, true, false, false, false); - final UnexpectedSchemaException exception = Assertions.assertThrows(UnexpectedSchemaException.class, + final RuntimeException exception = Assertions.assertThrows(RuntimeException.class, () -> migrator.shouldMigrate(config)); - Assertions.assertEquals("Destination V2 Raw Table does not match expected Schema", exception.getMessage()); + final UnexpectedSchemaException cause = (UnexpectedSchemaException) exception.getCause(); + Assertions.assertEquals("Destination V2 Raw Table does not match expected Schema", cause.getMessage()); } @SneakyThrows