Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Clickhouse - 1.0, remove normalization #34637

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
cdb1238
...
jbfbell Jan 24, 2024
6f7c28c
conflicts
jbfbell Jan 24, 2024
08f0173
all sorts of stuff
jbfbell Jan 25, 2024
d2cb631
working sync
jbfbell Jan 27, 2024
6443554
merge conflict
jbfbell Jan 29, 2024
35c3493
add breaking change metadata
jbfbell Jan 29, 2024
8004b50
formatting
jbfbell Jan 29, 2024
ecaf881
pr self review
jbfbell Jan 29, 2024
e5cfdf4
formatting
jbfbell Jan 29, 2024
7f7bd8f
merge conflict
jbfbell Jan 30, 2024
372edd2
merge conflict
jbfbell Jan 30, 2024
3a005d7
AirbyteLib: Installation improvements and improved error handling (#3…
aaronsteers Jan 30, 2024
98d760f
Revert Default Cloud Version (#34646)
jbfbell Jan 30, 2024
6408023
AirbyteLib: Progress Printer (#34588)
aaronsteers Jan 30, 2024
a4738b9
AirbyteLib: DuckDB Perf Boost (#34589)
aaronsteers Jan 30, 2024
923fcc5
airbyte-ci: Switch to prod pypi (#34606)
Jan 30, 2024
8f72720
airbyte-lib: Refactor connectors (#34552)
Jan 30, 2024
bd0a877
airbyte-lib: Fix validation (#34599)
Jan 30, 2024
b551732
📝Destination Astra DB Connector Name Update, Icon Update, Spec format…
Jan 30, 2024
cc1f1a2
airbyte-lib: Refactor follow-up (#34649)
Jan 30, 2024
70b05a6
Publish pokeapi to pypi (#34650)
Jan 30, 2024
b5e268e
:bug: Source Microsoft OneDrive: Fix Oauth (#34478)
tolik0 Jan 30, 2024
d7ddc77
Publish to pypi (#34652)
Jan 30, 2024
8cef869
Emit state when no partitions are generated for ccdk (#34605)
maxi297 Jan 30, 2024
8932636
Have StateBuilder return our actual state object and not simply a dic…
maxi297 Jan 30, 2024
b7a39ce
Publish to pypi batch 2 (#34656)
Jan 30, 2024
6a6b89c
Fix Cursor interface change + FileBased change at the same time (#34653)
maxi297 Jan 30, 2024
e3c14c7
Publish to pypi batch3 (#34657)
Jan 30, 2024
e808c51
🤖 Bump patch version of Python CDK
maxi297 Jan 30, 2024
15efffe
Revert "Publish to pypi batch3 (#34657)" (#34659)
Jan 30, 2024
ae3a028
airbyte-lib: Prepare for published connectors (#34651)
Jan 30, 2024
4ccdf36
Recreate pypi publish batch3 (#34660)
Jan 30, 2024
4136e9c
Update cdc.md - Add MongoDB support (#34671)
rwask Jan 30, 2024
82b7e5d
Pin file-based sources to airbyte-cdk version 0.59.2 (#34661)
clnoll Jan 30, 2024
7f2fddf
Fix log4j-slf4j-impl version conflicts (#34669)
postamar Jan 30, 2024
c391e2e
Source Faker: Declare primary keys (#34644)
aaronsteers Jan 30, 2024
d13b7c9
Destination Redshift - Bump CDK version to 0.16.3 (#34680)
jbfbell Jan 30, 2024
3bd0acd
Destination Teradata: make connector avaialble on Airbyte Cloud (#28667)
marcosmarxm Jan 30, 2024
73cda9a
Support resuming initial snapshot when id type is String, Int, Long (…
rodireich Jan 30, 2024
1da8866
Publish to pypi batch4 (#34666)
Jan 31, 2024
b8a806d
airbyte-ci: Test pypi published properly (#34689)
Jan 31, 2024
b8e4871
Publish to pypi batch5 (#34668)
Jan 31, 2024
c881fe0
Publish to pypi batch6 (#34672)
Jan 31, 2024
410fc8a
Publish to pypi batch7 (#34673)
Jan 31, 2024
0c2d92e
Kubernetes docs: update instructions to use external database (#34604)
marcosmarxm Jan 31, 2024
045ce95
Update file-based connectors for compatibility with concurrent CDK (#…
clnoll Jan 31, 2024
65a7f42
🚨🚨🐛Source Hubspot: update marketing_emails stream schema (#34492)
Jan 31, 2024
1e80b8d
Publish to pypi batch8 (#34690)
Jan 31, 2024
a5d3bad
Source Azure Table Storage: CDK Update (#34576)
ChristoGrab Jan 31, 2024
2cff690
Publish to pypi batch9 (#34691)
Jan 31, 2024
1ba7b21
Build a resume token with a pipeline consisting of selected streams (…
rodireich Jan 31, 2024
c6bc976
Publish to pypi batch10 (#34692)
Jan 31, 2024
a2e4026
Destination Postgres: Upgrade CDK with fixed dependency and unpin clo…
gisripa Jan 31, 2024
2c86106
CAT: fix NoAdditionalPropertiesValidator (#34709)
artem1205 Jan 31, 2024
365ca38
merge conflicts
jbfbell Feb 1, 2024
b81aa62
source recharge for some reason
jbfbell Feb 1, 2024
61d96d1
delete file
jbfbell Feb 1, 2024
79d6db0
passing tests
jbfbell Feb 2, 2024
a59b052
formatting
jbfbell Feb 2, 2024
e455a88
Merge branch 'master' into joseph.bell/32528/remove-clickhouse-normal…
jbfbell Feb 2, 2024
5de537d
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 2, 2024
d5c1cca
PR Feedback non gradle
jbfbell Feb 2, 2024
eeae224
PR Feedback non gradle - 2
jbfbell Feb 2, 2024
b7c4b2d
unneccessary class
jbfbell Feb 2, 2024
6448685
change jvm target
jbfbell Feb 3, 2024
bcfd84b
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 3, 2024
05f3b7c
gradle changes
jbfbell Feb 3, 2024
5b5efb1
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 5, 2024
86bbd12
clean up thread count access
jbfbell Feb 5, 2024
1e654f8
formatting
jbfbell Feb 5, 2024
470b70b
update yaml and markdown
jbfbell Feb 5, 2024
92c4ab1
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 6, 2024
2139544
test updates
jbfbell Feb 7, 2024
c68774e
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 7, 2024
541e694
formatting
jbfbell Feb 7, 2024
9182df2
merge conflict
jbfbell Feb 7, 2024
1011b79
disable ssh tests
jbfbell Feb 7, 2024
50d7719
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 7, 2024
64e6ad8
merge conflict
jbfbell Feb 8, 2024
7b59ac6
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 8, 2024
db03685
merge conflicts
jbfbell Feb 14, 2024
c486e3e
java 21 upgrade
jbfbell Feb 14, 2024
b64108e
merge conflicts
jbfbell Feb 14, 2024
d38359c
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 14, 2024
59219c6
java 21 upgrade
jbfbell Feb 14, 2024
162543b
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 14, 2024
40f9347
disable tests comment
jbfbell Feb 14, 2024
4908a81
pr feedback
jbfbell Feb 14, 2024
5b5303a
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 14, 2024
32fcb0f
pr feedback
jbfbell Feb 15, 2024
da31776
formatting
jbfbell Feb 15, 2024
0968210
merge conflicts
jbfbell Feb 15, 2024
19b6ab4
fix strict encrypt tests
jbfbell Feb 16, 2024
1fb15eb
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 16, 2024
369ffdc
formatting
jbfbell Feb 16, 2024
fdb9188
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 16, 2024
9ab52ae
clickhouse migration query
jbfbell Feb 16, 2024
f73622b
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 16, 2024
fbc5418
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 21, 2024
890ef95
remove append option
jbfbell Feb 21, 2024
65d4c11
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 21, 2024
f7f210f
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 21, 2024
89ebbc7
use non local cdk version
jbfbell Feb 21, 2024
ce5564f
bump version
jbfbell Feb 21, 2024
3bec74e
remove local version
jbfbell Feb 21, 2024
91e7e6a
merge conflicts
jbfbell Feb 21, 2024
376d151
use local again
jbfbell Feb 21, 2024
b79a4bf
cdk merge conflicts
jbfbell Feb 22, 2024
da2c3de
version bumps
jbfbell Feb 22, 2024
78fe4d1
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 22, 2024
f79ba7e
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 22, 2024
fb95636
use non local version
jbfbell Feb 22, 2024
97c34ed
Merge branch 'master' of github.com:airbytehq/airbyte into joseph.bel…
jbfbell Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/build.gradle
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import org.jetbrains.kotlin.gradle.dsl.JvmTarget
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion
plugins {
id 'org.jetbrains.kotlin.jvm' version '1.9.22'
}

final var cdkVersion = {
var props = new Properties()
file("core/src/main/resources/version.properties").withInputStream(props::load)
return props.getProperty('version', 'undefined')
}()



allprojects {
apply plugin: 'java-library'
apply plugin: 'maven-publish'
apply plugin: 'java-test-fixtures'
apply plugin: 'org.jetbrains.kotlin.jvm'

group 'io.airbyte.cdk'

Expand Down Expand Up @@ -44,6 +53,19 @@ allprojects {
}
}
}

compileKotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_21
languageVersion = KotlinVersion.KOTLIN_1_9
}
}
compileTestKotlin {
compilerOptions {
jvmTarget = JvmTarget.JVM_21
languageVersion = KotlinVersion.KOTLIN_1_9
}
}
}

project.configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

package io.airbyte.cdk.integrations.base.ssh;

import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.*;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.CONNECTION_OPTIONS_KEY;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.GLOBAL_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.GLOBAL_HEARTBEAT_INTERVAL_KEY;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.SESSION_HEARTBEAT_INTERVAL_DEFAULT_IN_MILLIS;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.SESSION_HEARTBEAT_INTERVAL_KEY;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.getInstance;
import static io.airbyte.cdk.integrations.base.ssh.SshTunnel.sshWrap;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public String getNamespace(final String namespace) {
}

@Override
// @Deprecated see https://github.com/airbytehq/airbyte/issues/35333
public String getRawTableName(final String streamName) {
return convertStreamName("_airbyte_raw_" + streamName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.airbyte.cdk.integrations.util

import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog

/**
* For streams in [catalog] which do not have a namespace specified, explicitly set their namespace
* to the [defaultNamespace]
*/
fun addDefaultNamespaceToStreams(catalog: ConfiguredAirbyteCatalog, defaultNamespace: String?) {
if (defaultNamespace == null) {
return
}
// TODO: This logic exists in all V2 destinations.
// This is sad that if we forget to add this, there will be a null pointer during parseCatalog
for (catalogStream in catalog.streams) {
if (catalogStream.stream.namespace.isNullOrEmpty()) {
catalogStream.stream.namespace = defaultNamespace
}
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.21.4
version=0.22.1
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.integrations.destination.jdbc;

import static io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage;
import static io.airbyte.cdk.integrations.util.ConfiguredCatalogUtilKt.addDefaultNamespaceToStreams;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -40,7 +41,6 @@
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand All @@ -49,7 +49,6 @@
import java.util.function.Consumer;
import javax.sql.DataSource;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -72,6 +71,10 @@ protected SqlOperations getSqlOperations() {
return sqlOperations;
}

protected String getConfigSchemaKey() {
return "schema";
}

public AbstractJdbcDestination(final String driverClass,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOperations) {
Expand Down Expand Up @@ -276,44 +279,16 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final DataSource dataSource = getDataSource(config);
final JdbcDatabase database = getDatabase(dataSource);
final JdbcDatabase database = getDatabase(getDataSource(config));
final String defaultNamespace;
final TyperDeduper typerDeduper;
if (TypingAndDedupingFlag.isDestinationV2()) {
// TODO: This logic exists in all V2 destinations.
// This is sad that if we forget to add this, there will be a null pointer during parseCatalog
final String defaultNamespace = config.get("schema").asText();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
if (StringUtils.isEmpty(stream.getStream().getNamespace())) {
stream.getStream().setNamespace(defaultNamespace);
}
}
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.map(override -> new CatalogParser(sqlGenerator, override))
.orElse(new CatalogParser(sqlGenerator))
.parseCatalog(catalog);
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator,
8);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator, 8);
}
return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
database,
sqlOperations,
namingResolver,
config,
catalog,
defaultNamespace,
typerDeduper);
defaultNamespace = config.get(getConfigSchemaKey()).asText();
addDefaultNamespaceToStreams(catalog, defaultNamespace);
typerDeduper = getV2TyperDeduper(config, catalog, database);
} else {
defaultNamespace = null;
typerDeduper = new NoopTyperDeduper();
}
return JdbcBufferedConsumerFactory.createAsync(
outputRecordCollector,
Expand All @@ -322,8 +297,37 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
namingResolver,
config,
catalog,
null,
new NoopTyperDeduper());
defaultNamespace,
typerDeduper);
}

/**
* Creates the appropriate TyperDeduper class for the jdbc destination and the user's configuration
*
* @param config the configuration for the connection
* @param catalog the catalog for the connection
* @param database a database instance
* @return the appropriate TyperDeduper instance for this connection.
*/
private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JdbcDatabase database) {
final JdbcSqlGenerator sqlGenerator = getSqlGenerator();
final ParsedCatalog parsedCatalog = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE)
.map(override -> new CatalogParser(sqlGenerator, override))
.orElse(new CatalogParser(sqlGenerator))
.parseCatalog(catalog);
final String databaseName = getDatabaseName(config);
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler<TableDefinition> destinationHandler = getDestinationHandler(databaseName, database);
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
jbfbell marked this conversation as resolved.
Show resolved Hide resolved
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
} else {
typerDeduper =
new DefaultTyperDeduper<>(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
}
return typerDeduper;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.airbyte.cdk.integrations.destination.jdbc.typing_deduping

import io.airbyte.cdk.integrations.destination.NamingConventionTransformer
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType
import io.airbyte.integrations.base.destination.typing_deduping.ColumnId
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig
import org.jooq.Condition
import org.jooq.DataType
import org.jooq.Field
import org.jooq.SQLDialect
import java.util.*

/**
* Some Destinations do not support Typing and Deduping but have the updated raw table format
* SqlGenerator implementations are only for "final" tables and are a required input for
* TyperDeduper classes. This implementation appeases that requirement but does not implement
* any "final" table operations.
*/
class RawOnlySqlGenerator(private val namingTransformer: NamingConventionTransformer) :
jbfbell marked this conversation as resolved.
Show resolved Hide resolved
JdbcSqlGenerator(namingTransformer) {
override fun getStructType(): DataType<*>? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun getArrayType(): DataType<*>? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun getWidestType(): DataType<*>? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun getDialect(): SQLDialect? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun extractRawDataFields(
columns: LinkedHashMap<ColumnId, AirbyteType>,
useExpensiveSaferCasting: Boolean
): List<Field<*>>? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun buildAirbyteMetaColumn(columns: LinkedHashMap<ColumnId, AirbyteType>): Field<*>? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun cdcDeletedAtNotNullCondition(): Condition? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun getRowNumber(
primaryKey: List<ColumnId>,
cursorField: Optional<ColumnId>
): Field<Int>? {
throw NotImplementedError("This Destination does not support final tables")
}

override fun existingSchemaMatchesStreamConfig(
stream: StreamConfig,
existingTable: TableDefinition
): Boolean {
throw NotImplementedError("This Destination does not support final tables")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,7 @@ public void testIncrementalDedupeSync() throws Exception {
.map(record -> Jsons.deserialize(record, AirbyteMessage.class))
.collect(Collectors.toList());
final JsonNode config = getConfig();
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, true);
runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, supportsNormalization());

final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(
new AirbyteMessage()
Expand Down Expand Up @@ -820,7 +820,7 @@ public void testIncrementalDedupeSync() throws Exception {
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(
Jsons.jsonNode(ImmutableMap.of("checkpoint", 2)))));
runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, true);
runSyncAndVerifyStateOutput(config, secondSyncMessages, configuredCatalog, false);

final List<AirbyteMessage> expectedMessagesAfterSecondSync = new ArrayList<>();
expectedMessagesAfterSecondSync.addAll(firstSyncMessages);
Expand Down Expand Up @@ -853,22 +853,11 @@ public void testIncrementalDedupeSync() throws Exception {
final String defaultSchema = getDefaultSchema(config);
retrieveRawRecordsAndAssertSameMessages(catalog, expectedMessagesAfterSecondSync,
defaultSchema);
final List<AirbyteRecordMessage> actualMessages = retrieveNormalizedRecords(catalog,
defaultSchema);
assertSameMessages(expectedMessages, actualMessages, true);
}

private String generateBigString(final int addExtraCharacters) {
final int length = getMaxRecordValueLimit() + addExtraCharacters;
return RANDOM
.ints('a', 'z' + 1)
.limit(length)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}

protected int getGenerateBigStringAddExtraCharacters() {
return 0;
if (normalizationFromDefinition()) {
final List<AirbyteRecordMessage> actualMessages = retrieveNormalizedRecords(catalog,
defaultSchema);
assertSameMessages(expectedMessages, actualMessages, true);
}
}

/**
Expand Down Expand Up @@ -1347,7 +1336,7 @@ private List<AirbyteMessage> runSync(

destination.close();

if (!runNormalization || (runNormalization && supportsInDestinationNormalization())) {
if (!runNormalization || (supportsInDestinationNormalization())) {
return destinationOutput;
}

Expand Down Expand Up @@ -1860,6 +1849,10 @@ public Stream<? extends Arguments> provideArguments(final ExtensionContext conte

}

private boolean supportsNormalization() {
return supportsInDestinationNormalization() || normalizationFromDefinition();
}

private static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
return Jsons.object(Jsons.jsonNode(v1), klass);
}
Expand Down
Loading
Loading