diff --git a/airbyte-cdk/java/airbyte-cdk/build.gradle b/airbyte-cdk/java/airbyte-cdk/build.gradle index f4abc2a918b9..089b72dbc5cd 100644 --- a/airbyte-cdk/java/airbyte-cdk/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/build.gradle @@ -1,13 +1,22 @@ +import org.jetbrains.kotlin.gradle.dsl.JvmTarget +import org.jetbrains.kotlin.gradle.dsl.KotlinVersion +plugins { + id 'org.jetbrains.kotlin.jvm' version '1.9.22' +} + final var cdkVersion = { var props = new Properties() file("core/src/main/resources/version.properties").withInputStream(props::load) return props.getProperty('version', 'undefined') }() + + allprojects { apply plugin: 'java-library' apply plugin: 'maven-publish' apply plugin: 'java-test-fixtures' + apply plugin: 'org.jetbrains.kotlin.jvm' group 'io.airbyte.cdk' @@ -44,6 +53,19 @@ allprojects { } } } + + compileKotlin { + compilerOptions { + jvmTarget = JvmTarget.JVM_21 + languageVersion = KotlinVersion.KOTLIN_1_9 + } + } + compileTestKotlin { + compilerOptions { + jvmTarget = JvmTarget.JVM_21 + languageVersion = KotlinVersion.KOTLIN_1_9 + } + } } project.configurations { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java index 826195d3fbcc..f0873bb05edb 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/base/ssh/SshWrappedDestination.java @@ -4,7 +4,13 @@ package io.airbyte.cdk.integrations.base.ssh; -import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.*; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.CONNECTION_OPTIONS_KEY; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.GLOBAL_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS; +import static 
io.airbyte.cdk.integrations.base.ssh.SshTunnel.GLOBAL_HEARTBEAT_INTERVAL_KEY; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.SESSION_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.SESSION_HEARTBEAT_INTERVAL_KEY; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.getInstance; +import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.sshWrap; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/StandardNameTransformer.java b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/StandardNameTransformer.java index a0bb39cc5d25..cc9c2dc4cd15 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/StandardNameTransformer.java +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination/StandardNameTransformer.java @@ -31,6 +31,7 @@ public String getNamespace(final String namespace) { } @Override + // @Deprecated see https://github.com/airbytehq/airbyte/issues/35333 public String getRawTableName(final String streamName) { return convertStreamName("_airbyte_raw_" + streamName); } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt new file mode 100644 index 000000000000..42183f51fcbe --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/util/ConfiguredCatalogUtil.kt @@ -0,0 +1,20 @@ +package io.airbyte.cdk.integrations.util + +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog + +/** + * For streams in [catalog] which do not have a namespace specified, explicitly set their namespace + * to the 
[defaultNamespace] + */ + fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) { + if (defaultNamespace == null) { + return + } + // TODO: This logic exists in all V2 destinations. + // This is sad that if we forget to add this, there will be a null pointer during parseCatalog + for (catalogStream in catalog.streams) { + if (catalogStream.stream.namespace.isNullOrEmpty()) { + catalogStream.stream.namespace = defaultNamespace + } + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 955936a54212..1df5d31312c9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.21.4 +version=0.22.1 diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java index d25b6ecb4296..f43e1abdea05 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -5,6 +5,7 @@ package io.airbyte.cdk.integrations.destination.jdbc; import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage; +import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; @@ -40,7 +41,6 @@ import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.v0.AirbyteMessage; import 
io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; -import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.sql.SQLException; import java.util.List; import java.util.Map; @@ -49,7 +49,6 @@ import java.util.function.Consumer; import javax.sql.DataSource; import org.apache.commons.lang3.NotImplementedException; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +71,10 @@ protected SqlOperations getSqlOperations() { return sqlOperations; } + protected String getConfigSchemaKey() { + return "schema"; + } + public AbstractJdbcDestination(final String driverClass, final NamingConventionTransformer namingResolver, final SqlOperations sqlOperations) { @@ -276,44 +279,16 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) throws Exception { - final DataSource dataSource = getDataSource(config); - final JdbcDatabase database = getDatabase(dataSource); + final JdbcDatabase database = getDatabase(getDataSource(config)); + final String defaultNamespace; + final TyperDeduper typerDeduper; if (TypingAndDedupingFlag.isDestinationV2()) { - // TODO: This logic exists in all V2 destinations. 
- // This is sad that if we forget to add this, there will be a null pointer during parseCatalog - final String defaultNamespace = config.get("schema").asText(); - for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { - if (StringUtils.isEmpty(stream.getStream().getNamespace())) { - stream.getStream().setNamespace(defaultNamespace); - } - } - final JdbcSqlGenerator sqlGenerator = getSqlGenerator(); - final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE) - .map(override -> new CatalogParser(sqlGenerator, override)) - .orElse(new CatalogParser(sqlGenerator)) - .parseCatalog(catalog); - final String databaseName = getDatabaseName(config); - final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName); - final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); - final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database); - final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); - final TyperDeduper typerDeduper; - if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, - 8); - } else { - typerDeduper = - new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, 8); - } - return JdbcBufferedConsumerFactory.createAsync( - outputRecordCollector, - database, - sqlOperations, - namingResolver, - config, - catalog, - defaultNamespace, - typerDeduper); + defaultNamespace = config.get(getConfigSchemaKey()).asText(); + addDefaultNamespaceToStreams(catalog, defaultNamespace); + typerDeduper = getV2TyperDeduper(config, catalog, database); + } else { + defaultNamespace = null; + typerDeduper = new NoopTyperDeduper(); } return JdbcBufferedConsumerFactory.createAsync( outputRecordCollector, @@ -322,8 +297,37 @@ public 
SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN namingResolver, config, catalog, - null, - new NoopTyperDeduper()); + defaultNamespace, + typerDeduper); + } + + /** + * Creates the appropriate TyperDeduper class for the jdbc destination and the user's configuration + * + * @param config the configuration for the connection + * @param catalog the catalog for the connection + * @param database a database instance + * @return a NoOpTyperDeduperWithV1V2Migrations when the config explicitly sets DISABLE_TYPE_DEDUPE to true, otherwise a DefaultTyperDeduper. + */ + private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) { + final JdbcSqlGenerator sqlGenerator = getSqlGenerator(); + final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE) + .map(override -> new CatalogParser(sqlGenerator, override)) + .orElse(new CatalogParser(sqlGenerator)) + .parseCatalog(catalog); + final String databaseName = getDatabaseName(config); + final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName); + final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator(); + final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database); + final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); + final TyperDeduper typerDeduper; + if (disableTypeDedupe) { + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); + } else { + typerDeduper = + new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator); + } + return typerDeduper; } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt 
b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt new file mode 100644 index 000000000000..17447dbf0833 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/java/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/RawOnlySqlGenerator.kt @@ -0,0 +1,66 @@ +package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping + +import io.airbyte.cdk.integrations.destination.NamingConventionTransformer +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig +import org.jooq.Condition +import org.jooq.DataType +import org.jooq.Field +import org.jooq.SQLDialect +import java.util.* + +/** + * Some Destinations do not support Typing and Deduping but have the updated raw table format + * SqlGenerator implementations are only for "final" tables and are a required input for + * TyperDeduper classes. This implementation appeases that requirement but does not implement + * any "final" table operations. + */ +class RawOnlySqlGenerator(private val namingTransformer: NamingConventionTransformer) : + JdbcSqlGenerator(namingTransformer) { + override fun getStructType(): DataType<*>? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun getArrayType(): DataType<*>? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun getWidestType(): DataType<*>? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun getDialect(): SQLDialect? 
{ + throw NotImplementedError("This Destination does not support final tables") + } + + override fun extractRawDataFields( + columns: LinkedHashMap, + useExpensiveSaferCasting: Boolean + ): List>? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun buildAirbyteMetaColumn(columns: LinkedHashMap): Field<*>? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun cdcDeletedAtNotNullCondition(): Condition? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun getRowNumber( + primaryKey: List, + cursorField: Optional + ): Field? { + throw NotImplementedError("This Destination does not support final tables") + } + + override fun existingSchemaMatchesStreamConfig( + stream: StreamConfig, + existingTable: TableDefinition + ): Boolean { + throw NotImplementedError("This Destination does not support final tables") + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java index 72231b4b40cf..8094c8fc214c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/java/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.java @@ -789,7 +789,7 @@ public void testIncrementalDedupeSync() throws Exception { .map(record -> Jsons.deserialize(record, AirbyteMessage.class)) .collect(Collectors.toList()); final JsonNode config = getConfig(); - runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, true); + runSyncAndVerifyStateOutput(config, firstSyncMessages, 
configuredCatalog, supportsNormalization()); final List secondSyncMessages = Lists.newArrayList( new AirbyteMessage() @@ -820,7 +820,7 @@ public void testIncrementalDedupeSync() throws Exception { .withType(Type.STATE) .withState(new AirbyteStateMessage().withData( Jsons.jsonNode(ImmutableMap.of("checkpoint", 2))))); - runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, true); + runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, false); final List expectedMessagesAfterSecondSync = new ArrayList<>(); expectedMessagesAfterSecondSync.addAll(firstSyncMessages); @@ -853,22 +853,11 @@ public void testIncrementalDedupeSync() throws Exception { final String defaultSchema = getDefaultSchema(config); retrieveRawRecordsAndAssertSameMessages(catalog, expectedMessagesAfterSecondSync, defaultSchema); - final List actualMessages = retrieveNormalizedRecords(catalog, - defaultSchema); - assertSameMessages(expectedMessages, actualMessages, true); - } - - private String generateBigString(final int addExtraCharacters) { - final int length = getMaxRecordValueLimit() + addExtraCharacters; - return RANDOM - .ints('a', 'z' + 1) - .limit(length) - .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append) - .toString(); - } - - protected int getGenerateBigStringAddExtraCharacters() { - return 0; + if (normalizationFromDefinition()) { + final List actualMessages = retrieveNormalizedRecords(catalog, + defaultSchema); + assertSameMessages(expectedMessages, actualMessages, true); + } } /** @@ -1347,7 +1336,7 @@ private List runSync( destination.close(); - if (!runNormalization || (runNormalization && supportsInDestinationNormalization())) { + if (!runNormalization || (supportsInDestinationNormalization())) { return destinationOutput; } @@ -1860,6 +1849,10 @@ public Stream provideArguments(final ExtensionContext conte } + private boolean supportsNormalization() { + return supportsInDestinationNormalization() || 
normalizationFromDefinition(); + } + private static V0 convertProtocolObject(final V1 v1, final Class klass) { return Jsons.object(Jsons.jsonNode(v1), klass); } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index d01f47060ba4..764f888c4e16 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -5,17 +5,16 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME; -import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.countOfTypingDedupingThreads; +import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads; import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions; +import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas; import static java.util.Collections.singleton; -import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.destination.StreamSyncSummary; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.HashSet; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -79,8 +78,7 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerat final DestinationHandler destinationHandler, final ParsedCatalog 
parsedCatalog, final DestinationV1V2Migrator v1V2Migrator, - final V2TableMigrator v2TableMigrator, - final int defaultThreadCount) { + final V2TableMigrator v2TableMigrator) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; this.parsedCatalog = parsedCatalog; @@ -90,7 +88,7 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerat this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size()); this.tdLocks = new ConcurrentHashMap<>(); this.internalTdLocks = new ConcurrentHashMap<>(); - this.executorService = Executors.newFixedThreadPool(countOfTypingDedupingThreads(defaultThreadCount), + this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(), new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build()); } @@ -100,18 +98,11 @@ public DefaultTyperDeduper( final ParsedCatalog parsedCatalog, final DestinationV1V2Migrator v1V2Migrator, final int defaultThreadCount) { - this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), defaultThreadCount); + this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator()); } private void prepareSchemas(final ParsedCatalog parsedCatalog) throws Exception { - final var rawSchema = parsedCatalog.streams().stream().map(stream -> stream.id().rawNamespace()); - final var finalSchema = parsedCatalog.streams().stream().map(stream -> stream.id().finalNamespace()); - final var createAllSchemasSql = Streams.concat(rawSchema, finalSchema) - .filter(Objects::nonNull) - .distinct() - .map(sqlGenerator::createSchema) - .toList(); - destinationHandler.execute(Sql.concat(createAllSchemasSql)); + prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler); } @Override diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java 
b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java index 349437e4acec..3319af8297a0 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.java @@ -12,16 +12,18 @@ public class FutureUtils { + private static final int DEFAULT_TD_THREAD_COUNT = 8; + /** - * Allow for configuring the number of typing and deduping threads via an enviornment variable in + * Allow for configuring the number of typing and deduping threads via an environment variable in * the destination container. * * @return the number of threads to use in the typing and deduping pool */ - public static int countOfTypingDedupingThreads(final int defaultThreads) { + public static int getCountOfTypeAndDedupeThreads() { return Optional.ofNullable(System.getenv("TD_THREADS")) .map(Integer::valueOf) - .orElse(defaultThreads); + .orElse(DEFAULT_TD_THREAD_COUNT); } /** diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpRawTableTDLock.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpRawTableTDLock.kt new file mode 100644 index 000000000000..9c26e4d605b8 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpRawTableTDLock.kt @@ -0,0 +1,22 @@ +package io.airbyte.integrations.base.destination.typing_deduping + +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.Condition +import java.util.concurrent.locks.Lock + +class NoOpRawTableTDLock: Lock { + override fun lock() {} + + override fun lockInterruptibly() {} + + override fun tryLock() = true + + 
override fun tryLock(time: Long, unit: TimeUnit) = tryLock() + + override fun unlock() {} + + override fun newCondition(): Condition { + // Always throw exception to avoid callers from using this path + throw UnsupportedOperationException("This lock implementation does not support retrieving a Condition") + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java index f35d1a92356d..1fb3faf59def 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoOpTyperDeduperWithV1V2Migrations.java @@ -5,24 +5,23 @@ package io.airbyte.integrations.base.destination.typing_deduping; import static io.airbyte.cdk.integrations.base.IntegrationRunner.TYPE_AND_DEDUPE_THREAD_NAME; -import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.countOfTypingDedupingThreads; +import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.getCountOfTypeAndDedupeThreads; import static io.airbyte.integrations.base.destination.typing_deduping.FutureUtils.reduceExceptions; +import static io.airbyte.integrations.base.destination.typing_deduping.TyperDeduperUtilKt.prepareAllSchemas; -import com.google.common.collect.Streams; import io.airbyte.cdk.integrations.destination.StreamSyncSummary; import io.airbyte.protocol.models.v0.StreamDescriptor; import java.util.HashSet; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import kotlin.NotImplementedError; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.concurrent.BasicThreadFactory; /** @@ -44,49 +43,42 @@ public NoOpTyperDeduperWithV1V2Migrations(final SqlGenerator destinationHandler, final ParsedCatalog parsedCatalog, final DestinationV1V2Migrator v1V2Migrator, - final V2TableMigrator v2TableMigrator, - final int defaultThreadCount) { + final V2TableMigrator v2TableMigrator) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; this.parsedCatalog = parsedCatalog; this.v1V2Migrator = v1V2Migrator; this.v2TableMigrator = v2TableMigrator; - this.executorService = Executors.newFixedThreadPool(countOfTypingDedupingThreads(defaultThreadCount), + this.executorService = Executors.newFixedThreadPool(getCountOfTypeAndDedupeThreads(), new BasicThreadFactory.Builder().namingPattern(TYPE_AND_DEDUPE_THREAD_NAME).build()); } - private void prepareSchemas(final ParsedCatalog parsedCatalog) throws Exception { - final var rawSchema = parsedCatalog.streams().stream().map(stream -> stream.id().rawNamespace()); - final var finalSchema = parsedCatalog.streams().stream().map(stream -> stream.id().finalNamespace()); - final var createAllSchemasSql = Streams.concat(rawSchema, finalSchema) - .filter(Objects::nonNull) - .distinct() - .map(sqlGenerator::createSchema) - .toList(); - destinationHandler.execute(Sql.concat(createAllSchemasSql)); - } - @Override public void prepareTables() throws Exception { - log.info("ensuring schemas exist for prepareTables with V1V2 migrations"); - prepareSchemas(parsedCatalog); - final Set>> prepareTablesTasks = new HashSet<>(); - for (final StreamConfig stream : parsedCatalog.streams()) { - 
prepareTablesTasks.add(CompletableFuture.supplyAsync(() -> { - // Migrate the Raw Tables if this is the first v2 sync after a v1 sync - try { - log.info("Migrating V1->V2 for stream {}", stream.id()); - v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream); - log.info("Migrating V2 legacy for stream {}", stream.id()); - v2TableMigrator.migrateIfNecessary(stream); - return Optional.empty(); - } catch (final Exception e) { - return Optional.of(e); - } - }, executorService)); + try { + log.info("Ensuring schemas exist for prepareTables with V1V2 migrations"); + prepareAllSchemas(parsedCatalog, sqlGenerator, destinationHandler); + final Set>> prepareTablesTasks = new HashSet<>(); + for (final StreamConfig stream : parsedCatalog.streams()) { + prepareTablesTasks.add(CompletableFuture.supplyAsync(() -> { + // Migrate the Raw Tables if this is the first v2 sync after a v1 sync + try { + log.info("Migrating V1->V2 for stream {}", stream.id()); + v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream); + log.info("Migrating V2 legacy for stream {}", stream.id()); + v2TableMigrator.migrateIfNecessary(stream); + return Optional.empty(); + } catch (final Exception e) { + return Optional.of(e); + } + }, executorService)); + } + CompletableFuture.allOf(prepareTablesTasks.toArray(CompletableFuture[]::new)).join(); + reduceExceptions(prepareTablesTasks, "The following exceptions were thrown attempting to prepare tables:\n"); + } catch (NotImplementedError | NotImplementedException e) { + log.warn( + "Could not prepare schemas or tables because this is not implemented for this destination, this should not be required for this destination to succeed"); } - CompletableFuture.allOf(prepareTablesTasks.toArray(CompletableFuture[]::new)).join(); - reduceExceptions(prepareTablesTasks, "The following exceptions were thrown attempting to prepare tables:\n"); } @Override @@ -96,42 +88,7 @@ public void typeAndDedupe(final String originalNamespace, final 
String originalN @Override public Lock getRawTableInsertLock(final String originalNamespace, final String originalName) { - return new Lock() { - - @Override - public void lock() { - - } - - @Override - public void lockInterruptibly() { - - } - - @Override - public boolean tryLock() { - // To mimic NoOp behavior always return true that lock is acquired - return true; - } - - @Override - public boolean tryLock(final long time, final TimeUnit unit) { - // To mimic NoOp behavior always return true that lock is acquired - return true; - } - - @Override - public void unlock() { - - } - - @Override - public Condition newCondition() { - // Always throw exception to avoid callers from using this path - throw new UnsupportedOperationException("This lock implementation does not support retrieving a Condition"); - } - - }; + return new NoOpRawTableTDLock(); } @Override diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt new file mode 100644 index 000000000000..c8020d561826 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/TyperDeduperUtil.kt @@ -0,0 +1,15 @@ +package io.airbyte.integrations.base.destination.typing_deduping + + +/** + * Extracts all the "raw" and "final" schemas identified in the [parsedCatalog] and ensures they + * exist in the Destination Database. 
+ */ +fun prepareAllSchemas(parsedCatalog: ParsedCatalog, sqlGenerator: SqlGenerator, destinationHandler: DestinationHandler) { + val rawSchema = parsedCatalog.streams.mapNotNull { it.id.rawNamespace } + val finalSchema = parsedCatalog.streams.mapNotNull { it.id.finalNamespace } + val createAllSchemasSql = rawSchema.union(finalSchema) + .map { sqlGenerator.createSchema(it) } + .toList() + destinationHandler.execute(Sql.concat(createAllSchemasSql)) +} diff --git a/airbyte-integrations/bases/base-normalization/build.gradle b/airbyte-integrations/bases/base-normalization/build.gradle index 0be8b3f954d4..13f2dd53c9f9 100644 --- a/airbyte-integrations/bases/base-normalization/build.gradle +++ b/airbyte-integrations/bases/base-normalization/build.gradle @@ -1,5 +1,3 @@ -import org.apache.tools.ant.taskdefs.condition.Os - plugins { id 'airbyte-docker-legacy' id 'airbyte-python' @@ -38,6 +36,10 @@ tasks.named('check').configure { dependsOn generate } +tasks.named("jar").configure { + dependsOn copySshScript +} + [ 'bigquery', 'mysql', diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle index d1a316d740a4..584fd3a0dc72 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/build.gradle @@ -4,8 +4,8 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' - features = ['db-destinations'] + cdkVersionRequired = '0.22.1' + features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/metadata.yaml b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/metadata.yaml index b58fc5f5d3e5..c5023258510c 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/metadata.yaml +++ 
b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/metadata.yaml @@ -7,16 +7,22 @@ data: connectorSubtype: database connectorType: destination definitionId: ce0d828e-1dc4-496c-b122-2da42e637e48 - dockerImageTag: 0.2.5 + dockerImageTag: 1.0.0 dockerRepository: airbyte/destination-clickhouse-strict-encrypt githubIssueLabel: destination-clickhouse icon: clickhouse.svg license: MIT name: Clickhouse - normalizationConfig: - normalizationIntegrationType: clickhouse - normalizationRepository: airbyte/normalization-clickhouse - normalizationTag: 0.4.1 + releases: + breakingChanges: + 1.0.0: + upgradeDeadline: "2024-03-15" + message: > + This version removes the option to use "normalization" with clickhouse. It also changes + the schema and database of Airbyte's "raw" tables to be compatible with the new + [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) + format. These changes will likely require updates to downstream dbt / SQL models. + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. 
releaseStage: alpha documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse supportsDbt: false diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncrypt.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncrypt.java index 98d998a9e3ef..4efc4db3545c 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncrypt.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncrypt.java @@ -36,4 +36,9 @@ public static void main(final String[] args) throws Exception { LOGGER.info("completed destination: {}", ClickhouseDestinationStrictEncrypt.class); } + @Override + public boolean isV2Destination() { + return true; + } + } diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java index 6769060d4ff1..991ef0e2cde4 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java @@ -21,6 +21,7 @@ import 
io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.cdk.integrations.util.HostPortResolver; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import java.sql.SQLException; import java.time.Duration; import java.util.ArrayList; @@ -139,7 +140,7 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, final String namespace, final JsonNode streamSchema) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + return retrieveRecordsFromTable(StreamId.concatenateRawTableName(namespace, streamName), "airbyte_internal") .stream() .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -147,7 +148,9 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { final JdbcDatabase jdbcDB = getDatabase(getConfig()); - final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + final var nameTransformer = new StandardNameTransformer(); + final String query = String.format("SELECT * FROM `%s`.`%s` ORDER BY %s ASC", schemaName, nameTransformer.convertStreamName(tableName), + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT); return jdbcDB.queryJsons(query); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/build.gradle b/airbyte-integrations/connectors/destination-clickhouse/build.gradle index 0386841d5f45..25cb081d4263 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/build.gradle +++ b/airbyte-integrations/connectors/destination-clickhouse/build.gradle @@ -4,8 +4,8 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.2.0' - features = ['db-destinations'] + cdkVersionRequired = '0.22.1' + features = 
['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml b/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml index cf10ca7aa667..b6cc2b944a79 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml +++ b/airbyte-integrations/connectors/destination-clickhouse/metadata.yaml @@ -2,22 +2,28 @@ data: connectorSubtype: database connectorType: destination definitionId: ce0d828e-1dc4-496c-b122-2da42e637e48 - dockerImageTag: 0.2.5 + dockerImageTag: 1.0.0 dockerRepository: airbyte/destination-clickhouse githubIssueLabel: destination-clickhouse icon: clickhouse.svg license: MIT name: Clickhouse - normalizationConfig: - normalizationIntegrationType: clickhouse - normalizationRepository: airbyte/normalization-clickhouse - normalizationTag: 0.4.3 registries: cloud: dockerRepository: airbyte/destination-clickhouse-strict-encrypt enabled: true oss: enabled: true + releases: + breakingChanges: + 1.0.0: + upgradeDeadline: "2024-03-15" + message: > + This version removes the option to use "normalization" with clickhouse. It also changes + the schema and database of Airbyte's "raw" tables to be compatible with the new + [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) + format. These changes will likely require updates to downstream dbt / SQL models. + Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync. 
releaseStage: alpha documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse supportsDbt: false diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestination.java b/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestination.java index 45a9d7cc8f08..77b3347944ca 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestination.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestination.java @@ -16,6 +16,8 @@ import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status; @@ -87,7 +89,7 @@ public AirbyteConnectionStatus check(final JsonNode config) { final JdbcDatabase database = getDatabase(dataSource); final NamingConventionTransformer namingResolver = getNamingResolver(); final String outputSchema = namingResolver.getIdentifier(config.get(JdbcUtils.DATABASE_KEY).asText()); - attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, getSqlOperations()); + attemptTableOperations(outputSchema, database, namingResolver, getSqlOperations(), false); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); } catch (final Exception e) { LOGGER.error("Exception while checking connection: ", 
e); @@ -115,4 +117,19 @@ public static void main(final String[] args) throws Exception { LOGGER.info("completed destination: {}", ClickhouseDestination.class); } + @Override + protected JdbcSqlGenerator getSqlGenerator() { + return new RawOnlySqlGenerator(new ClickhouseSQLNameTransformer()); + } + + @Override + public boolean isV2Destination() { + return true; + } + + @Override + protected String getConfigSchemaKey() { + return "database"; + } + } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSqlOperations.java b/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSqlOperations.java index 76d2fa56af89..0d0acf62d5ee 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSqlOperations.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSqlOperations.java @@ -10,7 +10,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations; -import io.airbyte.protocol.models.v0.AirbyteRecordMessage; +import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage; import java.io.File; import java.io.IOException; import java.nio.file.Files; @@ -36,18 +36,22 @@ public boolean isSchemaRequired() { @Override public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) { return String.format( - "CREATE TABLE IF NOT EXISTS %s.%s ( \n" - + "%s String,\n" - + "%s String,\n" - + "%s DateTime64(3, 'GMT') DEFAULT now(),\n" - + "PRIMARY KEY(%s)\n" - + ")\n" - + "ENGINE = MergeTree;\n", + """ + CREATE TABLE IF NOT EXISTS `%s`.`%s` ( + %s String, + %s String, + %s 
DateTime64(3, 'GMT') DEFAULT now(), + %s DateTime64(3, 'GMT') NULL, + PRIMARY KEY(%s) + ) + ENGINE = MergeTree; + """, schemaName, tableName, - JavaBaseConstants.COLUMN_NAME_AB_ID, + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT, - JavaBaseConstants.COLUMN_NAME_AB_ID); + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID); } @Override @@ -60,7 +64,7 @@ public void executeTransaction(final JdbcDatabase database, final List q @Override public void insertRecordsInternal(final JdbcDatabase database, - final List records, + final List records, final String schemaName, final String tmpTableName) throws SQLException { @@ -102,4 +106,13 @@ public void insertRecordsInternal(final JdbcDatabase database, }); } + @Override + protected void insertRecordsInternalV2(final JdbcDatabase database, + final List records, + final String schemaName, + final String tableName) + throws Exception { + insertRecordsInternal(database, records, schemaName, tableName); + } + } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json index 4f3c51333f8a..a86a89f7f746 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-clickhouse/src/main/resources/spec.json @@ -3,7 +3,7 @@ "supportsIncremental": true, "supportsNormalization": true, "supportsDBT": false, - "supported_destination_sync_modes": ["overwrite", "append", "append_dedup"], + "supported_destination_sync_modes": ["overwrite", "append"], "connectionSpecification": { "$schema": "http://json-schema.org/draft-07/schema#", "title": "ClickHouse Destination Spec", @@ -58,6 +58,12 @@ "type": "boolean", "default": false, "order": 6 + }, + "raw_data_schema": { + "type": 
"string", + "description": "The schema to write raw tables into (default: airbyte_internal)", + "title": "Raw Table Schema Name", + "order": 7 } } } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java index 5f5c3ae948fa..c7e7d7a5b6a6 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java @@ -21,6 +21,7 @@ import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.cdk.integrations.util.HostPortResolver; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import java.sql.SQLException; import java.time.Duration; import java.util.HashSet; @@ -111,7 +112,7 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, final String namespace, final JsonNode streamSchema) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + return retrieveRecordsFromTable(StreamId.concatenateRawTableName(namespace, streamName), "airbyte_internal") .stream() .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -119,7 +120,9 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { final JdbcDatabase jdbcDB = getDatabase(getConfig()); - final String 
query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + final var nameTransformer = new StandardNameTransformer(); + final String query = String.format("SELECT * FROM `%s`.`%s` ORDER BY %s ASC", schemaName, nameTransformer.convertStreamName(tableName), + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT); return jdbcDB.queryJsons(query); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java index c82dfca207c1..163c9a6e36c6 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java @@ -19,6 +19,7 @@ import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider; import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -85,7 +86,7 @@ protected String getDefaultSchema(final JsonNode config) { @Override protected JsonNode getConfig() throws Exception { return bastion.getTunnelConfig(getTunnelMethod(), bastion.getBasicDbConfigBuider(db, DB_NAME) - .put("schema", DB_NAME), false); + .put("schema", DB_NAME), true); } @Override @@ -109,7 +110,7 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, final String namespace, final 
JsonNode streamSchema) throws Exception { - return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) + return retrieveRecordsFromTable(StreamId.concatenateRawTableName(namespace, streamName), "airbyte_internal") .stream() .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText())) .collect(Collectors.toList()); @@ -122,7 +123,8 @@ private List retrieveRecordsFromTable(final String tableName, final St JdbcUtils.PORT_LIST_KEY, mangledConfig -> { final JdbcDatabase database = getDatabase(mangledConfig); - final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + final String query = String.format("SELECT * FROM `%s`.`%s` ORDER BY %s ASC", schemaName, namingResolver.convertStreamName(tableName), + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT); return database.queryJsons(query); }); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationTest.java index 0b05cb932a8e..e414e428e63d 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationTest.java @@ -13,11 +13,13 @@ import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.db.jdbc.JdbcUtils; -import io.airbyte.cdk.integrations.base.AirbyteMessageConsumer; import io.airbyte.cdk.integrations.base.Destination; +import io.airbyte.cdk.integrations.base.DestinationConfig; +import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer; import 
io.airbyte.cdk.integrations.destination.StandardNameTransformer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.map.MoreMaps; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -26,6 +28,7 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage; import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Comparator; import java.util.List; @@ -95,22 +98,26 @@ static void cleanUp() { @Test void sanityTest() throws Exception { final Destination dest = new ClickhouseDestination(); - final AirbyteMessageConsumer consumer = dest.getConsumer(config, catalog, + DestinationConfig.initialize(config, dest.isV2Destination()); + final SerializedAirbyteMessageConsumer consumer = dest.getSerializedMessageConsumer(config, catalog, Destination::defaultOutputRecordCollector); final List expectedRecords = generateRecords(10); consumer.start(); expectedRecords.forEach(m -> { try { - consumer.accept(m); + final var strMessage = Jsons.jsonNode(m).toString(); + consumer.accept(strMessage, strMessage.getBytes(StandardCharsets.UTF_8).length); } catch (final Exception e) { throw new RuntimeException(e); } }); - consumer.accept(new AirbyteMessage() + final var abMessage = Jsons.jsonNode(new AirbyteMessage() .withType(Type.STATE) .withState(new AirbyteStateMessage() - .withData(Jsons.jsonNode(ImmutableMap.of(DB_NAME + "." + STREAM_NAME, 10))))); + .withData(Jsons.jsonNode(ImmutableMap.of(DB_NAME + "." 
+ STREAM_NAME, 10))))) + .toString(); + consumer.accept(abMessage, abMessage.getBytes(StandardCharsets.UTF_8).length); consumer.close(); final JdbcDatabase database = new DefaultJdbcDatabase( @@ -126,8 +133,8 @@ void sanityTest() throws Exception { final List actualRecords = database.bufferedResultSetQuery( connection -> connection.createStatement().executeQuery( - String.format("SELECT * FROM %s.%s;", DB_NAME, - namingResolver.getRawTableName(STREAM_NAME))), + String.format("SELECT * FROM %s.%s;", "airbyte_internal", + StreamId.concatenateRawTableName(DB_NAME, STREAM_NAME))), JdbcUtils.getDefaultSourceOperations()::rowToJson); assertEquals( diff --git a/docs/integrations/destinations/clickhouse-migrations.md b/docs/integrations/destinations/clickhouse-migrations.md new file mode 100644 index 000000000000..df8590b36a56 --- /dev/null +++ b/docs/integrations/destinations/clickhouse-migrations.md @@ -0,0 +1,66 @@ +# Clickhouse Migration Guide + +## Upgrading to 1.0.0 + +This version removes the option to use "normalization" with clickhouse. It also changes +the schema and database of Airbyte's "raw" tables to be compatible with the new +[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2) +format. These changes will likely require updates to downstream dbt / SQL models. After this update, +Airbyte will only produce the ‘raw’ v2 tables, which store all content in JSON. These changes remove +the ability to do deduplicated syncs with Clickhouse. [Clickhouse has an overview](https://clickhouse.com/docs/en/integrations/dbt) +for integrating with dbt. If you are interested in the Clickhouse destination gaining the full features +of Destinations V2 (including final tables), click [here](https://github.com/airbytehq/airbyte/discussions/35339) +to register your interest. + +This upgrade will ignore any existing raw tables and will not migrate any data to the new schema. 
+For each stream, you could perform the following query to migrate the data from the old raw table +to the new raw table: + +```sql +-- assumes your database was 'default' +-- replace `{{stream_name}}` with your stream name + +CREATE TABLE airbyte_internal.default_raw__stream_{{stream_name}} +( + `_airbyte_raw_id` String, + `_airbyte_extracted_at` DateTime64(3, 'GMT') DEFAULT now(), + `_airbyte_loaded_at` DateTime64(3, 'GMT') NULL, + `_airbyte_data` String, + PRIMARY KEY(`_airbyte_raw_id`) +) +ENGINE = MergeTree; + +INSERT INTO `airbyte_internal`.`default_raw__stream_{{stream_name}}` + SELECT + `_airbyte_ab_id` AS "_airbyte_raw_id", + `_airbyte_emitted_at` AS "_airbyte_extracted_at", + NULL AS "_airbyte_loaded_at", + _airbyte_data AS "_airbyte_data" + FROM default._airbyte_raw_{{stream_name}}; +``` + +Airbyte will not delete any of your v1 data. + +### Database/Schema and the Internal Schema +We have split the raw and final tables into their own schemas, +which in clickhouse is analogous to a `database`. For the Clickhouse destination, this means that +we will only write into the raw table which will live in the `airbyte_internal` database. +The tables written into this schema will be prefixed with the default database provided in +the `DB Name` field when configuring clickhouse (this can also be overridden in the connection). You can +change the "raw" database from the default `airbyte_internal` by supplying a value for +`Raw Table Schema Name`. 
+ +For Example: + + - DB Name: `default` + - Stream Name: `my_stream` + +Writes to: `airbyte_internal.default_raw__stream_my_stream` + +whereas: + + - DB Name: `default` + - Stream Name: `my_stream` + - Raw Table Schema Name: `raw_data` + +Writes to: `raw_data.default_raw__stream_my_stream` diff --git a/docs/integrations/destinations/clickhouse.md b/docs/integrations/destinations/clickhouse.md index 02446ba825f6..4495cb79e3da 100644 --- a/docs/integrations/destinations/clickhouse.md +++ b/docs/integrations/destinations/clickhouse.md @@ -44,6 +44,17 @@ You can create such a user by running: ``` GRANT CREATE ON * TO airbyte_user; +GRANT CREATE ON default.* TO airbyte_user; +GRANT DROP ON * TO airbyte_user; +GRANT TRUNCATE ON * TO airbyte_user; +GRANT INSERT ON * TO airbyte_user; +GRANT SELECT ON * TO airbyte_user; +GRANT CREATE DATABASE ON airbyte_internal.* TO airbyte_user; +GRANT CREATE TABLE ON airbyte_internal.* TO airbyte_user; +GRANT DROP ON airbyte_internal.* TO airbyte_user; +GRANT TRUNCATE ON airbyte_internal.* TO airbyte_user; +GRANT INSERT ON airbyte_internal.* TO airbyte_user; +GRANT SELECT ON airbyte_internal.* TO airbyte_user; ``` You can also use a pre-existing user but we highly recommend creating a dedicated user for Airbyte. 
@@ -78,7 +89,8 @@ Therefore, Airbyte ClickHouse destination will create tables and schemas using t ## Changelog | Version | Date | Pull Request | Subject | -| :------ | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------- | +|:--------|:-----------| :--------------------------------------------------------- |:----------------------------------------------------------------------------------------------| +| 1.0.0 | 2024-02-07 | [\#34637](https://github.com/airbytehq/airbyte/pull/34637) | Update the raw table schema | | 0.2.5 | 2023-06-21 | [\#27555](https://github.com/airbytehq/airbyte/pull/27555) | Reduce image size | | 0.2.4 | 2023-06-05 | [\#27036](https://github.com/airbytehq/airbyte/pull/27036) | Internal code change for future development (install normalization packages inside connector) | | 0.2.3 | 2023-04-04 | [\#24604](https://github.com/airbytehq/airbyte/pull/24604) | Support for destination checkpointing |