✨ Destination BigQuery - Add v1v2 Migration (airbytehq#28962)
* Add everything for BQ but migrate, refactor interface after practical work

* Make new default methods, refactor to single implemented method

* MigrationInterface and BQ impl created

* Trying to integrate with standard inserts

* remove unnecessary NameAndNamespacePair class

* Shimmed in

* Java Docs

* Initial Testing Setup

* Tests!

* Move Migrator into TyperDeduper

* Functional Migration

* Add Integration Test

* Pr updates

* bump version

* bump version

* version bump

* Update to airbyte-ci-internal (airbytehq#29026)

* 🐛 Source Github, Instagram, Zendesk-support, Zendesk-talk: fix CAT tests fail on `spec` (airbytehq#28910)

* connectors-ci: better modified connectors detection logic (airbytehq#28855)

* connectors-ci: report path should always start with `airbyte-ci/` (airbytehq#29030)

* make report path always start with airbyte-ci

* revert report path in orchestrator

* add more test cases

* bump version

* Updated docs (airbytehq#29019)

* CDK: Embedded reader utils (airbytehq#28873)

* relax pydantic dep

* Automated Commit - Format and Process Resources Changes

* wip

* wrap up base integration

* add init file

* introduce CDK runner and improve error message

* make state param optional

* update protocol models

* review comments

* always run incremental if possible

* fix

---------

Co-authored-by: flash1293 <flash1293@users.noreply.github.com>

* 🤖 Bump minor version of Airbyte CDK

* 🚨🚨 Low code CDK: Decouple SimpleRetriever and HttpStream (airbytehq#28657)

* fix tests

* format

* review comments

* Automated Commit - Formatting Changes

* review comments

* review comments

* review comments

* log all messages

* log all message

* review comments

* review comments

* Automated Commit - Formatting Changes

* add comment

---------

Co-authored-by: flash1293 <flash1293@users.noreply.github.com>

* 🤖 Bump minor version of Airbyte CDK

* 🐛 Source Github, Instagram, Zendesk Support / Talk - revert `spec` changes and improve (airbytehq#29031)

* Source oauth0: new streams and fix incremental (airbytehq#29001)

* Add new streams Organizations,OrganizationMembers,OrganizationMemberRoles

* relax schema definition to allow additional fields

* Bump image tag version

* revert some changes to the old schemas

* Format python so gradle can pass

* update incremental

* remove unused print

* fix unit test

---------

Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com>

* 🐛 Source Mongo: Fix failing acceptance tests (airbytehq#28816)

* Fix failing acceptance tests

* Fix failing strict acceptance tests

* Source-Greenhouse: Fix unit tests for new CDK version (airbytehq#28969)

Fix unit tests

* Add CSV options to the CSV parser (airbytehq#28491)

* remove invalid legacy option

* remove unused option

* the tests pass but this is quite messy

* very slight clean up

* Add skip options to csv format

* fix some of the typing issues

* fixme comment

* remove extra log message

* fix typing issues

* skip before header

* skip after header

* format

* add another test

* Automated Commit - Formatting Changes

* auto generate column names

* delete dead code

* update title and description

* true and false values

* Update the tests

* Add comment

* missing test

* rename

* update expected spec

* move to method

* Update comment

* fix typo

* remove unused import

* Add a comment

* None records do not pass the WaitForDiscoverPolicy

* format

* remove second branch to ensure we always go through the same processing

* Raise an exception if the record is None

* reset

* Update tests

* handle unquoted newlines

* Automated Commit - Formatting Changes

* Update test case so the quoting is explicit

* Update comment

* Automated Commit - Formatting Changes

* Fail validation if skipping rows before header and header is autogenerated

* always fail if a record cannot be parsed

* format

* set write line_no in error message

* remove none check

* Automated Commit - Formatting Changes

* enable autogenerate test

* remove duplicate test

* missing unit tests

* Update

* remove branching

* remove unused none check

* Update tests

* remove branching

* format

* extract to function

* comment

* missing type

* type annotation

* use set

* Document that the strings are case-sensitive

* public -> private

* add unit test

* newline

---------

Co-authored-by: girarda <girarda@users.noreply.github.com>

* Dagster: Add sentry logging (airbytehq#28822)

* Add sentry

* add sentry decorator

* Add traces

* Use sentry trace

* Improve duplicate logging

* Add comments

* DNC

* Fix up issues

* Move to scopes

* Remove breadcrumb

* Update lock

* ✨Source Shortio: Migrate Python CDK to Low-code CDK (airbytehq#28950)

* Migrate Shortio to Low-Code

* Update abnormal state

* Format

* Update Docs

* Fix metadata.yaml

* Add pagination

* Add incremental sync

* add incremental parameters

* update metadata

* rollback update version

* release date

---------

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>

* Update to new verbiage (airbytehq#29051)

* [skip ci] Metadata: Remove leading underscore (airbytehq#29024)

* DNC

* Add test models

* Add model test

* Remove underscore from metadata files

* Regenerate models

* Add test to check for key transformation

* Allow additional fields on metadata

* Delete transform

* Proof of concept parallel source stream reading implementation for MySQL (airbytehq#26580)

* Proof of concept parallel source stream reading implementation for MySQL

* Automated Change

* Add read method that supports concurrent execution to Source interface

* Remove parallel iterator

* Ensure that executor service is stopped

* Automated Commit - Format and Process Resources Changes

* Expose method to fix compilation issue

* Use concurrent map to avoid access issues

* Automated Commit - Format and Process Resources Changes

* Ensure concurrent streams finish before closing source

* Fix compile issue

* Formatting

* Exclude concurrent stream threads from orphan thread watcher

* Automated Commit - Format and Process Resources Changes

* Refactor orphaned thread logic to account for concurrent execution

* PR feedback

* Implement readStreams in wrapper source

* Automated Commit - Format and Process Resources Changes

* Add readStream override

* Automated Commit - Format and Process Resources Changes

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* 🤖 Auto format source-mysql code [skip ci]

* Debug logging

* Reduce logging level

* Replace synchronized calls to System.out.println when concurrent

* Close consumer

* Flush before close

* Automated Commit - Format and Process Resources Changes

* Remove charset

* Use ASCII and flush periodically for parallel streams

* Test performance harness patch

* Automated Commit - Format and Process Resources Changes

* Cleanup

* Logging to identify concurrent read enabled

* Mark parameter as final

---------

Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com>
Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Co-authored-by: rodireich <rodireich@users.noreply.github.com>

* connectors-ci: disable dependency scanning (airbytehq#29033)

* updates (airbytehq#29059)

* Metadata: skip breaking change validation on prerelease (airbytehq#29017)

* skip breaking change validation

* Move ValidatorOpts higher in call

* Add prerelease test

* Fix test

* ✨ Source MongoDB Internal POC: Generate Test Data (airbytehq#29049)

* Add script to generate test data

* Fix prose

* Update credentials example

* PR feedback

* Bump Airbyte version from 0.50.12 to 0.50.13

* Bump versions for mssql strict-encrypt (airbytehq#28964)

* Bump versions for mssql strict-encrypt

* Fix failing test

* Fix failing test

* 🎨 Improve replication method selection UX (airbytehq#28882)

* update replication method in MySQL source

* bump version

* update expected specs

* update registries

* bump strict encrypt version

* make password always_show

* change url

* update registries

* 🐛 Avoid writing records to log (airbytehq#29047)

* Avoid writing records to log

* Update version

* Rollout ctid cdc (airbytehq#28708)

* source-postgres: enable ctid+cdc implementation

* 100% ctid rollout for cdc

* remove CtidFeatureFlags

* fix CdcPostgresSourceAcceptanceTest

* Bump versions and release notes

* Fix compilation error due to previous merge

---------

Co-authored-by: subodh <subodh1810@gmail.com>

* connectors-ci: fix `unhashable type 'set'` (airbytehq#29064)

* Add Slack Alert lifecycle to Dagster for Metadata publish (airbytehq#28759)

* DNC

* Add slack lifecycle logging

* Update to use slack

* Update slack to use resource and bot

* Improve markdown

* Improve log

* Add sensor logging

* Extend sensor time

* merge conflict

* PR Refactoring

* Make the tests work

* remove unnecessary classes, pr feedback

* more merging

* Update airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java

Co-authored-by: Edward Gao <edward.gao@airbyte.io>

* snowflake updates

---------

Co-authored-by: Ben Church <ben@airbyte.io>
Co-authored-by: Baz <oleksandr.bazarnov@globallogic.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Joe Reuter <joe@airbyte.io>
Co-authored-by: flash1293 <flash1293@users.noreply.github.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Vasilis Gavriilidis <vasilis.gavriilidis@orfium.com>
Co-authored-by: Jonathan Pearlin <jonathan@airbyte.io>
Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
Co-authored-by: girarda <girarda@users.noreply.github.com>
Co-authored-by: btkcodedev <btk.codedev@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Natalie Kwong <38087517+nataliekwong@users.noreply.github.com>
Co-authored-by: jdpgrailsdev <jdpgrailsdev@users.noreply.github.com>
Co-authored-by: octavia-squidington-iii <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Co-authored-by: rodireich <rodireich@users.noreply.github.com>
Co-authored-by: Alexandre Cuoci <Hesperide@users.noreply.github.com>
Co-authored-by: terencecho <terencecho@users.noreply.github.com>
Co-authored-by: Lake Mossman <lake@airbyte.io>
Co-authored-by: Benoit Moriceau <benoit@airbyte.io>
Co-authored-by: subodh <subodh1810@gmail.com>
Co-authored-by: Edward Gao <edward.gao@airbyte.io>
25 people authored and harrytou committed Sep 1, 2023
1 parent 6ed8c24 commit aaa79bf
Showing 35 changed files with 862 additions and 258 deletions.
airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java
@@ -9,6 +9,7 @@
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -21,6 +22,7 @@
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
@@ -122,6 +124,11 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
*/
protected abstract void createRawTable(StreamId streamId) throws Exception;

/**
* Creates a raw table in the v1 format
*/
protected abstract void createV1RawTable(StreamId v1RawTable) throws Exception;

/**
* Create a final table using the StreamId's finalTableId. Subclasses are recommended to hardcode
* the columns from {@link #FINAL_TABLE_COLUMN_NAMES} or {@link #FINAL_TABLE_COLUMN_NAMES_CDC}. The
@@ -132,6 +139,8 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {

protected abstract void insertRawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception;

protected abstract void insertV1RawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception;

protected abstract void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List<JsonNode> records)
throws Exception;

@@ -709,6 +718,30 @@ public void weirdColumnNames() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

@Test
public void testV1V2migration() throws Exception {
// This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses
// for something that is going away
StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
createV1RawTable(v1RawTableStreamId);
insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of(
"_airbyte_ab_id", "v1v2",
"_airbyte_emitted_at", "2023-01-01T00:00:00Z",
"_airbyte_data", "{\"hello\": \"world\"}"))));
final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName());
destinationHandler.execute(migration);
List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
assertAll(
() -> assertEquals(1, v1RawRecords.size()),
() -> assertEquals(1, v2RawRecords.size()),
() -> assertEquals(v1RawRecords.get(0).get("_airbyte_ab_id").asText(), v2RawRecords.get(0).get("_airbyte_raw_id").asText()),
() -> assertEquals(Jsons.deserialize(v1RawRecords.get(0).get("_airbyte_data").asText()), v2RawRecords.get(0).get("_airbyte_data")),
() -> assertEquals(v1RawRecords.get(0).get("_airbyte_emitted_at").asText(), v2RawRecords.get(0).get("_airbyte_extracted_at").asText()),
() -> assertNull(v2RawRecords.get(0).get("_airbyte_loaded_at")));

}

private void verifyRecords(final String expectedRawRecordsFile,
final List<JsonNode> actualRawRecords,
final String expectedFinalRecordsFile,
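For destination implementers, the two new hooks only need to materialize the legacy three-column raw format. A minimal sketch, assuming a JDBC-style handler and the legacy columns exercised by the test above (the DDL types and string-concatenated inserts are illustrative placeholders, not any connector's real implementation):

@Override
protected void createV1RawTable(final StreamId v1RawTable) throws Exception {
  // Legacy (v1) raw tables carry exactly three columns.
  destinationHandler.execute(String.format(
      "CREATE TABLE %s.%s ("
          + "_airbyte_ab_id VARCHAR(36) NOT NULL, "
          + "_airbyte_data VARCHAR NOT NULL, "
          + "_airbyte_emitted_at TIMESTAMP NOT NULL)",
      v1RawTable.rawNamespace(), v1RawTable.rawName()));
}

@Override
protected void insertV1RawTableRecords(final StreamId streamId, final List<JsonNode> records) throws Exception {
  // Proper escaping/parameterization is elided; a real subclass would use its dialect's tooling.
  for (final JsonNode record : records) {
    destinationHandler.execute(String.format(
        "INSERT INTO %s.%s (_airbyte_ab_id, _airbyte_data, _airbyte_emitted_at) VALUES ('%s', '%s', '%s')",
        streamId.rawNamespace(), streamId.rawName(),
        record.get("_airbyte_ab_id").asText(),
        record.get("_airbyte_data").asText(),
        record.get("_airbyte_emitted_at").asText()));
  }
}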
3 changes: 2 additions & 1 deletion airbyte-integrations/bases/base-typing-deduping/build.gradle
@@ -3,5 +3,6 @@ plugins {
}

dependencies {
implementation libs.airbyte.protocol
implementation project(path: ':airbyte-integrations:bases:base-java')
}
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

import static io.airbyte.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
import static io.airbyte.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES;

import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.Collection;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {

private static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

@Override
public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException {
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
migrate(sqlGenerator, destinationHandler, streamConfig);
}
}

/**
* Determine whether a given stream needs to be migrated from v1 to v2
*
* @param streamConfig the stream in question
* @return whether to migrate the stream
*/
protected boolean shouldMigrate(final StreamConfig streamConfig) {
final var v1RawTable = convertToV1RawName(streamConfig);
return isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode())
&& !doesValidV2RawTableAlreadyExist(streamConfig)
&& doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
}

/**
* Execute sql statements that convert a v1 raw table to a v2 raw table. Leaves the v1 raw table
* intact
*
* @param sqlGenerator the class which generates dialect specific sql statements
* @param destinationHandler the class which executes the sql statements
* @param streamConfig the stream to migrate the raw table of
*/
public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException {
final var namespacedTableName = convertToV1RawName(streamConfig);
final var migrateAndReset = String.join("\n",
sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
namespacedTableName.tableName()),
sqlGenerator.softReset(streamConfig));
try {
destinationHandler.execute(migrateAndReset);
} catch (Exception e) {
final var message = "Attempted and failed to migrate stream %s".formatted(streamConfig.id().finalName());
throw new TableNotMigratedException(message, e);
}
}

/**
* Checks the schema of the v1 raw table to ensure it matches the expected format
*
* @param existingV1AirbyteRawTable the v1 raw table
* @return whether the schema is as expected
*/
private boolean doesV1RawTableMatchExpectedSchema(DialectTableDefinition existingV1AirbyteRawTable) {
return schemaMatchesExpectation(existingV1AirbyteRawTable, LEGACY_RAW_TABLE_COLUMNS);
}

/**
* Checks the schema of the v2 raw table to ensure it matches the expected format
*
* @param existingV2AirbyteRawTable the v2 raw table
*/
private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(DialectTableDefinition existingV2AirbyteRawTable) {
if (!schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES)) {
throw new UnexpectedSchemaException("Destination V2 Raw Table does not match expected Schema");
}
}

/**
* If the sync mode is a full refresh and we overwrite the table, there is no need to migrate
*
* @param destinationSyncMode destination sync mode
* @return whether migration is required (i.e. the mode is not full refresh overwrite)
*/
private boolean isMigrationRequiredForSyncMode(final DestinationSyncMode destinationSyncMode) {
return !DestinationSyncMode.OVERWRITE.equals(destinationSyncMode);
}

/**
* Checks if a valid destinations v2 raw table already exists
*
* @param streamConfig the raw table to check
* @return whether it exists and is in the correct format
*/
private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) {
if (doesAirbyteInternalNamespaceExist(streamConfig)) {
final var existingV2Table = getTableIfExists(streamConfig.id().rawNamespace(), streamConfig.id().rawName());
existingV2Table.ifPresent(this::validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema);
return existingV2Table.isPresent();
}
return false;
}

/**
* Checks if a valid v1 raw table already exists
*
* @param namespace the namespace of the v1 raw table
* @param tableName the name of the v1 raw table
* @return whether it exists and is in the correct format
*/
private boolean doesValidV1RawTableExist(final String namespace, final String tableName) {
final var existingV1RawTable = getTableIfExists(namespace, tableName);
return existingV1RawTable.isPresent() && doesV1RawTableMatchExpectedSchema(existingV1RawTable.get());
}

/**
* Checks to see if Airbyte's internal schema for destinations v2 exists
*
* @param streamConfig the stream to check
* @return whether the schema exists
*/
abstract protected boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig);

/**
* Checks a Table's schema and compares it to an expected schema to make sure it matches
*
* @param existingTable the table to check
* @param columns the expected schema
* @return whether the existing table schema matches the expectation
*/
abstract protected boolean schemaMatchesExpectation(DialectTableDefinition existingTable, Collection<String> columns);

/**
* Get a reference to a table if it exists
*
* @param namespace the namespace of the table
* @param tableName the name of the table
* @return an optional potentially containing a reference to the table
*/
abstract protected Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName);

/**
* We use different naming conventions for raw table names in destinations v2, so we need a way to
* map that back to v1 names
*
* @param streamConfig the stream in question
* @return the valid v1 name and namespace for the same stream
*/
abstract protected NamespacedTableName convertToV1RawName(StreamConfig streamConfig);

}
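A concrete destination then only fills in the four lookups. A hypothetical sketch of a subclass (the ExampleDatabase metadata client, the TableDefinition dialect type, the enclosing CollectionUtils class name, and the "_airbyte_raw_" v1 prefix are all assumptions for illustration, not the actual BigQuery implementation):

public class ExampleDestinationV1V2Migrator extends BaseDestinationV1V2Migrator<TableDefinition> {

  // Hypothetical client for schema/table metadata lookups.
  private final ExampleDatabase database;

  public ExampleDestinationV1V2Migrator(final ExampleDatabase database) {
    this.database = database;
  }

  @Override
  protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
    return database.schemaExists(streamConfig.id().rawNamespace());
  }

  @Override
  protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
    // Case-insensitive, since warehouses disagree on identifier casing.
    return CollectionUtils.containsAllIgnoreCase(existingTable.columnNames(), columns);
  }

  @Override
  protected Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) {
    return database.describeTable(namespace, tableName);
  }

  @Override
  protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
    // v1 raw tables typically sat next to the final table with an "_airbyte_raw_" prefix.
    return new NamespacedTableName(streamConfig.id().finalNamespace(), "_airbyte_raw_" + streamConfig.id().finalName());
  }

}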
@@ -34,6 +34,13 @@ public static boolean containsIgnoreCase(final Collection<String> collection, fi
* @return whether all searchTerms are in the searchCollection
*/
public static boolean containsAllIgnoreCase(final Collection<String> searchCollection, final Collection<String> searchTerms) {
if (searchTerms.isEmpty()) {
// There isn't a good behavior for an empty collection. Without this check, an empty
// collection would always return true, but it feels misleading to say that the
// searchCollection does "contain all" when searchTerms is empty.
throw new IllegalArgumentException("Search Terms collection may not be empty");
}
return searchTerms.stream().allMatch(term -> containsIgnoreCase(searchCollection, term));
}

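For illustration, assuming the enclosing utility class is named CollectionUtils, the guard changes the contract like this:

// true: matching is case-insensitive
CollectionUtils.containsAllIgnoreCase(Set.of("_AIRBYTE_AB_ID", "_AIRBYTE_DATA"), Set.of("_airbyte_data"));

// previously vacuously true (allMatch on an empty stream); now throws IllegalArgumentException
CollectionUtils.containsAllIgnoreCase(Set.of("_airbyte_ab_id"), Set.of());

The fail-fast choice matters for the migrator above: an accidentally empty expected-column set would otherwise "validate" any table's schema.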
@@ -19,7 +19,7 @@
* <p>
* In a typical sync, destinations should call the methods:
* <ol>
* <li>{@link #prepareTables()} once at the start of the sync</li>
* <li>{@link #typeAndDedupe(String, String)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
@@ -35,15 +35,19 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper

private final SqlGenerator<DialectTableDefinition> sqlGenerator;
private final DestinationHandler<DialectTableDefinition> destinationHandler;

private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;

public DefaultTyperDeduper(SqlGenerator<DialectTableDefinition> sqlGenerator,
DestinationHandler<DialectTableDefinition> destinationHandler,
ParsedCatalog parsedCatalog,
DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
this.sqlGenerator = sqlGenerator;
this.destinationHandler = destinationHandler;
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
}

/**
@@ -52,7 +56,7 @@ public DefaultTyperDeduper(SqlGenerator<DialectTableDefinition> sqlGenerator,
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
public void prepareTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");
}
@@ -63,6 +67,8 @@ public void prepareFinalTables() throws Exception {
// Also, for OVERWRITE streams, decide if we're writing directly to the final table, or into an
// _airbyte_tmp table.
for (StreamConfig stream : parsedCatalog.streams()) {
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
if (existingTable.isPresent()) {
// The table already exists. Decide whether we're writing to it directly, or using a tmp table.
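Putting it together, the sync-time call order implied by the Javadoc is roughly the following (the generator, handler, catalog, and migrator instances are hypothetical placeholders):

final TyperDeduper typerDeduper = new DefaultTyperDeduper<>(
    sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator);

typerDeduper.prepareTables();        // runs any v1 -> v2 raw-table migration, then prepares final tables
// as records arrive, per stream:
typerDeduper.typeAndDedupe(streamNamespace, streamName);
// once at the end of the sync:
typerDeduper.commitFinalTables();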
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

public interface DestinationV1V2Migrator<DialectTableDefinition> {

/**
* This is the primary entrypoint to this interface
* <p>
* Determine whether a migration is necessary for a given stream and if so, migrate the raw table
* and rebuild the final table with a soft reset
*
* @param sqlGenerator the class to use to generate sql
* @param destinationHandler the handler to execute the sql statements
* @param streamConfig the stream to assess migration needs
*/
void migrateIfNecessary(
final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException;

}
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

// yet another namespace, name combo class
public record NamespacedTableName(String namespace, String tableName) {

}
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

public class NoOpDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator<DialectTableDefinition> {

@Override
public void migrateIfNecessary(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException {
// Do nothing
}

}
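Destinations that never wrote the v1 raw format can pass this null object so the typer-deduper wiring stays unconditional; a hypothetical call site:

final TyperDeduper typerDeduper = new DefaultTyperDeduper<>(
    sqlGenerator, destinationHandler, parsedCatalog, new NoOpDestinationV1V2Migrator<>());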
@@ -7,7 +7,7 @@
public class NoopTyperDeduper implements TyperDeduper {

@Override
public void prepareTables() throws Exception {

}

@@ -76,4 +76,15 @@ public interface SqlGenerator<DialectTableDefinition> {
*/
String overwriteFinalTable(StreamId stream, String finalSuffix);

/**
* Creates a SQL query which builds a v2 raw table from the v1 raw table, leaving the v1 table
* intact. The migrator pairs this with a soft reset of the final table (see
* {@link BaseDestinationV1V2Migrator#migrate}).
*
* @param streamId the stream to migrate
* @param namespace the namespace of the v1 raw table
* @param tableName the name of the v1 raw table
* @return a string containing the necessary sql to migrate
*/
String migrateFromV1toV2(StreamId streamId, String namespace, String tableName);

}
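The generated SQL is dialect-specific; based on the column mapping asserted in the integration test above, an implementation might emit something in the spirit of this sketch (illustrative only; quoting, typing, and JSON handling vary per warehouse):

@Override
public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
  // Renames v1 columns to their v2 equivalents; _airbyte_loaded_at starts NULL so the
  // soft reset that follows re-types every migrated record. The v1 table is left intact.
  return String.format(
      "CREATE OR REPLACE TABLE %s.%s AS "
          + "SELECT "
          + "_airbyte_ab_id AS _airbyte_raw_id, "
          + "_airbyte_data, "
          + "_airbyte_emitted_at AS _airbyte_extracted_at, "
          + "CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at "
          + "FROM %s.%s",
      streamId.rawNamespace(), streamId.rawName(), namespace, tableName);
}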
@@ -14,4 +14,8 @@ public TableNotMigratedException(String message) {
super(message);
}

public TableNotMigratedException(String message, Throwable cause) {
super(message, cause);
}

}