
✨ Destination BigQuery - Add v1v2 Migration #28962

Merged
merged 70 commits on Aug 9, 2023
Changes from all commits
f65f92b
Add everything for BQ but migrate, refactor interface after practical…
jbfbell Jul 24, 2023
3542aac
Make new default methods, refactor to single implemented method
jbfbell Jul 24, 2023
d34e755
MigrationInterface and BQ impl created
jbfbell Jul 25, 2023
ab19eaa
Trying to integrate with standard inserts
jbfbell Jul 26, 2023
e49ef2a
remove unnecessary NameAndNamespacePair class
jbfbell Jul 27, 2023
1edc723
Shimmed in
jbfbell Jul 27, 2023
347c647
Java Docs
jbfbell Jul 27, 2023
81ad6b9
Initial Testing Setup
jbfbell Jul 28, 2023
c525728
Tests!
jbfbell Jul 31, 2023
fdfc952
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Jul 31, 2023
864e99a
Move Migrator into TyperDeduper
jbfbell Aug 1, 2023
312c4af
Functional Migration
jbfbell Aug 1, 2023
41b6f40
Add Integration Test
jbfbell Aug 2, 2023
41057bf
Pr updates
jbfbell Aug 2, 2023
9a91dea
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 2, 2023
bcac78e
bump version
jbfbell Aug 2, 2023
6ac8c46
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 2, 2023
ea390b3
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 2, 2023
3c46067
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 2, 2023
96e44e2
bump version
jbfbell Aug 2, 2023
d4559dd
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 2, 2023
7cd01cd
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 2, 2023
36fd272
merge conflict
jbfbell Aug 3, 2023
ec0273a
version bump
jbfbell Aug 3, 2023
99846be
Update to airbyte-ci-internal (#29026)
bnchrch Aug 3, 2023
2485b0a
🐛 Source Github, Instagram, Zendesk-support, Zendesk-talk: fix CAT te…
bazarnov Aug 3, 2023
408f700
connectors-ci: better modified connectors detection logic (#28855)
alafanechere Aug 3, 2023
c390e20
connectors-ci: report path should always start with `airbyte-ci/` (#2…
alafanechere Aug 3, 2023
5cd576a
Updated docs (#29019)
lazebnyi Aug 3, 2023
b91acef
CDK: Embedded reader utils (#28873)
Aug 3, 2023
d34bec2
🤖 Bump minor version of Airbyte CDK
flash1293 Aug 3, 2023
b89fb72
🚨🚨 Low code CDK: Decouple SimpleRetriever and HttpStream (#28657)
Aug 3, 2023
31b176f
🤖 Bump minor version of Airbyte CDK
flash1293 Aug 3, 2023
0e945f1
🐛 Source Github, Instagram, Zendesk Support / Talk - revert `spec` ch…
bazarnov Aug 3, 2023
7aaafdd
Source oauth0: new streams and fix incremental (#29001)
marcosmarxm Aug 3, 2023
ef95c9b
🐛 Source Mongo: Fix failing acceptance tests (#28816)
jdpgrailsdev Aug 3, 2023
e76059f
Source-Greenhouse: Fix unit tests for new CDK version (#28969)
Aug 3, 2023
8e54152
Add CSV options to the CSV parser (#28491)
girarda Aug 3, 2023
5b403dd
Dagster: Add sentry logging (#28822)
bnchrch Aug 3, 2023
055d0f6
✨Source Shortio: Migrate Python CDK to Low-code CDK (#28950)
btkcodedev Aug 3, 2023
5a621c9
Update to new verbiage (#29051)
nataliekwong Aug 3, 2023
c94b8f8
[skip ci] Metadata: Remove leading underscore (#29024)
bnchrch Aug 3, 2023
a3d739a
Proof of concept parallel source stream reading implementation for My…
jdpgrailsdev Aug 3, 2023
49ade2d
connectors-ci: disable dependency scanning (#29033)
alafanechere Aug 3, 2023
0f3fbbd
updates (#29059)
Hesperide Aug 3, 2023
cc5b8b6
Metadata: skip breaking change validation on prerelease (#29017)
bnchrch Aug 3, 2023
d22ba7c
✨ Source MongoDB Internal POC: Generate Test Data (#29049)
jdpgrailsdev Aug 3, 2023
c8f06a8
Bump Airbyte version from 0.50.12 to 0.50.13
terencecho Aug 3, 2023
979472d
Bump versions for mssql strict-encrypt (#28964)
rodireich Aug 3, 2023
053430e
🎨 Improve replication method selection UX (#28882)
lmossman Aug 3, 2023
7ddaafd
🐛 Avoid writing records to log (#29047)
benmoriceau Aug 3, 2023
1b77e37
Rollout ctid cdc (#28708)
rodireich Aug 3, 2023
2690277
connectors-ci: fix `unhashable type 'set'` (#29064)
alafanechere Aug 3, 2023
4339713
Add Slack Alert lifecycle to Dagster for Metadata publish (#28759)
bnchrch Aug 3, 2023
1f1ba61
merge conflict
jbfbell Aug 5, 2023
285b820
PR Refactoring
jbfbell Aug 4, 2023
85f42a0
Make the tests work
jbfbell Aug 4, 2023
549989b
remove unnecessary classes, pr feedback
jbfbell Aug 5, 2023
3c38e43
a painful merge
jbfbell Aug 5, 2023
c8a2025
more merging
jbfbell Aug 5, 2023
45b62fb
Update airbyte-integrations/bases/base-typing-deduping-test/src/main/…
jbfbell Aug 7, 2023
68188ed
merge conflicts
jbfbell Aug 7, 2023
df2bb3f
snowflake updates
jbfbell Aug 7, 2023
05f03e8
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 7, 2023
c78ff46
bump snowflake version
jbfbell Aug 7, 2023
684dab5
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 8, 2023
91b2e19
fix test import
jbfbell Aug 8, 2023
4ca01fc
Merge branch 'master' into joseph.bell/28065/dest-v2-bq-migration
jbfbell Aug 8, 2023
0a705c9
Automated Commit - Format and Process Resources Changes
jbfbell Aug 8, 2023
82061fb
mergin
jbfbell Aug 9, 2023
BaseSqlGeneratorIntegrationTest.java
@@ -9,6 +9,7 @@
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -21,6 +22,7 @@
import io.airbyte.protocol.models.v0.SyncMode;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
@@ -122,6 +124,11 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
*/
protected abstract void createRawTable(StreamId streamId) throws Exception;

/**
* Creates a raw table in the v1 format
*/
protected abstract void createV1RawTable(StreamId v1RawTable) throws Exception;

/**
* Create a final table using the StreamId's finalTableId. Subclasses are recommended to hardcode
* the columns from {@link #FINAL_TABLE_COLUMN_NAMES} or {@link #FINAL_TABLE_COLUMN_NAMES_CDC}. The
@@ -132,6 +139,8 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {

protected abstract void insertRawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception;

protected abstract void insertV1RawTableRecords(StreamId streamId, List<JsonNode> records) throws Exception;

protected abstract void insertFinalTableRecords(boolean includeCdcDeletedAt, StreamId streamId, String suffix, List<JsonNode> records)
throws Exception;

@@ -709,6 +718,30 @@ public void weirdColumnNames() throws Exception {
dumpFinalTableRecords(streamId, ""));
}

@Test
public void testV1V2migration() throws Exception {
// This is maybe a little hacky, but it avoids having to refactor this entire class and subclasses
// for something that is going away
StreamId v1RawTableStreamId = new StreamId(null, null, streamId.finalNamespace(), "v1_" + streamId.rawName(), null, null);
createV1RawTable(v1RawTableStreamId);
insertV1RawTableRecords(v1RawTableStreamId, singletonList(Jsons.jsonNode(Map.of(
"_airbyte_ab_id", "v1v2",
"_airbyte_emitted_at", "2023-01-01T00:00:00Z",
"_airbyte_data", "{\"hello\": \"world\"}"))));
final String migration = generator.migrateFromV1toV2(streamId, v1RawTableStreamId.rawNamespace(), v1RawTableStreamId.rawName());
destinationHandler.execute(migration);
List<JsonNode> v1RawRecords = dumpRawTableRecords(v1RawTableStreamId);
List<JsonNode> v2RawRecords = dumpRawTableRecords(streamId);
assertAll(
() -> assertEquals(1, v1RawRecords.size()),
() -> assertEquals(1, v2RawRecords.size()),
() -> assertEquals(v1RawRecords.get(0).get("_airbyte_ab_id").asText(), v2RawRecords.get(0).get("_airbyte_raw_id").asText()),
() -> assertEquals(Jsons.deserialize(v1RawRecords.get(0).get("_airbyte_data").asText()), v2RawRecords.get(0).get("_airbyte_data")),
() -> assertEquals(v1RawRecords.get(0).get("_airbyte_emitted_at").asText(), v2RawRecords.get(0).get("_airbyte_extracted_at").asText()),
() -> assertNull(v2RawRecords.get(0).get("_airbyte_loaded_at")));

}

private void verifyRecords(final String expectedRawRecordsFile,
final List<JsonNode> actualRawRecords,
final String expectedFinalRecordsFile,
3 changes: 2 additions & 1 deletion airbyte-integrations/bases/base-typing-deduping/build.gradle
@@ -3,5 +3,6 @@ plugins {
}

dependencies {
implementation libs.airbyte.protocol
implementation libs.airbyte.protocol
implementation project(path: ':airbyte-integrations:bases:base-java')
}
BaseDestinationV1V2Migrator.java
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

import static io.airbyte.integrations.base.JavaBaseConstants.LEGACY_RAW_TABLE_COLUMNS;
import static io.airbyte.integrations.base.JavaBaseConstants.V2_RAW_TABLE_COLUMN_NAMES;

import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.util.Collection;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {

private static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

@Override
public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException {
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
migrate(sqlGenerator, destinationHandler, streamConfig);
}
}

/**
* Determine whether a given stream needs to be migrated from v1 to v2
*
* @param streamConfig the stream in question
* @return whether to migrate the stream
*/
protected boolean shouldMigrate(final StreamConfig streamConfig) {
final var v1RawTable = convertToV1RawName(streamConfig);
return isMigrationRequiredForSyncMode(streamConfig.destinationSyncMode())
&& !doesValidV2RawTableAlreadyExist(streamConfig)
&& doesValidV1RawTableExist(v1RawTable.namespace(), v1RawTable.tableName());
}

/**
* Execute sql statements that converts a v1 raw table to a v2 raw table. Leaves the v1 raw table
* intact
*
* @param sqlGenerator the class which generates dialect specific sql statements
* @param destinationHandler the class which executes the sql statements
* @param streamConfig the stream to migrate the raw table of
*/
public void migrate(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException {
final var namespacedTableName = convertToV1RawName(streamConfig);
final var migrateAndReset = String.join("\n",
sqlGenerator.migrateFromV1toV2(streamConfig.id(), namespacedTableName.namespace(),
namespacedTableName.tableName()),
sqlGenerator.softReset(streamConfig));
try {
destinationHandler.execute(migrateAndReset);
} catch (Exception e) {
final var message = "Attempted and failed to migrate stream %s".formatted(streamConfig.id().finalName());
throw new TableNotMigratedException(message, e);
}
}

/**
* Checks the schema of the v1 raw table to ensure it matches the expected format
*
* @param existingV1AirbyteRawTable the v1 raw table
* @return whether the schema is as expected
*/
private boolean doesV1RawTableMatchExpectedSchema(DialectTableDefinition existingV1AirbyteRawTable) {
return schemaMatchesExpectation(existingV1AirbyteRawTable, LEGACY_RAW_TABLE_COLUMNS);
}

/**
* Checks the schema of the v2 raw table to ensure it matches the expected format
*
* @param existingV2AirbyteRawTable the v2 raw table
*/
private void validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema(DialectTableDefinition existingV2AirbyteRawTable) {
if (!schemaMatchesExpectation(existingV2AirbyteRawTable, V2_RAW_TABLE_COLUMN_NAMES)) {
throw new UnexpectedSchemaException("Destination V2 Raw Table does not match expected Schema");
}
}

/**
* If the sync mode is a full refresh and we overwrite the table, then there is no need to migrate
*
* @param destinationSyncMode destination sync mode
* @return whether this is full refresh overwrite
*/
private boolean isMigrationRequiredForSyncMode(final DestinationSyncMode destinationSyncMode) {
return !DestinationSyncMode.OVERWRITE.equals(destinationSyncMode);
}

/**
* Checks if a valid destinations v2 raw table already exists
*
* @param streamConfig the raw table to check
* @return whether it exists and is in the correct format
*/
private boolean doesValidV2RawTableAlreadyExist(final StreamConfig streamConfig) {
if (doesAirbyteInternalNamespaceExist(streamConfig)) {
final var existingV2Table = getTableIfExists(streamConfig.id().rawNamespace(), streamConfig.id().rawName());
existingV2Table.ifPresent(this::validateAirbyteInternalNamespaceRawTableMatchExpectedV2Schema);
return existingV2Table.isPresent();
}
return false;
}

/**
* Checks if a valid v1 raw table already exists
*
* @param namespace the namespace of the v1 raw table
* @param tableName the name of the v1 raw table
* @return whether it exists and is in the correct format
*/
private boolean doesValidV1RawTableExist(final String namespace, final String tableName) {
final var existingV1RawTable = getTableIfExists(namespace, tableName);
return existingV1RawTable.isPresent() && doesV1RawTableMatchExpectedSchema(existingV1RawTable.get());
}

/**
* Checks to see if Airbyte's internal schema for destinations v2 exists
*
* @param streamConfig the stream to check
* @return whether the schema exists
*/
protected abstract boolean doesAirbyteInternalNamespaceExist(StreamConfig streamConfig);

/**
* Checks a Table's schema and compares it to an expected schema to make sure it matches
*
* @param existingTable the table to check
* @param columns the expected schema
* @return whether the existing table schema matches the expectation
*/
protected abstract boolean schemaMatchesExpectation(DialectTableDefinition existingTable, Collection<String> columns);

/**
* Get a reference to a table if it exists
*
* @param namespace the namespace to look in
* @param tableName the name of the table
* @return an optional potentially containing a reference to the table
*/
protected abstract Optional<DialectTableDefinition> getTableIfExists(String namespace, String tableName);

/**
* We use different naming conventions for raw table names in destinations v2, so we need a way to
* map that back to v1 names
*
* @param streamConfig the stream in question
* @return the valid v1 name and namespace for the same stream
*/
protected abstract NamespacedTableName convertToV1RawName(StreamConfig streamConfig);

}
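For reviewers unfamiliar with the template-method shape used here: a concrete destination only supplies the four abstract hooks at the bottom of this class, and the base class drives the shouldMigrate/migrate flow. The snippet below is purely illustrative — ExampleTableDefinition and ExampleMetadataClient are placeholders, and the "_airbyte_raw_" naming is an assumption, not code added by this PR (imports omitted for brevity).

// Hypothetical subclass, for illustration only. ExampleTableDefinition and ExampleMetadataClient
// stand in for a destination's own table-metadata types; they do not exist in this PR.
public class ExampleDestinationV1V2Migrator extends BaseDestinationV1V2Migrator<ExampleTableDefinition> {

  private final ExampleMetadataClient metadataClient;

  public ExampleDestinationV1V2Migrator(final ExampleMetadataClient metadataClient) {
    this.metadataClient = metadataClient;
  }

  @Override
  protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
    // e.g. check whether the v2 raw namespace (streamConfig.id().rawNamespace()) exists in the destination
    return metadataClient.schemaExists(streamConfig.id().rawNamespace());
  }

  @Override
  protected boolean schemaMatchesExpectation(final ExampleTableDefinition existingTable, final Collection<String> columns) {
    // delegate to the case-insensitive collection helper modified further down in this PR
    // (assumed statically imported here)
    return containsAllIgnoreCase(existingTable.columnNames(), columns);
  }

  @Override
  protected Optional<ExampleTableDefinition> getTableIfExists(final String namespace, final String tableName) {
    return metadataClient.findTable(namespace, tableName);
  }

  @Override
  protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
    // Assumed legacy convention: v1 raw tables live next to the final table with an "_airbyte_raw_" prefix.
    // Real destinations should reuse whatever v1 naming/namespacing logic they already have.
    return new NamespacedTableName(streamConfig.id().finalNamespace(), "_airbyte_raw_" + streamConfig.id().finalName());
  }

}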
@@ -34,6 +34,13 @@ public static boolean containsIgnoreCase(final Collection<String> collection, fi
* @return whether all searchTerms are in the searchCollection
*/
public static boolean containsAllIgnoreCase(final Collection<String> searchCollection, final Collection<String> searchTerms) {
if (searchTerms.isEmpty()) {
// There isn't a good behavior for an empty collection. Without this check, an empty collection
// would always return true, but it feels misleading to say that the searchCollection does
// "contain all" when searchTerms is empty.
throw new IllegalArgumentException("Search Terms collection may not be empty");
}
return searchTerms.stream().allMatch(term -> containsIgnoreCase(searchCollection, term));
}

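To make the effect of the new guard concrete, here is an illustrative before/after of containsAllIgnoreCase (example values only, not taken from this PR):

// Illustrative calls; column names borrowed from the v1/v2 raw table schemas used elsewhere in this PR.
containsAllIgnoreCase(Set.of("_airbyte_ab_id", "_airbyte_emitted_at", "_airbyte_data"),
    List.of("_AIRBYTE_AB_ID", "_airbyte_data"));   // true: matching is case-insensitive
containsAllIgnoreCase(Set.of("_airbyte_ab_id"),
    List.of("_airbyte_data"));                     // false: a search term is missing
containsAllIgnoreCase(Set.of("_airbyte_ab_id"),
    List.of());                                    // previously true, now throws IllegalArgumentException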
DefaultTyperDeduper.java
@@ -19,7 +19,7 @@
* <p>
* In a typical sync, destinations should call the methods:
* <ol>
* <li>{@link #prepareFinalTables()} once at the start of the sync</li>
* <li>{@link #prepareTables()} once at the start of the sync</li>
* <li>{@link #typeAndDedupe(String, String)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
Expand All @@ -35,15 +35,19 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper

private final SqlGenerator<DialectTableDefinition> sqlGenerator;
private final DestinationHandler<DialectTableDefinition> destinationHandler;

private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;

public DefaultTyperDeduper(SqlGenerator<DialectTableDefinition> sqlGenerator,
DestinationHandler<DialectTableDefinition> destinationHandler,
ParsedCatalog parsedCatalog) {
ParsedCatalog parsedCatalog,
DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
this.sqlGenerator = sqlGenerator;
this.destinationHandler = destinationHandler;
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
}

/**
@@ -52,7 +56,7 @@ public DefaultTyperDeduper(SqlGenerator<DialectTableDefinition> sqlGenerator,
* empty) we write to a temporary final table, and swap it into the true final table at the end of
* the sync. This is to prevent user downtime during a sync.
*/
public void prepareFinalTables() throws Exception {
public void prepareTables() throws Exception {
if (overwriteStreamsWithTmpTable != null) {
throw new IllegalStateException("Tables were already prepared.");
}
@@ -63,6 +67,8 @@ public void prepareFinalTables() throws Exception {
// Also, for OVERWRITE streams, decide if we're writing directly to the final table, or into an
// _airbyte_tmp table.
for (StreamConfig stream : parsedCatalog.streams()) {
// Migrate the Raw Tables if this is the first v2 sync after a v1 sync
v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
if (existingTable.isPresent()) {
// The table already exists. Decide whether we're writing to it directly, or using a tmp table.
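Since the constructor gained a fourth argument, every caller of DefaultTyperDeduper now has to supply a migrator. A hedged sketch of the wiring — MyTableDefinition, MySqlGenerator, MyDestinationHandler, and MyDestinationV1V2Migrator are placeholders, not the BigQuery classes this PR actually adds:

// Illustrative setup code inside a destination; placeholder class names only.
static void startSync(final ParsedCatalog parsedCatalog, final boolean supportsV1V2Migration) throws Exception {
  final SqlGenerator<MyTableDefinition> sqlGenerator = new MySqlGenerator();
  final DestinationHandler<MyTableDefinition> destinationHandler = new MyDestinationHandler();
  // Destinations that can't (or don't need to) migrate pass the no-op implementation instead.
  final DestinationV1V2Migrator<MyTableDefinition> migrator = supportsV1V2Migration
      ? new MyDestinationV1V2Migrator()
      : new NoOpDestinationV1V2Migrator<MyTableDefinition>();
  final TyperDeduper typerDeduper =
      new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator);

  // prepareFinalTables() is renamed to prepareTables(); it now runs migrateIfNecessary()
  // for every stream before creating or soft-resetting the final tables.
  typerDeduper.prepareTables();
}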
DestinationV1V2Migrator.java
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

public interface DestinationV1V2Migrator<DialectTableDefinition> {

/**
* This is the primary entrypoint to this interface
* <p>
* Determine whether a migration is necessary for a given stream and if so, migrate the raw table
* and rebuild the final table with a soft reset
*
* @param sqlGenerator the class to use to generate sql
* @param destinationHandler the handler to execute the sql statements
* @param streamConfig the stream to assess migration needs
*/
void migrateIfNecessary(
final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException;

}
NamespacedTableName.java
@@ -0,0 +1,10 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

// yet another namespace, name combo class
public record NamespacedTableName(String namespace, String tableName) {

}
NoOpDestinationV1V2Migrator.java
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.destination.typing_deduping;

public class NoOpDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator<DialectTableDefinition> {

@Override
public void migrateIfNecessary(final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException {
// Do nothing
}

}
NoopTyperDeduper.java
@@ -7,7 +7,7 @@
public class NoopTyperDeduper implements TyperDeduper {

@Override
public void prepareFinalTables() throws Exception {
public void prepareTables() throws Exception {

}

SqlGenerator.java
@@ -76,4 +76,15 @@ public interface SqlGenerator<DialectTableDefinition> {
*/
String overwriteFinalTable(StreamId stream, String finalSuffix);

/**
* Creates a SQL query which will create a v2 raw table from the v1 raw table, then performs a soft
* reset.
*
* @param streamId the stream to migrate
* @param namespace the namespace of the v1 raw table
* @param tableName the name of the v1 raw table
* @return a string containing the necessary sql to migrate
*/
String migrateFromV1toV2(StreamId streamId, String namespace, String tableName);

}
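Based on the column mapping exercised by testV1V2migration above (_airbyte_ab_id → _airbyte_raw_id, _airbyte_emitted_at → _airbyte_extracted_at, _airbyte_data copied as-is, _airbyte_loaded_at left NULL), a dialect's implementation might generate SQL roughly like the sketch below. This is a generic, illustrative implementation; the actual BigQuery SQL produced by this PR will differ in typing and quoting.

@Override
public String migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
  // Illustrative, dialect-agnostic SQL; real generators must quote identifiers and use
  // destination-specific column types (e.g. BigQuery JSON/TIMESTAMP columns).
  final String v2RawTable = streamId.rawNamespace() + "." + streamId.rawName();
  final String v1RawTable = namespace + "." + tableName;
  return String.join("\n",
      "CREATE SCHEMA IF NOT EXISTS " + streamId.rawNamespace() + ";",
      "CREATE TABLE IF NOT EXISTS " + v2RawTable + " (",
      "  _airbyte_raw_id VARCHAR(36) NOT NULL,",
      "  _airbyte_extracted_at TIMESTAMP NOT NULL,",
      "  _airbyte_loaded_at TIMESTAMP,",
      "  _airbyte_data TEXT NOT NULL",
      ");",
      "INSERT INTO " + v2RawTable + " (_airbyte_raw_id, _airbyte_extracted_at, _airbyte_loaded_at, _airbyte_data)",
      "SELECT _airbyte_ab_id, _airbyte_emitted_at, NULL, _airbyte_data FROM " + v1RawTable + ";");
}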
TableNotMigratedException.java
@@ -14,4 +14,8 @@ public TableNotMigratedException(String message) {
super(message);
}

public TableNotMigratedException(String message, Throwable cause) {
super(message, cause);
}

}