Skip to content

Commit

Permalink
convert java CDK db-destinations submodule to kotlin (#36439)
Browse files Browse the repository at this point in the history
convert java CDK db-destinations submodule to kotlin

fix compiler warnings
  • Loading branch information
stephane-airbyte authored Mar 27, 2024
1 parent f1484bf commit 86517eb
Show file tree
Hide file tree
Showing 129 changed files with 7,881 additions and 6,585 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ abstract class AzureBlobStorageStreamCopier(
}

@Throws(Exception::class)
override fun generateMergeStatement(destTableName: String?): String? {
override fun generateMergeStatement(destTableName: String?): String {
LOGGER.info(
"Preparing to merge tmp table {} to dest table: {}, schema: {}, in destination.",
tmpTableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ interface Destination : Integration {
fun getConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
outputRecordCollector: Consumer<AirbyteMessage>
): AirbyteMessageConsumer?

/**
Expand All @@ -53,7 +53,7 @@ interface Destination : Integration {
fun getSerializedMessageConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
outputRecordCollector: Consumer<AirbyteMessage>
): SerializedAirbyteMessageConsumer? {
return ShimToSerializedAirbyteMessageConsumer(
getConsumer(config, catalog, outputRecordCollector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ class IntegrationRunner
@VisibleForTesting
internal constructor(
cliParser: IntegrationCliParser,
outputRecordCollector: Consumer<AirbyteMessage?>,
outputRecordCollector: Consumer<AirbyteMessage>,
destination: Destination?,
source: Source?
) {
private val cliParser: IntegrationCliParser
private val outputRecordCollector: Consumer<AirbyteMessage?>
private val outputRecordCollector: Consumer<AirbyteMessage>
private val integration: Integration
private val destination: Destination?
private val source: Source?
Expand All @@ -61,7 +61,7 @@ internal constructor(
destination: Destination?
) : this(
IntegrationCliParser(),
Consumer<AirbyteMessage?> { message: AirbyteMessage? ->
Consumer<AirbyteMessage> { message: AirbyteMessage ->
Destination.Companion.defaultOutputRecordCollector(message)
},
destination,
Expand All @@ -72,7 +72,7 @@ internal constructor(
source: Source?
) : this(
IntegrationCliParser(),
Consumer<AirbyteMessage?> { message: AirbyteMessage? ->
Consumer<AirbyteMessage> { message: AirbyteMessage ->
Destination.Companion.defaultOutputRecordCollector(message)
},
null,
Expand All @@ -99,7 +99,7 @@ internal constructor(
@VisibleForTesting
internal constructor(
cliParser: IntegrationCliParser,
outputRecordCollector: Consumer<AirbyteMessage?>,
outputRecordCollector: Consumer<AirbyteMessage>,
destination: Destination?,
source: Source?,
jsonSchemaValidator: JsonSchemaValidator
Expand Down Expand Up @@ -254,7 +254,7 @@ internal constructor(

private fun produceMessages(
messageIterator: AutoCloseableIterator<AirbyteMessage>,
recordCollector: Consumer<AirbyteMessage?>
recordCollector: Consumer<AirbyteMessage>
) {
messageIterator!!.airbyteStream.ifPresent { s: AirbyteStreamNameNamespacePair? ->
LOGGER.debug("Producing messages for stream {}...", s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
override fun getConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
outputRecordCollector: Consumer<AirbyteMessage>
): AirbyteMessageConsumer? {
return destination.getConsumer(config, catalog, outputRecordCollector)
}
Expand All @@ -40,7 +40,7 @@ abstract class SpecModifyingDestination(private val destination: Destination) :
override fun getSerializedMessageConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
outputRecordCollector: Consumer<AirbyteMessage>
): SerializedAirbyteMessageConsumer? {
return destination.getSerializedMessageConsumer(config, catalog, outputRecordCollector)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class SshWrappedDestination : Destination {
override fun getConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
outputRecordCollector: Consumer<AirbyteMessage>
): AirbyteMessageConsumer? {
val tunnel = getTunnelInstance(config)

Expand All @@ -120,7 +120,7 @@ class SshWrappedDestination : Destination {
override fun getSerializedMessageConsumer(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog,
outputRecordCollector: Consumer<AirbyteMessage?>?
outputRecordCollector: Consumer<AirbyteMessage>
): SerializedAirbyteMessageConsumer? {
val clone = Jsons.clone(config)
val connectionOptionsConfig: Optional<JsonNode> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ interface DestinationFlushFunction {
@Throws(Exception::class)
fun flush(
decs: StreamDescriptor,
stream: Stream<PartialAirbyteMessage?>,
stream: Stream<PartialAirbyteMessage>,
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory
class BufferedStreamConsumer
@VisibleForTesting
internal constructor(
private val outputRecordCollector: Consumer<AirbyteMessage?>?,
private val outputRecordCollector: Consumer<AirbyteMessage>,
private val onStart: OnStartFunction,
private val bufferingStrategy: BufferingStrategy,
private val onClose: OnCloseFunction,
Expand All @@ -87,7 +87,7 @@ internal constructor(
*/
@Deprecated("")
constructor(
outputRecordCollector: Consumer<AirbyteMessage?>?,
outputRecordCollector: Consumer<AirbyteMessage>,
onStart: OnStartFunction,
bufferingStrategy: BufferingStrategy,
onClose: OnCloseFunction,
Expand All @@ -109,7 +109,7 @@ internal constructor(
)

constructor(
outputRecordCollector: Consumer<AirbyteMessage?>?,
outputRecordCollector: Consumer<AirbyteMessage>,
onStart: OnStartFunction,
bufferingStrategy: BufferingStrategy,
onClose: OnCloseFunction,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ package io.airbyte.cdk.integrations.destination.buffered_stream_consumer
import io.airbyte.commons.functional.CheckedBiConsumer
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair

interface RecordWriter<T> : CheckedBiConsumer<AirbyteStreamNameNamespacePair, List<T>, Exception> {
fun interface RecordWriter<T> :
CheckedBiConsumer<AirbyteStreamNameNamespacePair, List<T>, Exception> {
@Throws(Exception::class)
override fun accept(stream: AirbyteStreamNameNamespacePair, records: List<T>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ interface SqlOperations {
* @throws Exception exception
*/
@Throws(Exception::class)
fun createTableIfNotExists(database: JdbcDatabase?, schemaName: String?, tableName: String?)
fun createTableIfNotExists(database: JdbcDatabase, schemaName: String?, tableName: String?)

/**
* Query to create a table with provided name in provided schema if it does not already exist.
Expand All @@ -72,7 +72,7 @@ interface SqlOperations {
* @throws Exception exception
*/
@Throws(Exception::class)
fun dropTableIfExists(database: JdbcDatabase?, schemaName: String?, tableName: String?)
fun dropTableIfExists(database: JdbcDatabase, schemaName: String?, tableName: String?)

/**
* Query to remove all records from a table. Assumes the table exists.
Expand All @@ -82,11 +82,7 @@ interface SqlOperations {
* @param tableName Name of table
* @return Query
*/
fun truncateTableQuery(
database: JdbcDatabase?,
schemaName: String?,
tableName: String?
): String?
fun truncateTableQuery(database: JdbcDatabase?, schemaName: String?, tableName: String?): String

/**
* Insert records into table. Assumes the table exists.
Expand All @@ -99,8 +95,8 @@ interface SqlOperations {
*/
@Throws(Exception::class)
fun insertRecords(
database: JdbcDatabase?,
records: List<PartialAirbyteMessage?>?,
database: JdbcDatabase,
records: List<PartialAirbyteMessage>,
schemaName: String?,
tableName: String?
)
Expand Down Expand Up @@ -131,8 +127,7 @@ interface SqlOperations {
* @param queries Queries to execute
* @throws Exception exception
*/
@Throws(Exception::class)
fun executeTransaction(database: JdbcDatabase?, queries: List<String?>?)
@Throws(Exception::class) fun executeTransaction(database: JdbcDatabase, queries: List<String>)

/** Check if the data record is valid and ok to be written to destination */
fun isValidData(data: JsonNode?): Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ interface StreamCopier {
@Throws(Exception::class) fun createDestinationTable(): String?

/** Generates a merge SQL statement from the temporary table to the final table. */
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String?
@Throws(Exception::class) fun generateMergeStatement(destTableName: String?): String

/**
* Cleans up the copier by removing the staging file and dropping the temporary table after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ import org.slf4j.LoggerFactory
* This should be deprecated as we slowly move towards using [SerializedBufferingStrategy] instead.
*/
class InMemoryRecordBufferingStrategy(
private val recordWriter: RecordWriter<AirbyteRecordMessage?>,
private val recordWriter: RecordWriter<AirbyteRecordMessage>,
private val checkAndRemoveRecordWriter: CheckAndRemoveRecordWriter?,
private val maxQueueSizeInBytes: Long
) : BufferingStrategy {
private var streamBuffer:
MutableMap<AirbyteStreamNameNamespacePair, MutableList<AirbyteRecordMessage?>> =
MutableMap<AirbyteStreamNameNamespacePair, MutableList<AirbyteRecordMessage>> =
HashMap()
private var fileName: String? = null

private val recordSizeEstimator = RecordSizeEstimator()
private var bufferSizeInBytes: Long = 0

constructor(
recordWriter: RecordWriter<AirbyteRecordMessage?>,
recordWriter: RecordWriter<AirbyteRecordMessage>,
maxQueueSizeInBytes: Long
) : this(recordWriter, null, maxQueueSizeInBytes)

Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.11
version=0.28.12
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory

internal class IntegrationRunnerTest {
private lateinit var cliParser: IntegrationCliParser
private lateinit var stdoutConsumer: Consumer<AirbyteMessage?>
private lateinit var stdoutConsumer: Consumer<AirbyteMessage>
private lateinit var destination: Destination
private lateinit var source: Source
private lateinit var configPath: Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ class AsyncStreamConsumerTest {
namespace: String,
allRecords: List<AirbyteMessage>,
) {
val argumentCaptor = org.mockito.kotlin.argumentCaptor<Stream<PartialAirbyteMessage?>>()
val argumentCaptor = org.mockito.kotlin.argumentCaptor<Stream<PartialAirbyteMessage>>()
Mockito.verify(flushFunction, Mockito.atLeast(1))
.flush(
org.mockito.kotlin.eq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import org.mockito.kotlin.mock
class BufferedStreamConsumerTest {
private lateinit var consumer: BufferedStreamConsumer
private lateinit var onStart: OnStartFunction
private lateinit var recordWriter: RecordWriter<AirbyteRecordMessage?>
private lateinit var recordWriter: RecordWriter<AirbyteRecordMessage>
private lateinit var onClose: OnCloseFunction
private lateinit var isValidRecord: CheckedFunction<JsonNode?, Boolean?, Exception?>
private lateinit var outputRecordCollector: Consumer<AirbyteMessage?>
private lateinit var outputRecordCollector: Consumer<AirbyteMessage>

@BeforeEach
@Throws(Exception::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.mockito.Mockito
import org.mockito.kotlin.mock

class InMemoryRecordBufferingStrategyTest {
private val recordWriter: RecordWriter<AirbyteRecordMessage?> = mock()
private val recordWriter: RecordWriter<AirbyteRecordMessage> = mock()

@Test
@Throws(Exception::class)
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-destinations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ java {
}
}

compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false

dependencies {
api 'org.apache.commons:commons-csv:1.10.0'

Expand All @@ -27,4 +31,6 @@ dependencies {
testFixturesImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:typing-deduping'))

testImplementation project(':airbyte-cdk:java:airbyte-cdk:typing-deduping')
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'

}
Loading

0 comments on commit 86517eb

Please sign in to comment.