Skip to content

Commit

Permalink
move weird migrators to new framework
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Feb 26, 2024
1 parent 951733a commit 61e34fc
Show file tree
Hide file tree
Showing 13 changed files with 87 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteRecordMessage;
import io.airbyte.commons.exceptions.ConnectionErrorException;
Expand All @@ -34,9 +33,10 @@
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.NoOpTyperDeduperWithV1V2Migrations;
import io.airbyte.integrations.base.destination.typing_deduping.NoopTyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.NoopV2TableMigrator;
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
Expand All @@ -54,7 +54,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJdbcDestination extends JdbcConnector implements Destination {
public abstract class AbstractJdbcDestination<DestinationState extends MinimumDestinationState>
extends JdbcConnector implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcDestination.class);

Expand Down Expand Up @@ -254,10 +255,20 @@ private void assertCustomParametersDontOverwriteDefaultParameters(final Map<Stri

protected abstract JdbcSqlGenerator getSqlGenerator();

protected abstract JdbcDestinationHandler<? extends MinimumDestinationState> getDestinationHandler(final String databaseName,
protected abstract JdbcDestinationHandler<DestinationState> getDestinationHandler(final String databaseName,
final JdbcDatabase database,
final String rawTableSchema);

/**
* Provide any migrations that the destination needs to run. Most destinations will need to provide an instance of
* {@link io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcV1V2Migrator} at minimum.
*
* @param database the JDBC database the migrations may operate on
* @param databaseName the database name resolved from the connector config
* @param sqlGenerator the SQL generator used to build the typer-deduper for this sync
* @param destinationHandler the destination handler obtained from {@code getDestinationHandler}
* @return the migrations for this destination; the list is handed unchanged to the typer-deduper
*/
protected abstract List<Migration<DestinationState>> getMigrations(
final JdbcDatabase database,
final String databaseName,
final SqlGenerator sqlGenerator,
final DestinationHandler<DestinationState> destinationHandler);

/**
* "database" key at root of the config json, for any other variants in config, override this
* method.
Expand Down Expand Up @@ -319,17 +330,15 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
.orElse(new CatalogParser(sqlGenerator))
.parseCatalog(catalog);
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<? extends MinimumDestinationState> destinationHandler =
final DestinationHandler<DestinationState> destinationHandler =
getDestinationHandler(databaseName, database, rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE));
final List<Migration<DestinationState>> migrations = getMigrations(database, databaseName, sqlGenerator, destinationHandler);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrations);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, List.of());
typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrations);
}
return typerDeduper;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import io.airbyte.commons.exceptions.SQLRuntimeException;
import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator;
import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
Expand All @@ -21,13 +23,18 @@
* Largely based on
* {@link io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator}.
*/
public class JdbcV1V2Migrator extends BaseDestinationV1V2Migrator<TableDefinition> {
public abstract class JdbcV1V2Migrator<DestinationState extends MinimumDestinationState>
extends BaseDestinationV1V2Migrator<TableDefinition, DestinationState> {

private final NamingConventionTransformer namingConventionTransformer;
private final JdbcDatabase database;
private final String databaseName;

public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer, final JdbcDatabase database, final String databaseName) {
public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransformer,
final SqlGenerator sqlGenerator,
final JdbcDatabase database,
final String databaseName) {
super(sqlGenerator);
this.namingConventionTransformer = namingConventionTransformer;
this.database = database;
this.databaseName = databaseName;
Expand Down Expand Up @@ -72,5 +79,4 @@ protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig
this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()),
tableName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -112,7 +118,7 @@ void testInvalidExtraParam() {
() -> new TestJdbcDestination().getConnectionProperties(buildConfigWithExtraJdbcParameters(extraParam)));
}

static class TestJdbcDestination extends AbstractJdbcDestination {
static class TestJdbcDestination extends AbstractJdbcDestination<MinimumDestinationState.Impl> {

private final Map<String, String> defaultProperties;

Expand All @@ -137,15 +143,22 @@ public JsonNode toJdbcConfig(final JsonNode config) {

/**
* Test stub: this test destination does not supply a SQL generator.
*
* @return always {@code null}
*/
@Override
protected JdbcSqlGenerator getSqlGenerator() {
// TODO do we need to populate this?
return null;
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
protected JdbcDestinationHandler<MinimumDestinationState.Impl> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
return null;
}

/**
* Test stub: this test destination declares no migrations. None of the arguments are read.
*
* @return an empty list
*/
@Override
protected List<Migration<MinimumDestinationState.Impl>> getMigrations(JdbcDatabase database,
String databaseName,
SqlGenerator sqlGenerator,
DestinationHandler<MinimumDestinationState.Impl> destinationHandler) {
return Collections.emptyList();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,53 @@
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES;

import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.Collection;
import java.util.Optional;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {
public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition, DestinationState extends MinimumDestinationState>
implements Migration<DestinationState> {

protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

private final SqlGenerator sqlGenerator;

/**
* Should never be called. Exists only so that this object can be mocked, because Mockito
* requires a no-args constructor.
* <p>
* Delegates to {@link #BaseDestinationV1V2Migrator(SqlGenerator)} with a {@code null} SQL
* generator, so an instance built this way has no usable generator.
*/
protected BaseDestinationV1V2Migrator() {
this(null);
}

/**
* Creates a migrator that keeps a reference to the given SQL generator.
*
* @param sqlGenerator the generator stored on this instance; it is {@code null} when this
*        constructor is reached through the no-args constructor
*/
protected BaseDestinationV1V2Migrator(SqlGenerator sqlGenerator) {
this.sqlGenerator = sqlGenerator;
}

/**
* Hook for subclasses to produce the destination state to report once a stream's v1-to-v2
* migration has run. {@code migrateIfNecessary} calls this with the stream's current destination
* state and puts the returned value into the migration result.
* <p>
* NOTE(review): presumably the returned state flags the migration as done; how that is
* represented is up to the subclass - confirm against the implementations.
*
* @param state the stream's destination state before the migration is recorded
* @return the destination state to report after the migration
*/
protected abstract DestinationState setV1V2MigrationDone(DestinationState state);

@NotNull
@Override
public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler<?> destinationHandler,
final StreamConfig streamConfig)
throws Exception {
public MigrationResult<DestinationState> migrateIfNecessary(
@NotNull DestinationHandler<DestinationState> destinationHandler,
@NotNull StreamConfig streamConfig,
@NotNull DestinationInitialState<DestinationState> state) {
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
migrate(sqlGenerator, destinationHandler, streamConfig);
LOGGER.info("V2 Migration completed successfully for stream {}", streamConfig.id().finalName());
final DestinationState updatedState = setV1V2MigrationDone(state.destinationState());
// The v2 raw table now exists. We should refetch the initial state.
return new MigrationResult<>(updatedState, true);
} else {
LOGGER.info("No Migration Required for stream: {}", streamConfig.id().finalName());
return new MigrationResult<>(state.destinationState(), false);
}

}
Expand All @@ -40,12 +64,18 @@ public void migrateIfNecessary(
* @param streamConfig the stream in question
* @return whether to migrate the stream
*/
protected boolean shouldMigrate(final StreamConfig streamConfig) throws Exception {
protected boolean shouldMigrate(final StreamConfig streamConfig) {
final var v1RawTable = convertToV1RawName(streamConfig);
LOGGER.info("Checking whether v1 raw table {} in dataset {} exists", v1RawTable.tableName(), v1RawTable.namespace());
final var syncModeNeedsMigration = isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode());
final var noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
final var aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
final boolean noValidV2RawTableExists;
final boolean aValidV1RawTableExists;
try {
noValidV2RawTableExists = !doesValidV2RawTableAlreadyExist(streamConfig);
aValidV1RawTableExists = doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
} catch (Exception e) {
throw new RuntimeException(e);
}
LOGGER.info("Migration Info: Required for Sync mode: {}, No existing v2 raw tables: {}, A v1 raw table exists: {}",
syncModeNeedsMigration, noValidV2RawTableExists, aValidV1RawTableExists);
return syncModeNeedsMigration && noValidV2RawTableExists && aValidV1RawTableExists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ public class DefaultTyperDeduper<DestinationState extends MinimumDestinationStat

private final SqlGenerator sqlGenerator;
private final DestinationHandler<DestinationState> destinationHandler;

private final DestinationV1V2Migrator v1V2Migrator;
private final V2TableMigrator v2TableMigrator;
private final List<Migration<DestinationState>> migrations;
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
Expand All @@ -82,14 +79,10 @@ public class DefaultTyperDeduper<DestinationState extends MinimumDestinationStat
public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
final DestinationHandler<DestinationState> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator v1V2Migrator,
final V2TableMigrator v2TableMigrator,
final List<Migration<DestinationState>> migrations) {
this.sqlGenerator = sqlGenerator;
this.destinationHandler = destinationHandler;
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
this.v2TableMigrator = v2TableMigrator;
this.migrations = migrations;
this.initialRawTableStateByStream = new ConcurrentHashMap<>();
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
Expand All @@ -99,30 +92,13 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerator,
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
}

public DefaultTyperDeduper(
final SqlGenerator sqlGenerator,
final DestinationHandler<DestinationState> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator v1V2Migrator,
final List<Migration<DestinationState>> migrations) {
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), migrations);
}

@Override
public void prepareSchemasAndRawTables() throws Exception {
// Technically kind of weird to call this here, but it's the best place we have.
// Ideally, we'd create just airbyte_internal here, and defer creating the final table schemas
// until prepareFinalTables... but it doesn't really matter.
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);

TyperDeduperUtil.executeWeirdMigrations(
executorService,
sqlGenerator,
destinationHandler,
v1V2Migrator,
v2TableMigrator,
parsedCatalog);

destinationInitialStates = TyperDeduperUtil.executeRawTableMigrations(
executorService,
destinationHandler,
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
@Slf4j
public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends MinimumDestinationState> implements TyperDeduper {

private final DestinationV1V2Migrator v1V2Migrator;
private final V2TableMigrator v2TableMigrator;
private final List<Migration<DestinationState>> migrations;
private final ExecutorService executorService;
private final ParsedCatalog parsedCatalog;
Expand All @@ -39,14 +37,10 @@ public class NoOpTyperDeduperWithV1V2Migrations<DestinationState extends Minimum
public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
final DestinationHandler<DestinationState> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator v1V2Migrator,
final V2TableMigrator v2TableMigrator,
final List<Migration<DestinationState>> migrations) {
this.sqlGenerator = sqlGenerator;
this.destinationHandler = destinationHandler;
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
this.v2TableMigrator = v2TableMigrator;
this.migrations = migrations;
this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(),
new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build());
Expand All @@ -56,14 +50,6 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator sqlGenerator,
public void prepareSchemasAndRawTables() throws Exception {
TyperDeduperUtil.prepareSchemas(sqlGenerator, destinationHandler, parsedCatalog);

TyperDeduperUtil.executeWeirdMigrations(
executorService,
sqlGenerator,
destinationHandler,
v1V2Migrator,
v2TableMigrator,
parsedCatalog);

List<DestinationInitialState<DestinationState>> destinationInitialStates = TyperDeduperUtil.executeRawTableMigrations(
executorService,
destinationHandler,
Expand Down

This file was deleted.

Loading

0 comments on commit 61e34fc

Please sign in to comment.