diff --git a/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt index c1932fbe672b..fc11a57fcfaa 100644 --- a/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/azure-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/azure/AzureBlobStorageStreamCopier.kt @@ -50,9 +50,9 @@ abstract class AzureBlobStorageStreamCopier( @Suppress("DEPRECATION") @get:VisibleForTesting val tmpTableName: String = nameTransformer.getTmpTableName(streamName) - protected val activeStagingWriterFileNames: MutableSet = HashSet() - private val csvPrinters = HashMap() - private val blobClients = HashMap() + protected val activeStagingWriterFileNames: MutableSet = HashSet() + private val csvPrinters = HashMap() + private val blobClients = HashMap() override var currentFile: String? = null @Throws(Exception::class) @@ -210,7 +210,7 @@ abstract class AzureBlobStorageStreamCopier( @Throws(Exception::class) override fun closeNonCurrentStagingFileWriters() { LOGGER.info("Begin closing non current file writers") - val removedKeys: MutableSet = HashSet() + val removedKeys: MutableSet = HashSet() for (key in activeStagingWriterFileNames) { if (key != currentFile) { csvPrinters[key]!!.close() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt index 0cd8733c0847..d5b01105c5d0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/DefaultJdbcDatabase.kt @@ -26,14 +26,14 @@ constructor( sourceOperations: JdbcCompatibleSourceOperations<*>? = JdbcUtils.defaultSourceOperations ) : JdbcDatabase(sourceOperations) { @Throws(SQLException::class) - override fun execute(query: CheckedConsumer) { + override fun execute(query: CheckedConsumer) { dataSource.connection.use { connection -> query.accept(connection) } } @Throws(SQLException::class) override fun bufferedResultSetQuery( - query: CheckedFunction, - recordTransform: CheckedFunction + query: CheckedFunction, + recordTransform: CheckedFunction ): List { dataSource.connection.use { connection -> toUnsafeStream(query.apply(connection), recordTransform).use { results -> @@ -45,8 +45,8 @@ constructor( @MustBeClosed @Throws(SQLException::class) override fun unsafeResultSetQuery( - query: CheckedFunction, - recordTransform: CheckedFunction + query: CheckedFunction, + recordTransform: CheckedFunction ): Stream { val connection = dataSource.connection return JdbcDatabase.Companion.toUnsafeStream(query.apply(connection), recordTransform) @@ -114,8 +114,8 @@ constructor( @MustBeClosed @Throws(SQLException::class) override fun unsafeQuery( - statementCreator: CheckedFunction, - recordTransform: CheckedFunction + statementCreator: CheckedFunction, + recordTransform: CheckedFunction ): Stream { val connection = dataSource.connection return JdbcDatabase.Companion.toUnsafeStream( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt index ec298fd51f42..41ff260e3f5a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt @@ -31,7 +31,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource * @throws SQLException SQL related exceptions. */ @Throws(SQLException::class) - abstract fun execute(query: CheckedConsumer) + abstract fun execute(query: CheckedConsumer) @Throws(SQLException::class) override fun execute(sql: String?) { @@ -39,7 +39,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource } @Throws(SQLException::class) - fun executeWithinTransaction(queries: List) { + fun executeWithinTransaction(queries: List) { execute { connection: Connection -> connection.autoCommit = false for (s in queries) { @@ -67,8 +67,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource */ @Throws(SQLException::class) abstract fun bufferedResultSetQuery( - query: CheckedFunction, - recordTransform: CheckedFunction + query: CheckedFunction, + recordTransform: CheckedFunction ): List /** @@ -89,8 +89,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource @MustBeClosed @Throws(SQLException::class) abstract fun unsafeResultSetQuery( - query: CheckedFunction, - recordTransform: CheckedFunction + query: CheckedFunction, + recordTransform: CheckedFunction ): Stream /** @@ -99,8 +99,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource */ @Throws(SQLException::class) fun queryStrings( - query: CheckedFunction, - recordTransform: CheckedFunction + query: CheckedFunction, + recordTransform: CheckedFunction ): List { unsafeResultSetQuery(query, recordTransform).use { stream -> return stream.toList() @@ -126,8 +126,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource @MustBeClosed @Throws(SQLException::class) abstract fun unsafeQuery( - statementCreator: CheckedFunction, - recordTransform: CheckedFunction + statementCreator: CheckedFunction, + recordTransform: CheckedFunction ): Stream /** @@ -136,8 +136,8 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource */ @Throws(SQLException::class) fun queryJsons( - statementCreator: CheckedFunction, - recordTransform: CheckedFunction + statementCreator: CheckedFunction, + recordTransform: CheckedFunction ): List { unsafeQuery(statementCreator, recordTransform).use { stream -> return stream.toList() @@ -229,7 +229,7 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource @MustBeClosed fun toUnsafeStream( resultSet: ResultSet, - mapper: CheckedFunction + mapper: CheckedFunction ): Stream { return StreamSupport.stream( object : AbstractSpliterator(Long.MAX_VALUE, ORDERED) { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/StreamingJdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/StreamingJdbcDatabase.kt index a95b534355a8..9d826fbdebb6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/StreamingJdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/StreamingJdbcDatabase.kt @@ -46,8 +46,8 @@ class StreamingJdbcDatabase( @MustBeClosed @Throws(SQLException::class) override fun unsafeQuery( - statementCreator: CheckedFunction, - recordTransform: CheckedFunction + statementCreator: CheckedFunction, + recordTransform: CheckedFunction ): Stream { try { val connection = dataSource.connection @@ -79,7 +79,7 @@ class StreamingJdbcDatabase( */ protected fun toUnsafeStream( resultSet: ResultSet, - mapper: CheckedFunction, + mapper: CheckedFunction, streamingConfig: JdbcStreamingQueryConfig ): Stream { return StreamSupport.stream( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt index 6b69969e9623..62af52d91aad 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteExceptionHandler.kt @@ -111,7 +111,7 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler { * 1. Contain the original exception message as the external message, and a mangled message * as the internal message. */ - @VisibleForTesting val STRINGS_TO_DEINTERPOLATE: MutableSet = HashSet() + @VisibleForTesting val STRINGS_TO_DEINTERPOLATE: MutableSet = HashSet() init { addCommonStringsToDeinterpolate() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index b38f6d41561b..41fc8651f727 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -284,7 +284,7 @@ internal constructor( private fun readConcurrent( config: JsonNode, catalog: ConfiguredAirbyteCatalog, - stateOptional: Optional + stateOptional: Optional ) { val streams = source!!.readStreams(config, catalog, stateOptional.orElse(null)) @@ -327,7 +327,7 @@ internal constructor( private fun readSerial( config: JsonNode, catalog: ConfiguredAirbyteCatalog, - stateOptional: Optional + stateOptional: Optional ) { try { source!!.read(config, catalog, stateOptional.orElse(null)).use { messageIterator -> diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt index c1d18172bce6..31f80de484d5 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferDequeue.kt @@ -58,14 +58,13 @@ class BufferDequeue( val output: MutableList = LinkedList() while (queue!!.size() > 0) { val memoryItem: - MemoryBoundedLinkedBlockingQueue.MemoryItem< - StreamAwareQueue.MessageWithMeta?>? = + MemoryBoundedLinkedBlockingQueue.MemoryItem = queue.peek().orElseThrow() // otherwise pull records until we hit the memory limit. - val newSize: Long = (memoryItem?.size ?: 0) + bytesRead.get() + val newSize: Long = (memoryItem.size) + bytesRead.get() if (newSize <= optimalBytesToRead) { - memoryItem?.size?.let { bytesRead.addAndGet(it) } + memoryItem.size.let { bytesRead.addAndGet(it) } queue.poll()?.item?.let { output.add(it) } } else { break diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryBoundedLinkedBlockingQueue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryBoundedLinkedBlockingQueue.kt index a81fcf1db50e..029d0dd26e50 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryBoundedLinkedBlockingQueue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/MemoryBoundedLinkedBlockingQueue.kt @@ -28,7 +28,7 @@ private val logger = KotlinLogging.logger {} * @param type in the queue */ class MemoryBoundedLinkedBlockingQueue(maxMemoryUsage: Long) { - private val hiddenQueue = HiddenQueue(maxMemoryUsage) + private val hiddenQueue = HiddenQueue(maxMemoryUsage) val currentMemoryUsage: Long get() = hiddenQueue.currentMemoryUsage.get() @@ -48,16 +48,16 @@ class MemoryBoundedLinkedBlockingQueue(maxMemoryUsage: Long) { return hiddenQueue.offer(e, itemSizeInBytes) } - fun peek(): MemoryItem? { + fun peek(): MemoryItem? { return hiddenQueue.peek() } @Throws(InterruptedException::class) - fun take(): MemoryItem { + fun take(): MemoryItem { return hiddenQueue.take() } - fun poll(): MemoryItem? { + fun poll(): MemoryItem? { return hiddenQueue.poll() } @@ -65,7 +65,7 @@ class MemoryBoundedLinkedBlockingQueue(maxMemoryUsage: Long) { fun poll( timeout: Long, unit: TimeUnit, - ): MemoryItem? { + ): MemoryItem? { return hiddenQueue.poll(timeout, unit) } @@ -78,7 +78,7 @@ class MemoryBoundedLinkedBlockingQueue(maxMemoryUsage: Long) { * * @param */ - private class HiddenQueue(maxMemoryUsage: Long) : LinkedBlockingQueue?>() { + private class HiddenQueue(maxMemoryUsage: Long) : LinkedBlockingQueue>() { val currentMemoryUsage: AtomicLong = AtomicLong(0) val maxMemoryUsage: AtomicLong = AtomicLong(maxMemoryUsage) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/StreamAwareQueue.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/StreamAwareQueue.kt index 50b6e6ef963e..6cb55d4264cf 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/StreamAwareQueue.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/StreamAwareQueue.kt @@ -37,7 +37,7 @@ class StreamAwareQueue(maxMemoryUsage: Long) { return Optional.ofNullable(timeOfLastMessage.get()) } - fun peek(): Optional> { + fun peek(): Optional> { return Optional.ofNullable(memoryAwareQueue.peek()) } @@ -59,11 +59,11 @@ class StreamAwareQueue(maxMemoryUsage: Long) { } @Throws(InterruptedException::class) - fun take(): MemoryBoundedLinkedBlockingQueue.MemoryItem { + fun take(): MemoryBoundedLinkedBlockingQueue.MemoryItem { return memoryAwareQueue.take() } - fun poll(): MemoryBoundedLinkedBlockingQueue.MemoryItem? { + fun poll(): MemoryBoundedLinkedBlockingQueue.MemoryItem? { return memoryAwareQueue.poll() } @@ -71,7 +71,7 @@ class StreamAwareQueue(maxMemoryUsage: Long) { fun poll( timeout: Long, unit: TimeUnit, - ): MemoryBoundedLinkedBlockingQueue.MemoryItem? { + ): MemoryBoundedLinkedBlockingQueue.MemoryItem? { return memoryAwareQueue.poll(timeout, unit) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt index d2e56c746977..97c8f6fbdb66 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt @@ -154,7 +154,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { var bytesFlushed: Long = 0L logger.info { "Flushing states" } synchronized(lock) { - for (entry: Map.Entry?> in + for (entry: Map.Entry> in descToStateIdQ.entries) { // Remove all states with 0 counters. // Per-stream synchronized is required to make sure the state (at the head of the @@ -196,7 +196,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { bytesFlushed += oldestState.second // cleanup - entry.value!!.poll() + entry.value.poll() stateIdToState.remove(oldestStateId) stateIdToCounter.remove(oldestStateId) stateIdToCounterForPopulatingDestinationStats.remove(oldestStateId) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt index a11bcfb44743..7c67b240acaa 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/SqlOperations.kt @@ -29,7 +29,7 @@ interface SqlOperations { * @throws Exception exception */ @Throws(Exception::class) - fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?) + fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String) /** * Denotes whether the schema exists in destination database diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingOperations.kt index ee84df4cf632..6484d9800660 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/StagingOperations.kt @@ -64,7 +64,7 @@ interface StagingOperations : SqlOperations { schemaName: String?, stageName: String?, stagingPath: String? - ): String? + ): String /** * Load the data stored in the stage area into a temporary table in the destination @@ -80,7 +80,7 @@ interface StagingOperations : SqlOperations { database: JdbcDatabase?, stageName: String?, stagingPath: String?, - stagedFiles: List?, + stagedFiles: List?, tableName: String?, schemaName: String? ) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ApmTraceUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ApmTraceUtils.kt index ee4fd35e5d09..9d40482386a0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ApmTraceUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ApmTraceUtils.kt @@ -34,7 +34,7 @@ object ApmTraceUtils { * @param tags A map of tags to be added to the currently active span. */ @JvmOverloads - fun addTagsToTrace(tags: Map, tagPrefix: String? = TAG_PREFIX) { + fun addTagsToTrace(tags: Map, tagPrefix: String? = TAG_PREFIX) { addTagsToTrace(GlobalTracer.get().activeSpan(), tags, tagPrefix) } @@ -45,10 +45,10 @@ object ApmTraceUtils { * @param tags A map of tags to be added to the currently active span. * @param tagPrefix The prefix to be added to each custom tag name. */ - fun addTagsToTrace(span: Span?, tags: Map, tagPrefix: String?) { + fun addTagsToTrace(span: Span?, tags: Map, tagPrefix: String?) { if (span != null) { tags.entries.forEach( - Consumer { entry: Map.Entry -> + Consumer { entry: Map.Entry -> span.setTag(formatTag(entry.key, tagPrefix), entry.value.toString()) } ) @@ -83,12 +83,12 @@ object ApmTraceUtils { * * @param tags A map of tags to be added to the root span. */ - fun addTagsToRootSpan(tags: Map) { + fun addTagsToRootSpan(tags: Map) { val activeSpan = GlobalTracer.get().activeSpan() if (activeSpan is MutableSpan) { val localRootSpan = (activeSpan as MutableSpan).localRootSpan tags.entries.forEach( - Consumer { entry: Map.Entry -> + Consumer { entry: Map.Entry -> localRootSpan.setTag(formatTag(entry.key, TAG_PREFIX), entry.value.toString()) } ) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt index 30a8f76fcc3b..a824f38804b3 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt @@ -74,7 +74,7 @@ internal class TestJdbcUtils { private fun getConfig(psqlDb: PostgreSQLContainer<*>?, dbName: String?): JsonNode { return Jsons.jsonNode( - ImmutableMap.builder() + ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, psqlDb!!.host) .put(JdbcUtils.PORT_KEY, psqlDb.firstMappedPort) .put(JdbcUtils.DATABASE_KEY, dbName) @@ -91,7 +91,7 @@ internal class TestJdbcUtils { sslValue: T ): JsonNode { return Jsons.jsonNode( - ImmutableMap.builder() + ImmutableMap.builder() .put("host", psqlDb!!.host) .put("port", psqlDb.firstMappedPort) .put("database", dbName) @@ -214,7 +214,7 @@ internal class TestJdbcUtils { fun testUseSslWithEmptySslKeyAndSslModeVerifyFull() { val config = Jsons.jsonNode( - ImmutableMap.builder() + ImmutableMap.builder() .put("host", PSQL_DB.host) .put("port", PSQL_DB.firstMappedPort) .put("database", dbName) @@ -240,7 +240,7 @@ internal class TestJdbcUtils { fun testUseSslWithEmptySslKeyAndSslModeDisable() { val config = Jsons.jsonNode( - ImmutableMap.builder() + ImmutableMap.builder() .put("host", PSQL_DB.host) .put("port", PSQL_DB.firstMappedPort) .put("database", dbName) @@ -430,7 +430,7 @@ internal class TestJdbcUtils { val actual = sourceOperations.rowToJson(resultSet) // field-wise comparison to make debugging easier. - MoreStreams.toStream(expected.fields()).forEach { e: Map.Entry -> + MoreStreams.toStream(expected.fields()).forEach { e: Map.Entry -> Assertions.assertEquals(e.value, actual[e.key], "key: " + e.key) } Assertions.assertEquals(expected, actual) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt index 4bd8fe949d8d..effb0a3348c7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/extensions/LoggingInvocationInterceptor.kt @@ -64,8 +64,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { val logLineSuffix: String val methodMatcher = methodPattern.matcher(methodName) if (methodName == "interceptDynamicTest") { - logLineSuffix = - "execution of DynamicTest %s".formatted(extensionContext!!.displayName) + logLineSuffix = "execution of DynamicTest ${extensionContext!!.displayName}" } else if (methodName == "interceptTestClassConstructor") { logLineSuffix = "instance creation for %s".formatted(reflectiveInvocationContext!!.targetClass) @@ -86,7 +85,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { ) TestContext.CURRENT_TEST_NAME.set("$targetClassName.$methodName") } else { - logLineSuffix = "execution of unknown intercepted call %s".formatted(methodName) + logLineSuffix = "execution of unknown intercepted call $methodName" } val currentThread = Thread.currentThread() val timeoutTask = TimeoutInteruptor(currentThread) @@ -115,22 +114,22 @@ class LoggingInvocationInterceptor : InvocationInterceptor { } catch (throwable: Throwable) { timeoutTask.cancel() val elapsedMs = Duration.between(start, Instant.now()).toMillis() - var t1: Throwable + val t1: Throwable if (timeoutTask.wasTriggered) { + val timeoutAsString = + DurationFormatUtils.formatDurationWords(elapsedMs, true, true) t1 = TimeoutException( - ("Execution was cancelled after %s. If you think your test should be given more time to complete, you can use the @Timeout annotation. If all the test of a connector are slow, " + - " you can override the property 'JunitMethodExecutionTimeout' in your gradle.properties.") - .formatted( - DurationFormatUtils.formatDurationWords(elapsedMs, true, true) - ) + "Execution was cancelled after $timeoutAsString. If you think your test should be given more time to complete, " + + "you can use the @Timeout annotation. If all the test of a connector are slow, " + + " you can override the property 'JunitMethodExecutionTimeout' in your gradle.properties." ) t1.initCause(throwable) } else { t1 = throwable } var belowCurrentCall = false - val stackToDisplay: MutableList = LinkedList() + val stackToDisplay: MutableList = LinkedList() for (stackString in ExceptionUtils.getStackFrames(throwable)) { if (stackString!!.startsWith("\tat ")) { if ( @@ -255,8 +254,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptAfterAllMethod( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, extensionContext: ExtensionContext? ) { proxy!!.interceptAfterAllMethod(invocation, invocationContext, extensionContext) @@ -264,8 +263,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptAfterEachMethod( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, extensionContext: ExtensionContext? ) { proxy!!.interceptAfterEachMethod(invocation, invocationContext, extensionContext) @@ -273,8 +272,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptBeforeAllMethod( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, extensionContext: ExtensionContext? ) { proxy!!.interceptBeforeAllMethod(invocation, invocationContext, extensionContext) @@ -282,8 +281,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptBeforeEachMethod( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, extensionContext: ExtensionContext? ) { proxy!!.interceptBeforeEachMethod(invocation, invocationContext, extensionContext) @@ -291,7 +290,7 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptDynamicTest( - invocation: InvocationInterceptor.Invocation?, + invocation: InvocationInterceptor.Invocation?, invocationContext: DynamicTestInvocationContext?, extensionContext: ExtensionContext? ) { @@ -316,8 +315,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptTestTemplateMethod( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, extensionContext: ExtensionContext? ) { proxy!!.interceptTestTemplateMethod(invocation, invocationContext, extensionContext) @@ -325,8 +324,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptTestFactoryMethod( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?, extensionContext: ExtensionContext? ): T? { return proxy!!.interceptTestFactoryMethod(invocation, invocationContext, extensionContext) @@ -334,8 +333,8 @@ class LoggingInvocationInterceptor : InvocationInterceptor { @Throws(Throwable::class) override fun interceptTestClassConstructor( - invocation: InvocationInterceptor.Invocation?, - invocationContext: ReflectiveInvocationContext?>?, + invocation: InvocationInterceptor.Invocation?, + invocationContext: ReflectiveInvocationContext?>?, extensionContext: ExtensionContext? ): T? { return proxy!!.interceptTestClassConstructor( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt index 17a8c46415b8..df1b5162fb73 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/NonContainer.kt @@ -15,7 +15,7 @@ class NonContainer( private val jdbcUrl: String?, private val driverClassName: String?, dockerImageName: String -) : JdbcDatabaseContainer(dockerImageName) { +) : JdbcDatabaseContainer(dockerImageName) { override fun getDriverClassName(): String? { return driverClassName } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt index f71d93947ef0..d386c720169b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt @@ -193,7 +193,7 @@ protected constructor(val container: C) : AutoCloseable { String.format("executing command %s", Strings.join(cmd.asIterable(), " ")) ) ) - val exec = container.execInContainer(*cmd.toTypedArray()) + val exec = container.execInContainer(*cmd.toTypedArray()) if (exec!!.exitCode == 0) { LOGGER.info( formatLogLine( diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQuerySourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQuerySourceOperations.kt index d295dffc5ae7..d2e04a06f509 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQuerySourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQuerySourceOperations.kt @@ -31,7 +31,7 @@ import java.util.function.Consumer import org.slf4j.Logger import org.slf4j.LoggerFactory -class BigQuerySourceOperations : SourceOperations { +class BigQuerySourceOperations : SourceOperations { private val BIG_QUERY_DATE_FORMAT: DateFormat = SimpleDateFormat("yyyy-MM-dd") private val BIG_QUERY_DATETIME_FORMAT: DateFormat = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss") private val BIG_QUERY_TIMESTAMP_FORMAT: DateFormat = @@ -152,7 +152,7 @@ class BigQuerySourceOperations : SourceOperations JsonSchemaType.BOOLEAN StandardSQLTypeName.INT64 -> JsonSchemaType.INTEGER diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt index 02f11b3d0802..5bcd1bf1b861 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoDatabase.kt @@ -52,7 +52,7 @@ class MongoDatabase(connectionString: String, databaseName: String) : val databaseNames: MongoIterable get() = mongoClient.listDatabaseNames() - val collectionNames: Set + val collectionNames: Set get() { val collectionNames = database.listCollectionNames() ?: return Collections.emptySet() return MoreIterators.toSet(collectionNames.iterator()) diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt index 3d8d35c9b1ee..b9963310a916 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt @@ -221,7 +221,7 @@ object MongoUtils { fieldName: String ): JsonNode { reader.readStartArray() - val elements = Lists.newArrayList() + val elements = Lists.newArrayList() while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) { val arrayFieldType = reader.currentBsonType diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt index 7f07a3c43097..f17d7f16b513 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/AbstractJdbcDestination.kt @@ -377,7 +377,7 @@ abstract class AbstractJdbcDestination = HashSet() + protected val schemaSet: MutableSet = HashSet() protected constructor() @Throws(Exception::class) - override fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String?) { + override fun createSchemaIfNotExists(database: JdbcDatabase?, schemaName: String) { try { if (!schemaSet.contains(schemaName) && !isSchemaExists(database, schemaName)) { database!!.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.kt index 72309e3628aa..acb14b3658f0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/TableDefinition.kt @@ -8,4 +8,4 @@ package io.airbyte.cdk.integrations.destination.jdbc * * @param columns */ -@JvmRecord data class TableDefinition(val columns: LinkedHashMap) +@JvmRecord data class TableDefinition(val columns: LinkedHashMap) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt index 1ae7ccd6850c..2a50769c402a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyConsumerFactory.kt @@ -178,12 +178,12 @@ object CopyConsumerFactory { ) { var failed = hasFailed var firstException: Exception? = null - val streamCopiers: List = ArrayList(pairToCopier.values) + val streamCopiers: List = ArrayList(pairToCopier.values) try { val queries: MutableList = ArrayList() for (copier in streamCopiers) { try { - copier!!.closeStagingUploader(failed) + copier.closeStagingUploader(failed) if (!failed) { copier.createDestinationSchema() copier.createTemporaryTable() @@ -207,7 +207,7 @@ object CopyConsumerFactory { } } finally { for (copier in streamCopiers) { - copier!!.removeFileAndDropTmpTable() + copier.removeFileAndDropTmpTable() } close(dataSource) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt index 0895df3cd18a..952fe9a2cf77 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/CopyDestination.kt @@ -100,7 +100,7 @@ abstract class CopyDestination : BaseConnector, Destination { @Throws(Exception::class) protected fun performCreateInsertTestOnDestination( - outputSchema: String?, + outputSchema: String, database: JdbcDatabase, nameTransformer: NamingConventionTransformer ) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 83974a6cc9b5..a0d197e1413f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -512,7 +512,7 @@ abstract class JdbcDestinationHandler( // TODO: normalize namespace and finalName strings to quoted-lowercase (as // needed. Snowflake // requires uppercase) - val columnDefinitions = LinkedHashMap() + val columnDefinitions = LinkedHashMap() LOGGER.info( "Retrieving existing columns for {}.{}.{}", catalogName, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index fc4dbd562979..b66b003aef0e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -178,8 +178,8 @@ constructor( val dataFields = columns.entries - .map { column: Map.Entry -> - DSL.field(DSL.quotedName(column.key!!.name), toDialectType(column.value)) + .map { column: Map.Entry -> + DSL.field(DSL.quotedName(column.key.name), toDialectType(column.value)) } .toList() + fields return dataFields diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt index 6007693df5eb..cee25ab843bd 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/GeneralStagingFunctions.kt @@ -98,7 +98,7 @@ object GeneralStagingFunctions { database: JdbcDatabase?, stageName: String?, stagingPath: String?, - stagedFiles: List?, + stagedFiles: List?, tableName: String?, schemaName: String?, stagingOperations: StagingOperations, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialFlush.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialFlush.kt index fee8a95b0cd4..fda2c17bb259 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialFlush.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/staging/SerialFlush.kt @@ -109,7 +109,7 @@ object SerialFlush { database, stageName, stagingPath, - java.util.List.of(stagedFile), + listOf(stagedFile), writeConfig.outputTableName, schemaName, stagingOperations, diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index ac4b6b0e97c6..caa84d1dfa9a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -87,7 +87,7 @@ abstract class DestinationAcceptanceTest { open protected var _testDataComparator: TestDataComparator = getTestDataComparator() protected open fun getTestDataComparator(): TestDataComparator { - return BasicTestDataComparator { this.resolveIdentifier(it) } + return BasicTestDataComparator { @Suppress("deprecated") this.resolveIdentifier(it) } } protected abstract val imageName: String @@ -342,7 +342,7 @@ abstract class DestinationAcceptanceTest { implementation of the method to your comparator implementation.""" ) protected open fun resolveIdentifier(identifier: String?): List { - return java.util.List.of(identifier) + return listOf(identifier) } @BeforeEach diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.kt index a77b4ae71399..b27bf135301d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/JdbcDestinationAcceptanceTest.kt @@ -19,7 +19,7 @@ abstract class JdbcDestinationAcceptanceTest : DestinationAcceptanceTest() { protected fun getJsonFromRecord( record: Record, - valueParser: Function> + valueParser: Function> ): JsonNode { val node = mapper.createObjectNode() diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt index 1725302c55c1..f7f042d250df 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/comparator/AdvancedTestDataComparator.kt @@ -24,9 +24,7 @@ open class AdvancedTestDataComparator : TestDataComparator { } } - protected open fun resolveIdentifier(identifier: String?): List { - return java.util.List.of(identifier) - } + protected open fun resolveIdentifier(identifier: String?): List = listOf(identifier) protected open fun compareObjects(expectedObject: JsonNode, actualObject: JsonNode) { if (!areBothEmpty(expectedObject, actualObject)) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt index c4dee9dfcf5f..53ef2bee192d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt @@ -87,7 +87,7 @@ class AirbyteDebeziumHandler( ) val schemaHistoryManager: Optional = if (trackSchemaHistory) - Optional.of( + Optional.of( AirbyteSchemaHistoryStorage.Companion.initializeDBHistory( cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState() diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt index 2d8eed56d374..b65ba98a136c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt @@ -58,8 +58,8 @@ class AirbyteFileOffsetBackingStore( save(mappedAsStrings) } - private fun updateStateForDebezium2_1(mapAsString: Map): Map { - val updatedMap: MutableMap = LinkedHashMap() + private fun updateStateForDebezium2_1(mapAsString: Map): Map { + val updatedMap: MutableMap = LinkedHashMap() if (mapAsString.size > 0) { // We're getting the 1st of a map. Something fishy going on here val key = mapAsString.keys.toList()[0] @@ -76,7 +76,7 @@ class AirbyteFileOffsetBackingStore( if (dbName.isPresent) SQL_SERVER_STATE_MUTATION.apply(key.substring(i, i1 + 1), dbName.get()) else key.substring(i, i1 + 1) - val value = mapAsString[key] + val value = mapAsString.getValue(key) updatedMap[newKey] = value } return updatedMap @@ -103,9 +103,9 @@ class AirbyteFileOffsetBackingStore( } catch (e: NoSuchFileException) { // NoSuchFileException: Ignore, may be new. // EOFException: Ignore, this means the file was missing or corrupt - return emptyMap() + return emptyMap() } catch (e: EOFException) { - return emptyMap() + return emptyMap() } catch (e: IOException) { throw ConnectException(e) } catch (e: ClassNotFoundException) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt index 37d67f7a5175..50506b6f8122 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt @@ -24,7 +24,7 @@ class DebeziumRecordPublisher(private val debeziumPropertiesManager: DebeziumPro private var engine: DebeziumEngine>? = null private val hasClosed = AtomicBoolean(false) private val isClosing = AtomicBoolean(false) - private val thrownError = AtomicReference() + private val thrownError = AtomicReference() private val engineLatch = CountDownLatch(1) fun start( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateUtil.kt index 6a5ec8b572e5..cacff275a581 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumStateUtil.kt @@ -89,7 +89,7 @@ interface DebeziumStateUtil { const val CONNECTOR_NAME_PROPERTY: String = "name" /** Configuration for offset state key/value converters. */ - val INTERNAL_CONVERTER_CONFIG: Map = + val INTERNAL_CONVERTER_CONFIG: Map = java.util.Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false.toString()) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/SnapshotMetadata.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/SnapshotMetadata.kt index f34141431ca1..49a4421804e7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/SnapshotMetadata.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/SnapshotMetadata.kt @@ -15,7 +15,7 @@ enum class SnapshotMetadata { NULL; companion object { - private val ENTRIES_OF_SNAPSHOT_EVENTS: Set = + private val ENTRIES_OF_SNAPSHOT_EVENTS: Set = ImmutableSet.of(TRUE, FIRST, FIRST_IN_DATA_COLLECTION, LAST_IN_DATA_COLLECTION) private val STRING_TO_ENUM: MutableMap = HashMap(12) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index e83cdd7d0d56..1f19ffa47028 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -265,7 +265,7 @@ abstract class AbstractJdbcSource( database: JdbcDatabase, schema: String? ): List>> { - val internalSchemas: Set = HashSet(excludedInternalNameSpaces) + val internalSchemas: Set = HashSet(excludedInternalNameSpaces) LOGGER.info("Internal schemas to exclude: {}", internalSchemas) val tablesWithSelectGrantPrivilege = getPrivilegesTableForCurrentUser(database, schema) @@ -329,7 +329,7 @@ abstract class AbstractJdbcSource( } protected fun excludeNotAccessibleTables( - internalSchemas: Set, + internalSchemas: Set, tablesWithSelectGrantPrivilege: Set? ): Predicate { return Predicate { jsonNode: JsonNode -> @@ -350,7 +350,7 @@ abstract class AbstractJdbcSource( // getPrivilegesTableForCurrentUser() protected open fun isNotInternalSchema( jsonNode: JsonNode, - internalSchemas: Set + internalSchemas: Set ): Boolean { return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt index a5bbe87058ae..5560d1e25f5b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt @@ -23,7 +23,7 @@ class StateDecoratingIterator( private val initialCursor: String, private val cursorType: JsonSchemaPrimitiveUtil.JsonSchemaPrimitive, stateEmissionFrequency: Int -) : AbstractIterator(), MutableIterator { +) : AbstractIterator(), MutableIterator { private var currentMaxCursor: String? private var currentMaxCursorRecordCount = 0L private var hasEmittedFinalState = false diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt index 2db0937008da..cf3a41646b6b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt @@ -188,10 +188,10 @@ class CursorManager( } .orElse(null) // if cursor field is set in state. - if (stateOptional.map?>(cursorFieldFunction).isPresent) { + if (stateOptional.map?>(cursorFieldFunction).isPresent) { // if cursor field in catalog and state are the same. if ( - stateOptional.map?>(cursorFieldFunction) == + stateOptional.map?>(cursorFieldFunction) == streamOptional.map> { obj: ConfiguredAirbyteStream -> obj.cursorField } @@ -275,7 +275,7 @@ class CursorManager( * @return An [Optional] possibly containing the cursor field name associated with the cursor * tracked in the state associated with the provided stream name/namespace tuple. */ - fun getCursorField(pair: AirbyteStreamNameNamespacePair?): Optional { + fun getCursorField(pair: AirbyteStreamNameNamespacePair?): Optional { return getCursorInfo(pair).map { obj: CursorInfo -> obj.cursorField } } @@ -287,7 +287,7 @@ class CursorManager( * @return An [Optional] possibly containing the cursor value tracked in the state associated * with the provided stream name/namespace tuple. */ - fun getCursor(pair: AirbyteStreamNameNamespacePair?): Optional { + fun getCursor(pair: AirbyteStreamNameNamespacePair?): Optional { return getCursorInfo(pair).map { obj: CursorInfo -> obj.cursor } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt index 33c08d2d433e..19aa245325dc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt @@ -53,7 +53,7 @@ class GlobalStateManager( ) } - override val rawStateMessages: List? + override val rawStateMessages: List? get() { throw UnsupportedOperationException( "Raw state retrieval not supported by global state manager." diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt index 66b50de6099f..45317487c8ad 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt @@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory state management.""" ) class LegacyStateManager(dbState: DbState, catalog: ConfiguredAirbyteCatalog) : - AbstractStateManager( + AbstractStateManager( catalog, Supplier { dbState.streams }, CURSOR_FUNCTION, @@ -57,7 +57,7 @@ class LegacyStateManager(dbState: DbState, catalog: ConfiguredAirbyteCatalog) : this.isCdc = dbState.cdc ?: false } - override val rawStateMessages: List? + override val rawStateMessages: List? get() { throw UnsupportedOperationException( "Raw state retrieval not supported by global state manager." diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt index 14750fafe0db..06d79f3c2e90 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt @@ -37,7 +37,7 @@ interface StateManager { * @throws UnsupportedOperationException if the state manager does not support retrieving raw * state. */ - val rawStateMessages: List? + val rawStateMessages: List? /** * Retrieves the map of stream name/namespace tuple to the current cursor information for that @@ -67,7 +67,7 @@ interface StateManager { * @return An [Optional] possibly containing the cursor value tracked in the state associated * with the provided stream name/namespace tuple. */ - fun getCursor(pair: AirbyteStreamNameNamespacePair?): Optional { + fun getCursor(pair: AirbyteStreamNameNamespacePair?): Optional { return getCursorInfo(pair).map { obj: CursorInfo -> obj.cursor } } @@ -79,7 +79,7 @@ interface StateManager { * @return An [Optional] possibly containing the cursor field name associated with the cursor * tracked in the state associated with the provided stream name/namespace tuple. */ - fun getCursorField(pair: AirbyteStreamNameNamespacePair?): Optional? { + fun getCursorField(pair: AirbyteStreamNameNamespacePair?): Optional? { return getCursorInfo(pair).map { obj: CursorInfo -> obj.cursorField } } @@ -91,7 +91,7 @@ interface StateManager { * @return An [Optional] possibly containing the original cursor value tracked in the state * associated with the provided stream name/namespace tuple. */ - fun getOriginalCursor(pair: AirbyteStreamNameNamespacePair?): Optional? { + fun getOriginalCursor(pair: AirbyteStreamNameNamespacePair?): Optional? { return getCursorInfo(pair).map { obj: CursorInfo -> obj.originalCursor } } @@ -103,7 +103,7 @@ interface StateManager { * @return An [Optional] possibly containing the original cursor field name associated with the * cursor tracked in the state associated with the provided stream name/namespace tuple. */ - fun getOriginalCursorField(pair: AirbyteStreamNameNamespacePair?): Optional? { + fun getOriginalCursorField(pair: AirbyteStreamNameNamespacePair?): Optional? { return getCursorInfo(pair).map { obj: CursorInfo -> obj.originalCursorField } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt index 1e34647d779d..52c408d25d89 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt @@ -47,7 +47,7 @@ open class StreamStateManager ) } - override val rawStateMessages: List? + override val rawStateMessages: List? get() = rawAirbyteStateMessages override fun toState(pair: Optional): AirbyteStateMessage { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt index 6c8edd581ce7..3370d83405f9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt @@ -18,7 +18,7 @@ class DebeziumRecordIteratorTest { val debeziumRecordIterator = DebeziumRecordIterator( mock(), - object : CdcTargetPosition { + object : CdcTargetPosition { override fun reachedTargetPosition( changeEventWithMetadata: ChangeEventWithMetadata? ): Boolean { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt index 77631caf760e..0ab7f9349cd9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt @@ -61,7 +61,7 @@ internal class DefaultJdbcSourceAcceptanceTest : additionalParameters: String ): JsonNode { return Jsons.jsonNode( - ImmutableMap.builder() + ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, resolveHost(psqlDb)) .put(JdbcUtils.PORT_KEY, resolvePort(psqlDb)) .put(JdbcUtils.DATABASE_KEY, dbName) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt index 8d05e260d212..010599e2625c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManagerTest.kt @@ -435,7 +435,7 @@ class StreamStateManagerTest { private fun createStreamState( name: String?, namespace: String?, - cursorFields: List?, + cursorFields: List?, cursorValue: String?, cursorRecordCount: Long ): AirbyteStateMessage { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 3f2a35fd6fa6..286bffedbd94 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -343,7 +343,7 @@ abstract class CdcSourceTest> { expectedRecords: Set?, actualRecords: Set, cdcStreams: Set, - streamNames: Set, + streamNames: Set, namespace: String? ) { val actualData = @@ -1344,7 +1344,7 @@ abstract class CdcSourceTest> { const val MODELS_STREAM_NAME: String = "models" const val MODELS_STREAM_NAME_2: String = "models_2" - @JvmField val STREAM_NAMES: Set = java.util.Set.of(MODELS_STREAM_NAME) + @JvmField val STREAM_NAMES: Set = java.util.Set.of(MODELS_STREAM_NAME) protected const val COL_ID: String = "id" protected const val COL_MAKE_ID: String = "make_id" protected const val COL_MODEL: String = "model" diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index f7446c5dd74f..f3c2b39cd3e7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -91,7 +91,7 @@ abstract class JdbcSourceAcceptanceTest> { ) } - protected fun primaryKeyClause(columns: List): String { + protected fun primaryKeyClause(columns: List): String { if (columns.isEmpty()) { return "" } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index 533b0007749e..055fd87d5141 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -93,6 +93,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { testDataHolders.forEach( Consumer { testDataHolder: TestDataHolder -> val airbyteStream = streams[testDataHolder.nameWithTestPrefix] + @Suppress("unchecked_cast") val jsonSchemaTypeMap = Jsons.deserialize( airbyteStream!!.jsonSchema["properties"][testColumnName].toString(), @@ -120,7 +121,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { class MissedRecords( // Stream that is missing any value var streamName: String?, // Which are the values that has not being gathered from the source - var missedValues: List? + var missedValues: List ) class UnexpectedRecord(val streamName: String, val unexpectedValue: String?) @@ -131,10 +132,10 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { val recordMessages = allMessages.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } - val expectedValues: MutableMap?> = HashMap() - val missedValuesByStream: MutableMap> = HashMap() + val expectedValues: MutableMap> = HashMap() + val missedValuesByStream: MutableMap> = HashMap() val unexpectedValuesByStream: MutableMap> = HashMap() - val testByName: MutableMap = HashMap() + val testByName: MutableMap = HashMap() // If there is no expected value in the test set we don't include it in the list to be // asserted @@ -152,7 +153,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { ) for (message in recordMessages) { - val streamName = message!!.record.stream + val streamName = message.record.stream val expectedValuesForStream = expectedValues[streamName] if (expectedValuesForStream != null) { val value = getValueFromJsonNode(message.record.data[testColumnName]) @@ -166,26 +167,22 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { } // Gather all the missing values, so we don't stop the test in the first missed one - expectedValues.forEach { (streamName: String?, values: List?) -> - if (!values!!.isEmpty()) { + expectedValues.forEach { (streamName: String, values: List) -> + if (values.isNotEmpty()) { missedValuesByStream.putIfAbsent(streamName, ArrayList()) missedValuesByStream[streamName]!!.add(MissedRecords(streamName, values)) } } - val errorsByStream: MutableMap> = HashMap() + val errorsByStream: MutableMap> = HashMap() for (streamName in unexpectedValuesByStream.keys) { errorsByStream.putIfAbsent(streamName, ArrayList()) val test = testByName.getValue(streamName) val unexpectedValues: List = unexpectedValuesByStream[streamName]!! for (unexpectedValue in unexpectedValues) { errorsByStream[streamName]!!.add( - "The stream '%s' checking type '%s' initialized at %s got unexpected values: %s".formatted( - streamName, - test.sourceType, - test.declarationLocation, - unexpectedValue - ) + "The stream '${streamName}' checking type '${test.sourceType}' initialized " + + "at ${test.declarationLocation} got unexpected values: $unexpectedValue" ) } } @@ -196,17 +193,13 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { val missedValues: List = missedValuesByStream[streamName]!! for (missedValue in missedValues) { errorsByStream[streamName]!!.add( - "The stream '%s' checking type '%s' initialized at %s is missing values: %s".formatted( - streamName, - test.sourceType, - test.declarationLocation, - missedValue - ) + "The stream '$streamName' checking type '${test.sourceType}' initialized at " + + "${test.declarationLocation} is missing values: $missedValue" ) } } - val errorStrings: MutableList = ArrayList() + val errorStrings: MutableList = ArrayList() for (errors in errorsByStream.values) { errorStrings.add(StringUtils.join(errors, "\n")) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt index bb96403fdf15..90a090fcb6e6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt @@ -115,7 +115,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() { @Throws(IOException::class) private fun runExecutableInternal(cmd: Command): Path { LOGGER.info("testRoot = $testRoot") - val dockerCmd: List = + val dockerCmd: List = Lists.newArrayList( "docker", "run", diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt index 72acbf38b5fb..278514430865 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt @@ -125,8 +125,8 @@ internal constructor( * @param expectedValue value which should be provided by a streamer * @return builder */ - fun addExpectedValues(vararg expectedValue: String?): TestDataHolderBuilder { - expectedValues.addAll(Arrays.asList(*expectedValue)) + fun addExpectedValues(vararg expectedValue: String): TestDataHolderBuilder { + expectedValues.addAll(expectedValue) return this } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt index 253fe840ef02..34e9911fecf1 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt @@ -81,7 +81,7 @@ abstract class AbstractSourceFillDbWithTestData : AbstractSourceBasePerformanceT * * Stream.of( Arguments.of("your_db_name", "your_schema_name", 100, 2, 240, 1000) ); */ - protected abstract fun provideParameters(): Stream? + protected abstract fun provideParameters(): Stream? protected fun prepareCreateTableQuery( dbSchemaName: String?, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt index cdf43a25adbe..b72205662374 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt @@ -55,7 +55,7 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest * Arguments.of("newregular25tables50000records", "dbo", 50052, 8, 25), * Arguments.of("newsmall1000tableswith10000rows", "dbo", 10011, 8, 1000) ); */ - protected abstract fun provideParameters(): Stream? + protected abstract fun provideParameters(): Stream? @ParameterizedTest @MethodSource("provideParameters") diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/concurrency/VoidCallable.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/concurrency/VoidCallable.kt index 4a109be96f10..8c8b28e2863e 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/concurrency/VoidCallable.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/concurrency/VoidCallable.kt @@ -6,7 +6,7 @@ package io.airbyte.commons.concurrency import java.util.concurrent.Callable @FunctionalInterface -fun interface VoidCallable : Callable { +fun interface VoidCallable : Callable { @Throws(Exception::class) override fun call(): Void? { voidCall() diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt index d8f1f2c30ebe..8c7a91ffc09a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt @@ -47,13 +47,13 @@ class Enums { private val NORMALIZED_ENUMS: ConcurrentMap, Map> = Maps.newConcurrentMap() - fun ?, T2 : Enum?> isCompatible(c1: Class, c2: Class): Boolean { + fun , T2 : Enum> isCompatible(c1: Class, c2: Class): Boolean { Preconditions.checkArgument(c1.isEnum) Preconditions.checkArgument(c2.isEnum) return (c1.enumConstants.size == c2.enumConstants.size && Sets.difference( - c1.enumConstants.map { obj: T1 -> obj!!.name }.toSet(), - c2.enumConstants.map { obj: T2 -> obj!!.name }.toSet(), + c1.enumConstants.map { obj: T1 -> obj.name }.toSet(), + c2.enumConstants.map { obj: T2 -> obj.name }.toSet(), ) .isEmpty()) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt index 46a7ebe87592..df2e4ec1894b 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/EnvVariableFeatureFlags.kt @@ -24,7 +24,7 @@ class EnvVariableFeatureFlags : FeatureFlags { return getEnvOrDefault(APPLY_FIELD_SELECTION, false) { s: String -> s.toBoolean() } } - override fun fieldSelectionWorkspaces(): String? { + override fun fieldSelectionWorkspaces(): String { return getEnvOrDefault(FIELD_SELECTION_WORKSPACES, "") { arg: String -> arg } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagHelper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagHelper.kt index 43852a29d4f0..3f0db0e05739 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagHelper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagHelper.kt @@ -32,7 +32,7 @@ object FeatureFlagHelper { ): Boolean { val workspaceIdsString = flagRetriever.apply(featureFlags) val workspaceIds: MutableSet = HashSet() - if (workspaceIdsString != null && !workspaceIdsString.isEmpty()) { + if (!workspaceIdsString.isNullOrEmpty()) { for (id in workspaceIdsString .split(",".toRegex()) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt index cca688469c08..a8626b46ec64 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlags.kt @@ -27,7 +27,7 @@ interface FeatureFlags { * * @return a comma-separated list of workspace ids where field selection should be enabled. */ - fun fieldSelectionWorkspaces(): String? + fun fieldSelectionWorkspaces(): String /** * Get the workspaces allow-listed for strict incremental comparison in normalization. This diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt index e32b0e5533f8..056c6730332c 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt @@ -20,7 +20,7 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags return wrapped.applyFieldSelection() } - override fun fieldSelectionWorkspaces(): String? { + override fun fieldSelectionWorkspaces(): String { return wrapped.fieldSelectionWorkspaces() } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt index 65edebf38faa..9ae08dd83449 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt @@ -73,7 +73,7 @@ object IOs { @Throws(IOException::class) @JvmStatic - fun getTail(numLines: Int, path: Path?): List { + fun getTail(numLines: Int, path: Path?): List { if (path == null) { return emptyList() } @@ -85,7 +85,7 @@ object IOs { ReversedLinesFileReader.Builder().setFile(file).setCharset(Charsets.UTF_8).get().use { fileReader -> - val lines: MutableList = ArrayList() + val lines: MutableList = ArrayList() var line = fileReader.readLine() while (line != null && lines.size < numLines) { lines.add(line) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt index 8e978239fb6a..219655e420d1 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt @@ -117,7 +117,7 @@ object JsonSchemas { fun traverseJsonSchemaWithCollector( jsonSchema: JsonNode, mapper: BiFunction?, T> - ): List { + ): List { // for the sake of code reuse, use the filtered collector method but makes sure the filter // always // returns true. @@ -173,7 +173,7 @@ object JsonSchemas { node: JsonNode?, path: List -> if (predicate.test(node)) { - return@traverseJsonSchemaWithFilteredCollector Optional.of?>( + return@traverseJsonSchemaWithFilteredCollector Optional.of>( path ) } else { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt index cb7132c4672f..81998a9cf621 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt @@ -260,7 +260,7 @@ object Jsons { } } - fun navigateTo(node: JsonNode, keys: List): JsonNode { + fun navigateTo(node: JsonNode, keys: List): JsonNode { var targetNode = node for (key in keys) { targetNode = targetNode[key] @@ -346,7 +346,7 @@ object Jsons { @JvmStatic fun flatten(node: JsonNode, applyFlattenToArray: Boolean = false): Map { if (node.isObject) { - val output: MutableMap = HashMap() + val output: MutableMap = HashMap() val it = node.fields() while (it.hasNext()) { val entry = it.next() @@ -354,16 +354,16 @@ object Jsons { val value = entry.value mergeMaps(output, field, flatten(value, applyFlattenToArray)) } - return output + return output.toMap() } else if (node.isArray && applyFlattenToArray) { - val output: MutableMap = HashMap() + val output: MutableMap = HashMap() val arrayLen = node.size() for (i in 0 until arrayLen) { val field = String.format("[%d]", i) val value = node[i] mergeMaps(output, field, flatten(value, applyFlattenToArray)) } - return output + return output.toMap() } else { val value: Any = if (node.isBoolean) { @@ -390,11 +390,7 @@ object Jsons { * If subMap contains a null key, then instead it is replaced with prefix. I.e. {null: value} is * treated as {prefix: value} when merging into originalMap. */ - fun mergeMaps( - originalMap: MutableMap, - prefix: String, - subMap: Map - ) { + fun mergeMaps(originalMap: MutableMap, prefix: String, subMap: Map) { originalMap.putAll( subMap.mapKeys toMap@{ val key = it.key diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/text/Sqls.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/text/Sqls.kt index f0a79783dfb4..fb2cfca87973 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/text/Sqls.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/text/Sqls.kt @@ -6,11 +6,11 @@ package io.airbyte.commons.text import java.util.* object Sqls { - fun ?> toSqlName(value: T): String { - return value!!.name.lowercase(Locale.getDefault()) + fun > toSqlName(value: T): String { + return value.name.lowercase(Locale.getDefault()) } - fun ?> toSqlNames(values: Collection): Set { + fun > toSqlNames(values: Collection): Set { return values.map { toSqlName(it) }.toSet() } @@ -22,7 +22,7 @@ object Sqls { * @param enum type * @return "'value1', 'value2', 'value3'" */ - fun ?> toSqlInFragment(values: Iterable): String { + fun > toSqlInFragment(values: Iterable): String { return values.map { toSqlName(it) }.joinToString(",", "(", ")") { Names.singleQuote(it) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/version/VersionSerializer.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/version/VersionSerializer.kt index c90d270c450b..aa3fd1558972 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/version/VersionSerializer.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/version/VersionSerializer.kt @@ -8,7 +8,7 @@ import com.fasterxml.jackson.databind.SerializerProvider import com.fasterxml.jackson.databind.ser.std.StdSerializer import java.io.IOException -class VersionSerializer @JvmOverloads constructor(t: Class? = null) : +class VersionSerializer @JvmOverloads constructor(t: Class? = null) : StdSerializer(t) { @Throws(IOException::class) override fun serialize(value: Version, gen: JsonGenerator, provider: SerializerProvider) { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/AbstractSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/AbstractSchemaValidator.kt index 8b3d0cfb1d39..195d1b907129 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/AbstractSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/AbstractSchemaValidator.kt @@ -16,7 +16,7 @@ constructor(private val jsonSchemaValidator: JsonSchemaValidator = JsonSchemaVal return JsonSchemaValidator.Companion.getSchema(getSchemaPath(configType).toFile()) } - override fun validate(configType: T, objectJson: JsonNode): Set? { + override fun validate(configType: T, objectJson: JsonNode): Set? { return jsonSchemaValidator.validate(getSchemaJson(configType), objectJson) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/ConfigSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/ConfigSchemaValidator.kt index 68cbb19c61f6..f8cf549c6509 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/ConfigSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/ConfigSchemaValidator.kt @@ -6,7 +6,7 @@ package io.airbyte.validation.json import com.fasterxml.jackson.databind.JsonNode interface ConfigSchemaValidator> { - fun validate(configType: T, objectJson: JsonNode): Set? + fun validate(configType: T, objectJson: JsonNode): Set? fun test(configType: T, objectJson: JsonNode): Boolean diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt index ede1ea15acd9..b2f6b1579924 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/util/CompositeIteratorTest.kt @@ -42,9 +42,9 @@ internal class CompositeIteratorTest { @Test fun testEmptyInput() { - val iterator: AutoCloseableIterator = + val iterator: AutoCloseableIterator = CompositeIterator( - emptyList>(), + emptyList>(), airbyteStreamStatusConsumer ) Assertions.assertFalse(iterator.hasNext()) @@ -54,8 +54,8 @@ internal class CompositeIteratorTest { @Test @Throws(Exception::class) fun testMultipleIterators() { - val iterator: AutoCloseableIterator = - CompositeIterator( + val iterator: AutoCloseableIterator = + CompositeIterator( ImmutableList.of( AutoCloseableIterators.fromIterator( MoreIterators.of("a", "b", "c"), @@ -103,8 +103,8 @@ internal class CompositeIteratorTest { @Test @Throws(Exception::class) fun testWithEmptyIterators() { - val iterator: AutoCloseableIterator = - CompositeIterator( + val iterator: AutoCloseableIterator = + CompositeIterator( ImmutableList.of( AutoCloseableIterators.fromIterator( MoreIterators.of("a", "b", "c"), @@ -141,8 +141,8 @@ internal class CompositeIteratorTest { @Test @Throws(Exception::class) fun testCloseBeforeUsingItUp() { - val iterator: AutoCloseableIterator = - CompositeIterator( + val iterator: AutoCloseableIterator = + CompositeIterator( ImmutableList.of( AutoCloseableIterators.fromIterator( MoreIterators.of("a", "b", "c"), @@ -165,8 +165,8 @@ internal class CompositeIteratorTest { @Test @Throws(Exception::class) fun testCannotOperateAfterClosing() { - val iterator: AutoCloseableIterator = - CompositeIterator( + val iterator: AutoCloseableIterator = + CompositeIterator( ImmutableList.of( AutoCloseableIterators.fromIterator( MoreIterators.of("a", "b", "c"), @@ -187,7 +187,7 @@ internal class CompositeIteratorTest { Mockito.verify(airbyteStreamStatusConsumer, Mockito.times(2)).accept(any()) } - private fun assertNext(iterator: Iterator, value: String) { + private fun assertNext(iterator: Iterator, value: String) { Assertions.assertTrue(iterator.hasNext()) Assertions.assertEquals(value, iterator.next()) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/TestHarnessUtilsTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/TestHarnessUtilsTest.kt index 12d727bfe884..d5d97e264200 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/TestHarnessUtilsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/TestHarnessUtilsTest.kt @@ -60,7 +60,7 @@ internal class TestHarnessUtilsTest { recordedBeats.incrementAndGet() true } - .`when`(heartbeatMonitor) + .`when`(heartbeatMonitor) .isBeating val thread = Thread { this.runShutdown() } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt index ab3714245989..4fdcbde7e11b 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteDestination.kt @@ -49,7 +49,7 @@ constructor( private var destinationProcess: Process? = null private var writer: AirbyteMessageBufferedWriter? = null - private var messageIterator: Iterator? = null + private var messageIterator: Iterator? = null private var exitValueIsSet = false override val exitValue: Int diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteSource.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteSource.kt index e53c2b7c966f..794b63e60607 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteSource.kt @@ -41,7 +41,7 @@ internal constructor( featureFlags: FeatureFlags ) : AirbyteSource { private var sourceProcess: Process? = null - private var messageIterator: Iterator? = null + private var messageIterator: Iterator? = null private var exitValueIsSet = false @get:Throws(IllegalStateException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.kt index 4ff012f8578d..4b0ec4d4511e 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactory.kt @@ -36,7 +36,7 @@ class DefaultAirbyteStreamFactory : AirbyteStreamFactory { private val protocolValidator: AirbyteProtocolPredicate protected val logger: Logger private val maxMemory: Long - private val exceptionClass: Optional> + private val exceptionClass: Optional> @JvmOverloads constructor( @@ -45,7 +45,7 @@ class DefaultAirbyteStreamFactory : AirbyteStreamFactory { AirbyteProtocolPredicate(), LOGGER, containerLogMdcBuilder, - Optional.empty>() + Optional.empty>() ) /** @@ -58,7 +58,7 @@ class DefaultAirbyteStreamFactory : AirbyteStreamFactory { protocolPredicate: AirbyteProtocolPredicate, logger: Logger, containerLogMdcBuilder: MdcScope.Builder, - messageSizeExceptionClass: Optional> + messageSizeExceptionClass: Optional> ) { protocolValidator = protocolPredicate this.logger = logger @@ -72,7 +72,7 @@ class DefaultAirbyteStreamFactory : AirbyteStreamFactory { protocolPredicate: AirbyteProtocolPredicate, logger: Logger, containerLogMdcBuilder: MdcScope.Builder, - messageSizeExceptionClass: Optional>, + messageSizeExceptionClass: Optional>, maxMemory: Long ) { protocolValidator = protocolPredicate @@ -114,12 +114,12 @@ class DefaultAirbyteStreamFactory : AirbyteStreamFactory { } } .flatMap { line: String -> this.parseJson(line) } - .filter { json: JsonNode? -> this.validate(json) } - .flatMap { json: JsonNode? -> this.toAirbyteMessage(json) } + .filter { json: JsonNode -> this.validate(json) } + .flatMap { json: JsonNode -> this.toAirbyteMessage(json) } .filter { message: AirbyteMessage -> this.filterLog(message) } } - protected fun parseJson(line: String?): Stream { + protected fun parseJson(line: String?): Stream { val jsonLine = Jsons.tryDeserializeWithoutWarn(line) if (jsonLine.isEmpty) { // we log as info all the lines that are not valid json @@ -130,7 +130,7 @@ class DefaultAirbyteStreamFactory : AirbyteStreamFactory { return jsonLine.stream() } - protected fun validate(json: JsonNode?): Boolean { + protected fun validate(json: JsonNode): Boolean { val res = protocolValidator.test(json) if (!res) { logger.error("Validation failed: {}", Jsons.serialize(json)) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/HeartbeatMonitor.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/HeartbeatMonitor.kt index c0f328dd1945..02bb0144a2c7 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/HeartbeatMonitor.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/internal/HeartbeatMonitor.kt @@ -21,7 +21,7 @@ constructor( private val heartBeatFreshDuration: Duration?, private val nowSupplier: Supplier ) { - private val lastBeat = AtomicReference(null) + private val lastBeat = AtomicReference(null) constructor( heartBeatFreshDuration: Duration? diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/AirbyteIntegrationLauncher.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/AirbyteIntegrationLauncher.kt index f429bbc6604e..61516b20ffc7 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/AirbyteIntegrationLauncher.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/AirbyteIntegrationLauncher.kt @@ -109,7 +109,7 @@ class AirbyteIntegrationLauncher( stateFilename: String?, stateContents: String? ): Process? { - val arguments: MutableList = + val arguments: MutableList = Lists.newArrayList("read", CONFIG, configFilename, "--catalog", catalogFilename) val files: MutableMap = HashMap() @@ -145,7 +145,7 @@ class AirbyteIntegrationLauncher( workerMetadata, emptyMap(), emptyMap(), - *arguments.toTypedArray() + *arguments.toTypedArray() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt index af29299515e0..63995d4fb46a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/DockerProcessFactory.kt @@ -64,7 +64,7 @@ class DockerProcessFactory( jobMetadata: Map, internalToExternalPorts: Map?, additionalEnvironmentVariables: Map, - vararg args: String? + vararg args: String ): Process { try { if (!checkImageExists(imageName)) { @@ -79,7 +79,7 @@ class DockerProcessFactory( IOs.writeFile(jobRoot, key, value) } - val cmd: MutableList = + val cmd: MutableList = Lists.newArrayList( "docker", "run", @@ -130,7 +130,7 @@ class DockerProcessFactory( cmd.add("$key=$value") } - if (!Strings.isNullOrEmpty(entrypoint)) { + if (!entrypoint.isNullOrEmpty()) { cmd.add("--entrypoint") cmd.add(entrypoint) } @@ -149,7 +149,7 @@ class DockerProcessFactory( } cmd.add(imageName) - cmd.addAll(Arrays.asList(*args)) + cmd.addAll(args) LOGGER.info("Preparing command: {}", Joiner.on(" ").join(cmd)) @@ -226,7 +226,7 @@ class DockerProcessFactory( * @return A list with debugging arguments or an empty list * ``` */ - fun localDebuggingOptions(containerName: String): List { + fun localDebuggingOptions(containerName: String): List { val shouldAddDebuggerOptions = (Optional.ofNullable(System.getenv("DEBUG_CONTAINER_IMAGE")) .filter { cs: String -> StringUtils.isNotEmpty(cs) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/ProcessFactory.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/ProcessFactory.kt index 0ced27e656e1..e998d798351e 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/ProcessFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/process/ProcessFactory.kt @@ -51,7 +51,7 @@ interface ProcessFactory { jobMetadata: Map, portMapping: Map?, additionalEnvironmentVariables: Map, - vararg args: String? + vararg args: String ): Process companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/test_utils/TestConfigHelpers.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/test_utils/TestConfigHelpers.kt index 1a856f4f75e8..0c640e5aac64 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/test_utils/TestConfigHelpers.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/test_utils/TestConfigHelpers.kt @@ -20,7 +20,7 @@ object TestConfigHelpers { @JvmOverloads fun createSyncConfig( multipleNamespaces: Boolean = false - ): ImmutablePair { + ): ImmutablePair { val workspaceId = UUID.randomUUID() val sourceDefinitionId = UUID.randomUUID() val sourceId = UUID.randomUUID() diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt index 551195b7bbe5..dd6834380c61 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsDestinationConfig.kt @@ -62,7 +62,7 @@ class GcsDestinationConfig( } else -> throw IllegalArgumentException( - "Unsupported credential type: " + gcsCredentialConfig.credentialType!!.name + "Unsupported credential type: " + gcsCredentialConfig.credentialType.name ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt index 1899cb84b52a..1dbb847e2fd5 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/gcs/credential/GcsCredentialConfig.kt @@ -7,6 +7,6 @@ import io.airbyte.cdk.integrations.destination.s3.credential.BlobStorageCredenti import io.airbyte.cdk.integrations.destination.s3.credential.S3CredentialConfig import java.util.* -interface GcsCredentialConfig : BlobStorageCredentialConfig { +interface GcsCredentialConfig : BlobStorageCredentialConfig { val s3CredentialConfig: Optional } diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt index 6b2ec6d0d1cc..5cf12f783693 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.kt @@ -54,7 +54,7 @@ abstract class GcsStreamCopier( GlobalDataSizeConstants.DEFAULT_MAX_BATCH_SIZE_BYTES.toLong() ) private val channels = HashMap() - private val csvPrinters = HashMap() + private val csvPrinters = HashMap() private fun prepareGcsStagingFile(): String { return java.lang.String.join( diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt index 99f76247775f..38ed2b602a28 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/copy/gcs/GcsStreamCopierFactory.kt @@ -18,11 +18,11 @@ import java.io.ByteArrayInputStream import java.io.InputStream import java.nio.charset.StandardCharsets -abstract class GcsStreamCopierFactory : StreamCopierFactory { +abstract class GcsStreamCopierFactory : StreamCopierFactory { /** Used by the copy consumer. */ - fun create( + override fun create( configuredSchema: String?, - gcsConfig: GcsConfig, + config: GcsConfig, stagingFolder: String?, configuredStream: ConfiguredAirbyteStream?, nameTransformer: StandardNameTransformer?, @@ -35,12 +35,12 @@ abstract class GcsStreamCopierFactory : StreamCopierFactory { val schema = getSchema(stream.namespace, configuredSchema!!, nameTransformer!!) val credentialsInputStream: InputStream = - ByteArrayInputStream(gcsConfig.credentialsJson.toByteArray(StandardCharsets.UTF_8)) + ByteArrayInputStream(config.credentialsJson.toByteArray(StandardCharsets.UTF_8)) val credentials = GoogleCredentials.fromStream(credentialsInputStream) val storageClient = StorageOptions.newBuilder() .setCredentials(credentials) - .setProjectId(gcsConfig.projectId) + .setProjectId(config.projectId) .build() .service @@ -51,7 +51,7 @@ abstract class GcsStreamCopierFactory : StreamCopierFactory { stream.name, storageClient, db, - gcsConfig, + config, nameTransformer, sqlOperations ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/S3CsvWriter.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/S3CsvWriter.kt index 64c3b4a6f6a4..19bdd0d2c773 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/S3CsvWriter.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/S3CsvWriter.kt @@ -83,7 +83,7 @@ private constructor( localCsvSettings = @Suppress("deprecation") localCsvSettings.withHeader( - *csvSheetGenerator.getHeaderRow().toTypedArray() + *csvSheetGenerator.getHeaderRow().toTypedArray() ) } this.csvPrinter = diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGeneratorTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGeneratorTest.kt index 9006be50a4fe..8f69a12e4630 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGeneratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGeneratorTest.kt @@ -61,7 +61,7 @@ class RootLevelFlatteningSheetGeneratorTest { private val SCHEMA: ObjectNode = MAPPER.createObjectNode() init { - val fields: List = listOf("C", "B", "A", "c", "b", "a").shuffled() + val fields: List = listOf("C", "B", "A", "c", "b", "a").shuffled() val schemaProperties = MAPPER.createObjectNode() for (field in fields) { diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index dc4c1f5aa5d8..f92cd7712246 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -134,7 +134,7 @@ constructor( else -> throw IllegalArgumentException("Top-level schema must be an object") } - require(!stream.primaryKey.any { key: List -> key.size > 1 }) { + require(!stream.primaryKey.any { key: List -> key.size > 1 }) { "Only top-level primary keys are supported" } val primaryKey = diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index cbb03cdef056..745b086c1553 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -1005,8 +1005,8 @@ abstract class BaseTypingDedupingTest { ): CompletableFuture> { val outputFuture = CompletableFuture>() Executors.newSingleThreadExecutor() - .submit( - Callable { + .submit( + Callable { val destinationMessages: MutableList = ArrayList() diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt index cbe3e8968f67..15c4baa9eeb6 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt @@ -132,7 +132,7 @@ constructor( if (airbyteData.isTextual) { airbyteData = Jsons.deserializeExact(airbyteData.asText()) } - Streams.stream(airbyteData.fields()).forEach { field: Map.Entry -> + Streams.stream(airbyteData.fields()).forEach { field: Map.Entry -> if (!copy.has(field.key)) { copy.set(field.key, field.value) } else {