From f302124dc8a0a65192e779702316a2de41adff74 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 27 Mar 2024 18:26:15 -0700 Subject: [PATCH] CDK module changes for destination --- .../cdk/db/factory/ConnectionFactory.kt | 1 + .../airbyte/cdk/db/factory/DatabaseDriver.kt | 5 +- .../AbstractJdbcCompatibleSourceOperations.kt | 13 +++-- .../cdk/db/jdbc/JdbcSourceOperations.kt | 2 +- .../cdk/integrations/base/ssh/SshTunnel.kt | 5 ++ .../jdbc/AbstractJdbcDestination.kt | 11 ++-- .../destination/jdbc/JdbcSqlOperations.kt | 4 +- .../destination/jdbc/SqlOperationsUtils.kt | 1 + .../jdbc/copy/SwitchingDestination.kt | 2 +- .../typing_deduping/JdbcDestinationHandler.kt | 11 ++-- .../jdbc/typing_deduping/JdbcSqlGenerator.kt | 6 +- .../destination/DestinationAcceptanceTest.kt | 12 ++-- .../destination/TestingNamespaces.kt | 2 + .../comparator/AdvancedTestDataComparator.kt | 4 +- .../typing_deduping/JdbcTypingDedupingTest.kt | 4 +- .../main/kotlin/io/airbyte/commons/io/IOs.kt | 8 +++ .../io/airbyte/commons/jackson/MoreMappers.kt | 1 + .../kotlin/io/airbyte/commons/json/Jsons.kt | 56 +++++++++++-------- .../io/airbyte/commons/lang/Exceptions.kt | 1 + .../io/airbyte/commons/string/Strings.kt | 3 + .../validation/json/JsonSchemaValidator.kt | 9 ++- .../destination/s3/EncryptionConfig.kt | 6 +- .../destination/s3/S3DestinationConfig.kt | 12 ++-- .../destination/s3/S3StorageOperations.kt | 8 ++- .../credential/S3AccessKeyCredentialConfig.kt | 6 +- .../staging/StagingConsumerFactory.kt | 1 + .../S3AvroParquetDestinationAcceptanceTest.kt | 2 +- .../s3/S3BaseAvroDestinationAcceptanceTest.kt | 10 ++-- .../destination/typing_deduping/ColumnId.kt | 12 +--- .../typing_deduping/InitialRawTableStatus.kt | 18 ++---- .../base/destination/typing_deduping/Sql.kt | 2 +- .../BaseSqlGeneratorIntegrationTest.kt | 2 +- .../typing_deduping/BaseTypingDedupingTest.kt | 19 +++++-- 33 files changed, 141 insertions(+), 118 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/ConnectionFactory.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/ConnectionFactory.kt index 7050740f23f9..81ce6a01144b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/ConnectionFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/ConnectionFactory.kt @@ -23,6 +23,7 @@ object ConnectionFactory { * @param jdbcConnectionString The JDBC connection string. * @return The configured [Connection] */ + @JvmStatic fun create( username: String?, password: String?, diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt index a068443018d9..acd40d32b67c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt @@ -4,10 +4,7 @@ package io.airbyte.cdk.db.factory /** Collection of JDBC driver class names and the associated JDBC URL format string. */ -enum class DatabaseDriver( - @JvmField val driverClassName: String, - @JvmField val urlFormatString: String -) { +enum class DatabaseDriver(val driverClassName: String, val urlFormatString: String) { CLICKHOUSE("com.clickhouse.jdbc.ClickHouseDriver", "jdbc:clickhouse:%s://%s:%d/%s"), DATABRICKS( "com.databricks.client.jdbc.Driver", diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt index 4125a6e0ae58..6ce36e575ed9 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt @@ -213,7 +213,12 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putTime(node: ObjectNode, columnName: String?, resultSet: ResultSet, index: Int) { + protected open fun putTime( + node: ObjectNode, + columnName: String?, + resultSet: ResultSet, + index: Int + ) { node.put( columnName, DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime::class.java)) @@ -221,7 +226,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putTimestamp( + protected open fun putTimestamp( node: ObjectNode, columnName: String?, resultSet: ResultSet, @@ -419,7 +424,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putTimeWithTimezone( + protected open fun putTimeWithTimezone( node: ObjectNode, columnName: String?, resultSet: ResultSet, @@ -430,7 +435,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putTimestampWithTimezone( + protected open fun putTimestampWithTimezone( node: ObjectNode, columnName: String?, resultSet: ResultSet, diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt index c98a13b9d604..c6da0271af1c 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt @@ -15,7 +15,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory /** Implementation of source operations with standard JDBC types. */ -class JdbcSourceOperations : +open class JdbcSourceOperations : AbstractJdbcCompatibleSourceOperations(), SourceOperations { protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType { return try { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt index 20e4a8f56325..e254d1ad52c7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshTunnel.kt @@ -422,6 +422,7 @@ constructor( const val TIMEOUT_MILLIS: Int = 15000 // 15 seconds + @JvmStatic fun getInstance(config: JsonNode, hostKey: List, portKey: List): SshTunnel { val tunnelMethod = Jsons.getOptional(config, "tunnel_method", "tunnel_method") @@ -515,6 +516,7 @@ constructor( ) } + @JvmStatic @Throws(Exception::class) fun sshWrap( config: JsonNode, @@ -528,6 +530,7 @@ constructor( } } + @JvmStatic @Throws(Exception::class) fun sshWrap( config: JsonNode, @@ -540,6 +543,7 @@ constructor( } } + @JvmStatic @Throws(Exception::class) fun sshWrap( config: JsonNode, @@ -552,6 +556,7 @@ constructor( } } + @JvmStatic @Throws(Exception::class) fun sshWrap( config: JsonNode, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index b9c21dec012a..12886bb00ef2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory abstract class AbstractJdbcDestination( driverClass: String, - protected val namingResolver: NamingConventionTransformer, + protected open val namingResolver: NamingConventionTransformer, protected val sqlOperations: SqlOperations ) : JdbcConnector(driverClass), Destination { protected val configSchemaKey: String @@ -106,14 +106,14 @@ abstract class AbstractJdbcDestination>( +open class SwitchingDestination>( enumClass: Class, configToType: Function, typeToDestination: Map diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 426c27b4496a..3b8b8e521711 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -263,7 +263,7 @@ abstract class JdbcDestinationHandler( // We can't directly call record.set here, because that will raise a // ConcurrentModificationException on the fieldnames iterator. // Instead, build up a map of new fields and set them all at once. - newFields.put(fieldName.lowercase(Locale.getDefault()), record[fieldName]) + newFields[fieldName.lowercase(Locale.getDefault())] = record[fieldName] } record.setAll(newFields) @@ -271,16 +271,13 @@ abstract class JdbcDestinationHandler( .collect( toMap( { record -> - val nameNode: JsonNode = record.get(DESTINATION_STATE_TABLE_COLUMN_NAME) - val namespaceNode: JsonNode = - record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE) AirbyteStreamNameNamespacePair( - if (nameNode != null) nameNode.asText() else null, - if (namespaceNode != null) namespaceNode.asText() else null + record.get(DESTINATION_STATE_TABLE_COLUMN_NAME)?.asText(), + record.get(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)?.asText() ) }, { record -> - val stateNode: JsonNode = + val stateNode: JsonNode? = record.get(DESTINATION_STATE_TABLE_COLUMN_STATE) val state = if (stateNode != null) Jsons.deserialize(stateNode.asText()) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index 060e0b484f0c..0167d99183ed 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -144,7 +144,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio cursorField: Optional ): Field - protected val dslContext: DSLContext + protected open val dslContext: DSLContext get() = DSL.using(dialect) /** @@ -596,7 +596,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio .getSQL(ParamType.INLINED) } - protected fun castedField( + protected open fun castedField( field: Field<*>?, type: AirbyteType, alias: String?, @@ -625,7 +625,7 @@ abstract class JdbcSqlGenerator(protected val namingTransformer: NamingConventio return DSL.cast(field, toDialectType(type)) } - protected fun currentTimestamp(): Field { + protected open fun currentTimestamp(): Field { return DSL.currentTimestamp() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index c499b4e8c710..e3810f7c6bfe 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -90,7 +90,7 @@ abstract class DestinationAcceptanceTest { protected var localRoot: Path? = null open protected var _testDataComparator: TestDataComparator = getTestDataComparator() - open fun getTestDataComparator(): TestDataComparator { + protected open fun getTestDataComparator(): TestDataComparator { return BasicTestDataComparator { this.resolveIdentifier(it) } } @@ -102,7 +102,7 @@ abstract class DestinationAcceptanceTest { */ get - protected fun supportsInDestinationNormalization(): Boolean { + protected open fun supportsInDestinationNormalization(): Boolean { return false } @@ -208,7 +208,7 @@ abstract class DestinationAcceptanceTest { /** * Override to return true if a destination implements namespaces and should be tested as such. */ - protected fun implementsNamespaces(): Boolean { + protected open fun implementsNamespaces(): Boolean { return false } @@ -308,7 +308,7 @@ abstract class DestinationAcceptanceTest { * - can throw any exception, test framework will handle. */ @Throws(Exception::class) - protected fun retrieveNormalizedRecords( + protected open fun retrieveNormalizedRecords( testEnv: TestDestinationEnv?, streamName: String?, namespace: String? @@ -1036,7 +1036,7 @@ abstract class DestinationAcceptanceTest { } } - protected val maxRecordValueLimit: Int + protected open val maxRecordValueLimit: Int /** @return the max limit length allowed for values in the destination. */ get() = 1000000000 @@ -1953,7 +1953,7 @@ abstract class DestinationAcceptanceTest { return false } - protected fun supportIncrementalSchemaChanges(): Boolean { + protected open fun supportIncrementalSchemaChanges(): Boolean { return false } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/TestingNamespaces.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/TestingNamespaces.kt index 405a5702deb4..e29d0512e2ec 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/TestingNamespaces.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/TestingNamespaces.kt @@ -38,6 +38,7 @@ object TestingNamespaces { * * @return convention-compliant namespace */ + @JvmStatic @JvmOverloads fun generate(prefix: String? = null): String { val userDefinedPrefix = if (prefix != null) prefix + "_" else "" @@ -58,6 +59,7 @@ object TestingNamespaces { * @param namespace to check * @return true if the namespace is older than 2 days, otherwise false */ + @JvmStatic fun isOlderThan2Days(namespace: String): Boolean { return isOlderThan(namespace, 2, ChronoUnit.DAYS) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt index 14ed4337b457..86d98f891ed0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt @@ -24,7 +24,7 @@ open class AdvancedTestDataComparator : TestDataComparator { } } - protected fun resolveIdentifier(identifier: String?): List { + protected open fun resolveIdentifier(identifier: String?): List { return java.util.List.of(identifier) } @@ -174,7 +174,7 @@ open class AdvancedTestDataComparator : TestDataComparator { .withZoneSameInstant(ZoneOffset.UTC) } - protected fun compareDateTimeWithTzValues( + protected open fun compareDateTimeWithTzValues( airbyteMessageValue: String, destinationValue: String ): Boolean { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcTypingDedupingTest.kt index 8434a199d6af..bee15390d042 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcTypingDedupingTest.kt @@ -36,7 +36,7 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() { protected abstract fun getDataSource(config: JsonNode?): DataSource? - protected val sourceOperations: JdbcCompatibleSourceOperations<*> + protected open val sourceOperations: JdbcCompatibleSourceOperations<*> /** * Subclasses may need to return a custom source operations if the default one does not * handle vendor-specific types correctly. For example, you most likely need to override @@ -44,7 +44,7 @@ abstract class JdbcTypingDedupingTest : BaseTypingDedupingTest() { */ get() = JdbcUtils.defaultSourceOperations - protected val rawSchema: String + protected open val rawSchema: String /** * Subclasses using a config with a nonstandard raw table schema should override this * method. diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt index ab9d12f565bb..be4ee585ce3d 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt @@ -13,11 +13,13 @@ import java.util.* import org.apache.commons.io.input.ReversedLinesFileReader object IOs { + @JvmStatic fun writeFile(path: Path, fileName: String?, contents: String?): Path { val filePath = path.resolve(fileName) return writeFile(filePath, contents) } + @JvmStatic fun writeFile(filePath: Path, contents: ByteArray): Path { try { Files.write(filePath, contents) @@ -27,6 +29,7 @@ object IOs { } } + @JvmStatic fun writeFile(filePath: Path, contents: String?): Path { try { Files.writeString(filePath, contents, StandardCharsets.UTF_8) @@ -40,6 +43,7 @@ object IOs { * Writes a file to a random directory in the /tmp folder. Useful as a staging group for test * resources. */ + @JvmStatic fun writeFileToRandomTmpDir(filename: String?, contents: String?): String { val source = Paths.get("/tmp", UUID.randomUUID().toString()) try { @@ -53,6 +57,7 @@ object IOs { } } + @JvmStatic fun readFile(path: Path, fileName: String?): String { return readFile(path.resolve(fileName)) } @@ -90,6 +95,7 @@ object IOs { } } + @JvmStatic fun inputStream(path: Path): InputStream { try { return Files.newInputStream(path) @@ -98,6 +104,7 @@ object IOs { } } + @JvmStatic fun silentClose(closeable: Closeable) { try { closeable.close() @@ -106,6 +113,7 @@ object IOs { } } + @JvmStatic fun newBufferedReader(inputStream: InputStream): BufferedReader { return BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8)) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/jackson/MoreMappers.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/jackson/MoreMappers.kt index 47fb08b77115..93d274d622b0 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/jackson/MoreMappers.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/jackson/MoreMappers.kt @@ -16,6 +16,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule * All jackson mapper creation should use the following methods for instantiation. */ object MoreMappers { + @JvmStatic fun initMapper(): ObjectMapper { val result = ObjectMapper().registerModule(JavaTimeModule()) result.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt index 38e488e91425..5c5eb575d731 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt @@ -42,11 +42,12 @@ object Jsons { } private val YAML_OBJECT_MAPPER: ObjectMapper = MoreMappers.initYamlMapper(YAMLFactory()) - private val OBJECT_WRITER: ObjectWriter = OBJECT_MAPPER!!.writer(JsonPrettyPrinter()) + private val OBJECT_WRITER: ObjectWriter = OBJECT_MAPPER.writer(JsonPrettyPrinter()) + @JvmStatic fun serialize(`object`: T): String { try { - return OBJECT_MAPPER!!.writeValueAsString(`object`) + return OBJECT_MAPPER.writeValueAsString(`object`) } catch (e: JsonProcessingException) { throw RuntimeException(e) } @@ -55,92 +56,102 @@ object Jsons { @JvmStatic fun deserialize(jsonString: String?, klass: Class?): T { try { - return OBJECT_MAPPER!!.readValue(jsonString, klass) + return OBJECT_MAPPER.readValue(jsonString, klass) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun deserialize(jsonString: String?, valueTypeRef: TypeReference?): T { try { - return OBJECT_MAPPER!!.readValue(jsonString, valueTypeRef) + return OBJECT_MAPPER.readValue(jsonString, valueTypeRef) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun deserialize(file: File?, klass: Class?): T { try { - return OBJECT_MAPPER!!.readValue(file, klass) + return OBJECT_MAPPER.readValue(file, klass) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun deserialize(file: File?, valueTypeRef: TypeReference?): T { try { - return OBJECT_MAPPER!!.readValue(file, valueTypeRef) + return OBJECT_MAPPER.readValue(file, valueTypeRef) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun convertValue(`object`: Any?, klass: Class?): T { - return OBJECT_MAPPER!!.convertValue(`object`, klass) + return OBJECT_MAPPER.convertValue(`object`, klass) } @JvmStatic fun deserialize(jsonString: String?): JsonNode { try { - return OBJECT_MAPPER!!.readTree(jsonString) + return OBJECT_MAPPER.readTree(jsonString) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun deserializeExact(jsonString: String?): JsonNode { try { - return OBJECT_MAPPER_EXACT!!.readTree(jsonString) + return OBJECT_MAPPER_EXACT.readTree(jsonString) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun deserialize(jsonBytes: ByteArray?): JsonNode { try { - return OBJECT_MAPPER!!.readTree(jsonBytes) + return OBJECT_MAPPER.readTree(jsonBytes) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun deserializeExact(jsonBytes: ByteArray?): JsonNode { try { - return OBJECT_MAPPER_EXACT!!.readTree(jsonBytes) + return OBJECT_MAPPER_EXACT.readTree(jsonBytes) } catch (e: IOException) { throw RuntimeException(e) } } + @JvmStatic fun tryDeserialize(jsonString: String, klass: Class): Optional { return try { - Optional.of(OBJECT_MAPPER!!.readValue(jsonString, klass)) + Optional.of(OBJECT_MAPPER.readValue(jsonString, klass)) } catch (e: Throwable) { handleDeserThrowable(e) } } + @JvmStatic fun tryDeserializeExact(jsonString: String?, klass: Class?): Optional { return try { - Optional.of(OBJECT_MAPPER_EXACT!!.readValue(jsonString, klass)) + Optional.of(OBJECT_MAPPER_EXACT.readValue(jsonString, klass)) } catch (e: Throwable) { handleDeserThrowable(e) } } + @JvmStatic fun tryDeserialize(jsonString: String?): Optional { return try { - Optional.of(OBJECT_MAPPER!!.readTree(jsonString)) + Optional.of(OBJECT_MAPPER.readTree(jsonString)) } catch (e: Throwable) { handleDeserThrowable(e) } @@ -153,9 +164,10 @@ object Jsons { * @param jsonString * @return */ + @JvmStatic fun tryDeserializeWithoutWarn(jsonString: String?): Optional { return try { - Optional.of(OBJECT_MAPPER!!.readTree(jsonString)) + Optional.of(OBJECT_MAPPER.readTree(jsonString)) } catch (e: Throwable) { Optional.empty() } @@ -163,12 +175,12 @@ object Jsons { @JvmStatic fun jsonNode(`object`: T): JsonNode { - return OBJECT_MAPPER!!.valueToTree(`object`) + return OBJECT_MAPPER.valueToTree(`object`) } @Throws(IOException::class) fun jsonNodeFromFile(file: File?): JsonNode { - return YAML_OBJECT_MAPPER!!.readTree(file) + return YAML_OBJECT_MAPPER.readTree(file) } @JvmStatic @@ -177,22 +189,22 @@ object Jsons { } fun arrayNode(): ArrayNode { - return OBJECT_MAPPER!!.createArrayNode() + return OBJECT_MAPPER.createArrayNode() } @JvmStatic fun `object`(jsonNode: JsonNode?, klass: Class?): T { - return OBJECT_MAPPER!!.convertValue(jsonNode, klass) + return OBJECT_MAPPER.convertValue(jsonNode, klass) } @JvmStatic fun `object`(jsonNode: JsonNode?, typeReference: TypeReference): T { - return OBJECT_MAPPER!!.convertValue(jsonNode, typeReference) + return OBJECT_MAPPER.convertValue(jsonNode, typeReference) } fun tryObject(jsonNode: JsonNode?, klass: Class?): Optional { return try { - Optional.of(OBJECT_MAPPER!!.convertValue(jsonNode, klass)) + Optional.of(OBJECT_MAPPER.convertValue(jsonNode, klass)) } catch (e: Exception) { Optional.empty() } @@ -200,7 +212,7 @@ object Jsons { fun tryObject(jsonNode: JsonNode?, typeReference: TypeReference?): Optional { return try { - Optional.of(OBJECT_MAPPER!!.convertValue(jsonNode, typeReference)) + Optional.of(OBJECT_MAPPER.convertValue(jsonNode, typeReference)) } catch (e: Exception) { Optional.empty() } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/lang/Exceptions.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/lang/Exceptions.kt index 02b98b9a215f..f0a1a73ddbe3 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/lang/Exceptions.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/lang/Exceptions.kt @@ -37,6 +37,7 @@ object Exceptions { * @param voidCallable * - function that throws a checked exception. */ + @JvmStatic fun toRuntime(voidCallable: Procedure) { castCheckedToRuntime(voidCallable) { cause: Exception? -> RuntimeException(cause) } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/string/Strings.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/string/Strings.kt index c3db584a0325..b5f53c462570 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/string/Strings.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/string/Strings.kt @@ -7,16 +7,19 @@ import java.util.* import org.apache.commons.lang3.RandomStringUtils object Strings { + @JvmStatic fun join(iterable: Iterable, separator: CharSequence): String { return iterable.joinToString(separator) { it.toString() } } + @JvmStatic fun addRandomSuffix(base: String, separator: String, suffixLength: Int): String { return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).lowercase(Locale.getDefault()) } + @JvmStatic fun safeTrim(string: String?): String? { return string?.trim { it <= ' ' } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt index f19365b031b5..185a614e67d1 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode import com.google.common.annotations.VisibleForTesting import com.google.common.base.Preconditions import com.networknt.schema.* -import io.airbyte.commons.string.Strings.join import java.io.File import java.io.IOException import java.net.URI @@ -59,11 +58,10 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR if (validationMessages.isEmpty()) { return } - throw JsonValidationException( String.format( "json schema validation failed when comparing the data to the json schema. \nErrors: %s \nSchema: \n%s", - join(validationMessages, ", "), + validationMessages.joinToString(", "), schemaName ) ) @@ -82,7 +80,7 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR if (!validationMessages.isEmpty()) { LOGGER.info( "JSON schema validation failed. \nerrors: {}", - join(validationMessages, ", ") + validationMessages.joinToString(", ") ) } @@ -120,7 +118,7 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR throw JsonValidationException( String.format( "json schema validation failed when comparing the data to the json schema. \nErrors: %s \nSchema: \n%s", - join(validationMessages, ", "), + validationMessages.joinToString(", "), schemaJson.toPrettyString() ) ) @@ -207,6 +205,7 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR * - the schema file * @return schema object processed from across all dependency files. */ + @JvmStatic fun getSchema(schemaFile: File?): JsonNode { try { return processor.process(schemaFile) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/EncryptionConfig.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/EncryptionConfig.kt index aa318261c77a..3cfcf71d96d3 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/EncryptionConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/EncryptionConfig.kt @@ -8,7 +8,8 @@ import java.util.* interface EncryptionConfig { companion object { - fun fromJson(encryptionNode: JsonNode?): EncryptionConfig? { + @JvmStatic + fun fromJson(encryptionNode: JsonNode?): EncryptionConfig { // For backwards-compatibility. Preexisting configs which don't contain the "encryption" // key will // pass a null JsonNode into this method. @@ -16,8 +17,7 @@ interface EncryptionConfig { return NoEncryption() } - val encryptionType = encryptionNode["encryption_type"].asText() - return when (encryptionType) { + return when (val encryptionType = encryptionNode["encryption_type"].asText()) { "none" -> NoEncryption() "aes_cbc_envelope" -> AesCbcEnvelopeEncryption.Companion.fromJson(encryptionNode) else -> throw IllegalArgumentException("Invalid encryption type: $encryptionType") diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt index ac4442807646..6b1a0a16501f 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationConfig.kt @@ -25,13 +25,13 @@ import org.slf4j.LoggerFactory * data syncing to S3) */ open class S3DestinationConfig { - @JvmField val endpoint: String? - @JvmField val bucketName: String? - @JvmField val bucketPath: String? - @JvmField val bucketRegion: String? - @JvmField val pathFormat: String? + val endpoint: String? + val bucketName: String? + val bucketPath: String? + val bucketRegion: String? + val pathFormat: String? val s3CredentialConfig: S3CredentialConfig? - @JvmField val formatConfig: S3FormatConfig? + val formatConfig: S3FormatConfig? var fileNamePattern: String? = null private set diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index 7ba877c7bc41..875250ec3bd8 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -24,7 +24,7 @@ import io.airbyte.commons.exceptions.ConfigErrorException import io.github.oshai.kotlinlogging.KotlinLogging import java.io.IOException import java.io.OutputStream -import java.util.UUID +import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.AtomicInteger @@ -302,7 +302,7 @@ open class S3StorageOperations( ListObjectsRequest() .withBucketName(bucket) .withPrefix( - objectPath + objectPath, ) // pathFormat may use subdirectories under the objectPath to organize files // so we need to recursively list them and filter files matching the pathFormat .withDelimiter(""), @@ -416,6 +416,10 @@ open class S3StorageOperations( ) } + fun uploadManifest(bucketName: String, manifestFilePath: String, manifestContents: String) { + s3Client.putObject(s3Config.bucketName, manifestFilePath, manifestContents) + } + companion object { const val DEFAULT_UPLOAD_THREADS: Int = 10 // The S3 cli uses 10 threads by default. const val R2_UPLOAD_THREADS: Int = 3 diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.kt index fc45739f546e..f683027a637f 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/credential/S3AccessKeyCredentialConfig.kt @@ -8,10 +8,8 @@ import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.AWSStaticCredentialsProvider import com.amazonaws.auth.BasicAWSCredentials -class S3AccessKeyCredentialConfig( - @JvmField val accessKeyId: String?, - @JvmField val secretAccessKey: String? -) : S3CredentialConfig { +class S3AccessKeyCredentialConfig(val accessKeyId: String?, val secretAccessKey: String?) : + S3CredentialConfig { override val credentialType: S3CredentialType get() = S3CredentialType.ACCESS_KEY diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt index 09125c12b016..a3c73327172d 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingConsumerFactory.kt @@ -171,6 +171,7 @@ private constructor( private val SYNC_DATETIME: Instant = Instant.now() + @JvmStatic fun builder( outputRecordCollector: Consumer, database: JdbcDatabase?, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt index 3a5c2572dccc..d827b6c1cf04 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt @@ -32,7 +32,7 @@ abstract class S3AvroParquetDestinationAcceptanceTest protected constructor(s3Fo val messages = readMessagesFromFile(messagesFileName) val config = this.getConfig() - val defaultSchema = getDefaultSchema(config!!) + val defaultSchema = getDefaultSchema(config) val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) runSyncAndVerifyStateOutput(config, messages, configuredCatalog, false) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseAvroDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseAvroDestinationAcceptanceTest.kt index f346315b7e66..f2709ea343f4 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseAvroDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3BaseAvroDestinationAcceptanceTest.kt @@ -49,8 +49,8 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() : val objectSummaries = getAllSyncedObjects(streamName, namespace) val jsonRecords: MutableList = LinkedList() - for (objectSummary in objectSummaries!!) { - val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key) + for (objectSummary in objectSummaries) { + val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key) DataFileReader( SeekableByteArrayInput(`object`.objectContent.readAllBytes()), GenericDatumReader() @@ -81,8 +81,8 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() : val objectSummaries = getAllSyncedObjects(streamName, namespace) val resultDataTypes: MutableMap?> = HashMap() - for (objectSummary in objectSummaries!!) { - val `object` = s3Client!!.getObject(objectSummary!!.bucketName, objectSummary.key) + for (objectSummary in objectSummaries) { + val `object` = s3Client!!.getObject(objectSummary.bucketName, objectSummary.key) DataFileReader( SeekableByteArrayInput(`object`.objectContent.readAllBytes()), GenericDatumReader() @@ -91,7 +91,7 @@ abstract class S3BaseAvroDestinationAcceptanceTest protected constructor() : while (dataFileReader.hasNext()) { val record = dataFileReader.next() val actualDataTypes = getTypes(record) - resultDataTypes.putAll(actualDataTypes!!) + resultDataTypes.putAll(actualDataTypes) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/ColumnId.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/ColumnId.kt index 09c3792ebcd0..5629b4085e6b 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/ColumnId.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/ColumnId.kt @@ -14,18 +14,8 @@ package io.airbyte.integrations.base.destination.typing_deduping * Useful if a destination warehouse handles columns ignoring case, but preserves case in the table * schema. */ -class ColumnId(name: String, originalName: String, canonicalName: String) { +data class ColumnId(val name: String, val originalName: String, val canonicalName: String) { fun name(quote: String): String { return quote + name + quote } - - val name: String - val originalName: String - val canonicalName: String - - init { - this.name = name - this.originalName = originalName - this.canonicalName = canonicalName - } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt index 2175f5cf6aee..49df7a54bc49 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/InitialRawTableStatus.kt @@ -6,18 +6,8 @@ package io.airbyte.integrations.base.destination.typing_deduping import java.time.Instant import java.util.* -class InitialRawTableStatus( - rawTableExists: Boolean, - hasUnprocessedRecords: Boolean, - maxProcessedTimestamp: Optional -) { - val rawTableExists: Boolean - val hasUnprocessedRecords: Boolean +data class InitialRawTableStatus( + val rawTableExists: Boolean, + val hasUnprocessedRecords: Boolean, val maxProcessedTimestamp: Optional - - init { - this.rawTableExists = rawTableExists - this.hasUnprocessedRecords = hasUnprocessedRecords - this.maxProcessedTimestamp = maxProcessedTimestamp - } -} +) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt index 78d1e961b348..c3ebb5a33a55 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt @@ -19,7 +19,7 @@ import java.util.stream.Stream * Callers are encouraged to use the static factory methods instead of the public constructor. */ @JvmRecord -data class Sql(@JvmField val transactions: List>) { +data class Sql(val transactions: List>) { /** * @param begin The SQL statement to start a transaction, typically "BEGIN" * @param commit The SQL statement to commit a transaction, typically "COMMIT" diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt index 3e09bef3cf0e..766c7a26be39 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.kt @@ -58,7 +58,7 @@ abstract class BaseSqlGeneratorIntegrationTest + protected abstract val destinationHandler: DestinationHandler protected var namespace: String? = null protected lateinit var streamId: StreamId diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index 733c6566e150..99b841fed022 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -183,7 +183,7 @@ abstract class BaseTypingDedupingTest { * * @return */ - protected fun disableFinalTableComparison(): Boolean { + protected open fun disableFinalTableComparison(): Boolean { return false } @@ -560,7 +560,7 @@ abstract class BaseTypingDedupingTest { // Second sync val messages2 = readMessages("dat/sync2_messages.jsonl") - val trimmedSchema = SCHEMA!!.deepCopy() + val trimmedSchema = SCHEMA.deepCopy() (trimmedSchema["properties"] as ObjectNode).remove("name") stream.jsonSchema = trimmedSchema @@ -573,7 +573,7 @@ abstract class BaseTypingDedupingTest { readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl") .stream() .peek { record: JsonNode -> - (record as ObjectNode).remove(sqlGenerator.buildColumnId("name")!!.name) + (record as ObjectNode).remove(sqlGenerator.buildColumnId("name").name) } .toList() verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison()) @@ -711,7 +711,7 @@ abstract class BaseTypingDedupingTest { */ @Test @Throws(Exception::class) - fun identicalNameSimultaneousSync() { + open fun identicalNameSimultaneousSync() { val namespace1 = streamNamespace + "_1" val catalog1 = io.airbyte.protocol.models.v0 @@ -816,7 +816,7 @@ abstract class BaseTypingDedupingTest { @Test @Throws(Exception::class) fun incrementalDedupChangeCursor() { - val mangledSchema = SCHEMA!!.deepCopy() + val mangledSchema = SCHEMA.deepCopy() (mangledSchema["properties"] as ObjectNode).remove("updated_at") (mangledSchema["properties"] as ObjectNode).set( "old_cursor", @@ -945,6 +945,7 @@ abstract class BaseTypingDedupingTest { * !!!!!! WARNING !!!!!! The code below was mostly copypasted from DestinationAcceptanceTest. If you * make edits here, you probably want to also edit there. */ + @JvmOverloads @Throws(Exception::class) protected fun runSync( catalog: io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog, @@ -1066,9 +1067,15 @@ abstract class BaseTypingDedupingTest { return Companion.readMessages(filename, streamNamespace, streamName) } + protected fun readRecords(filename: String): List { + return Companion.readRecords(filename) + } + + protected val schema: JsonNode = SCHEMA + companion object { private val LOGGER: Logger = LoggerFactory.getLogger(BaseTypingDedupingTest::class.java) - protected var SCHEMA: JsonNode? = null + protected val SCHEMA: JsonNode init { try {