diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DSLContextFactory.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DSLContextFactory.kt index 1d95b235a8f5..2ecf74e0fd94 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DSLContextFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DSLContextFactory.kt @@ -67,7 +67,7 @@ object DSLContextFactory { driverClassName: String, jdbcConnectionString: String?, dialect: SQLDialect?, - connectionProperties: Map?, + connectionProperties: Map?, connectionTimeout: Duration? ): DSLContext { return DSL.using( diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DataSourceFactory.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DataSourceFactory.kt index 507a4f366bdb..0b3625d18dd2 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DataSourceFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DataSourceFactory.kt @@ -50,7 +50,7 @@ object DataSourceFactory { password: String?, driverClassName: String, jdbcConnectionString: String?, - connectionProperties: Map?, + connectionProperties: Map?, connectionTimeout: Duration? ): DataSource { return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString) @@ -100,7 +100,7 @@ object DataSourceFactory { port: Int, database: String?, driverClassName: String, - connectionProperties: Map? + connectionProperties: Map? ): DataSource { return DataSourceBuilder(username, password, driverClassName, host, port, database) .withConnectionProperties(connectionProperties) @@ -152,7 +152,7 @@ object DataSourceFactory { private var password: String?, private var driverClassName: String ) { - private var connectionProperties: Map = java.util.Map.of() + private var connectionProperties: Map = java.util.Map.of() private var database: String? = null private var host: String? = null private var jdbcUrl: String? = null @@ -185,7 +185,7 @@ object DataSourceFactory { } fun withConnectionProperties( - connectionProperties: Map? + connectionProperties: Map? ): DataSourceBuilder { if (connectionProperties != null) { this.connectionProperties = connectionProperties diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt index d09681f9f44b..4daca32dd778 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcSourceOperations.kt @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory /** Implementation of source operations with standard JDBC types. */ class JdbcSourceOperations : - AbstractJdbcCompatibleSourceOperations(), SourceOperations { + AbstractJdbcCompatibleSourceOperations(), SourceOperations { protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType { return try { JDBCType.valueOf(columnTypeInt) @@ -147,7 +147,7 @@ class JdbcSourceOperations : return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type) } - override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType { + override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType { return when (jdbcType) { JDBCType.BIT, JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt index 297925119c87..e8ff27ccad66 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt @@ -11,7 +11,7 @@ import java.util.* abstract class JdbcConnector protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() { - protected fun getConnectionTimeout(connectionProperties: Map): Duration { + protected fun getConnectionTimeout(connectionProperties: Map): Duration { return getConnectionTimeout(connectionProperties, driverClassName) } @@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon * @return DataSourceBuilder class used to create dynamic fields for DataSource */ fun getConnectionTimeout( - connectionProperties: Map, + connectionProperties: Map, driverClassName: String? ): Duration { val parsedConnectionTimeout = diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle b/airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle index 5ac716385f16..9c779a225b35 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle @@ -11,6 +11,11 @@ java { } } +compileKotlin.compilerOptions.allWarningsAsErrors = false +compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false +compileTestKotlin.compilerOptions.allWarningsAsErrors = false + + // Convert yaml to java: relationaldb.models jsonSchema2Pojo { sourceType = SourceType.YAMLSCHEMA diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt index 24e7eebb2aa3..677f3ee16c3f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt @@ -55,10 +55,6 @@ class AirbyteDebeziumHandler(private val config: JsonNode, reportQueueUtilization() return super.poll() } - - companion object { - private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS) - } } fun getIncrementalIterators(debeziumPropertiesManager: DebeziumPropertiesManager, @@ -70,12 +66,12 @@ class AirbyteDebeziumHandler(private val config: JsonNode, val offsetManager: AirbyteFileOffsetBackingStore = AirbyteFileOffsetBackingStore.Companion.initializeState( cdcSavedInfoFetcher.savedOffset, if (addDbNameToOffsetState) Optional.ofNullable(config[JdbcUtils.DATABASE_KEY].asText()) else Optional.empty()) - val schemaHistoryManager: Optional = if (trackSchemaHistory + val schemaHistoryManager: Optional = if (trackSchemaHistory ) Optional.of(AirbyteSchemaHistoryStorage.Companion.initializeDBHistory( cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState())) else Optional.empty() val publisher = DebeziumRecordPublisher(debeziumPropertiesManager) - val queue: CapacityReportingBlockingQueue> = CapacityReportingBlockingQueue>(queueSize) + val queue: CapacityReportingBlockingQueue> = CapacityReportingBlockingQueue(queueSize) publisher.start(queue, offsetManager, schemaHistoryManager) // handle state machine around pub/sub logic. val eventIterator: AutoCloseableIterator = DebeziumRecordIterator( @@ -106,6 +102,7 @@ class AirbyteDebeziumHandler(private val config: JsonNode, companion object { private val LOGGER: Logger = LoggerFactory.getLogger(AirbyteDebeziumHandler::class.java) + private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS) /** * We use 10000 as capacity cause the default queue size and batch size of debezium is : diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcSavedInfoFetcher.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcSavedInfoFetcher.kt index 80c025bba587..abcc9e591539 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcSavedInfoFetcher.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcSavedInfoFetcher.kt @@ -14,5 +14,5 @@ import java.util.* interface CdcSavedInfoFetcher { val savedOffset: JsonNode? - val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory?>? + val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory>? } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt index 2c48d0b8dd1e..6c6ac26ebdf3 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcStateHandler.kt @@ -11,7 +11,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage * which suits them. Also, it adds some utils to verify CDC event status. */ interface CdcStateHandler { - fun saveState(offset: Map?, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory?): AirbyteMessage? + fun saveState(offset: Map?, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory?): AirbyteMessage? fun saveStateAfterCompletionOfSnapshotOfNewStreams(): AirbyteMessage? diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt index 275f59fe368b..22eaa1563d55 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt @@ -42,8 +42,8 @@ class AirbyteFileOffsetBackingStore(private val offsetFilePath: Path, private va } fun persist(cdcState: JsonNode?) { - val mapAsString: Map = - if (cdcState != null) Jsons.`object`>(cdcState, MutableMap::class.java) else emptyMap() + val mapAsString: Map = + if (cdcState != null) Jsons.`object`(cdcState, MutableMap::class.java) as Map else emptyMap() val updatedMap = updateStateForDebezium2_1(mapAsString) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteSchemaHistoryStorage.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteSchemaHistoryStorage.kt index 537a4c2c837f..d9801f81cbbb 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteSchemaHistoryStorage.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteSchemaHistoryStorage.kt @@ -37,17 +37,6 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc val isCompressed: Boolean init { - this.streamName = streamName - this.primaryKey = primaryKey - this.keySequence = keySequence - this.syncCheckpointRecords = syncCheckpointRecords - this.syncCheckpointDuration = syncCheckpointDuration - this.tableName = tableName - this.cursorColumnName = cursorColumnName - this.cursorSqlType = cursorSqlType - this.cause = cause - this.tableSize = tableSize - this.avgRowLength = avgRowLength this.schema = schema this.isCompressed = isCompressed } @@ -140,7 +129,7 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc } } - private fun persist(schemaHistory: SchemaHistory?>?) { + private fun persist(schemaHistory: SchemaHistory>?) { if (schemaHistory!!.schema!!.isEmpty) { return } @@ -223,7 +212,7 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc return string.toByteArray(StandardCharsets.UTF_8).size.toDouble() / (ONE_MB) } - fun initializeDBHistory(schemaHistory: SchemaHistory?>?, + fun initializeDBHistory(schemaHistory: SchemaHistory>?, compressSchemaHistoryForState: Boolean): AirbyteSchemaHistoryStorage { val dbHistoryWorkingDir: Path try { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt index 7045e7020f49..886e5f400322 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumPropertiesManager.kt @@ -17,7 +17,7 @@ abstract class DebeziumPropertiesManager(private val properties: Properties, fun getDebeziumProperties( offsetManager: AirbyteFileOffsetBackingStore, - schemaHistoryManager: Optional): Properties { + schemaHistoryManager: Optional): Properties { val props = Properties() props.putAll(properties) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt index 08614d0747f4..e69fe5f16c01 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt @@ -35,14 +35,14 @@ class DebeziumRecordIterator(private val queue: LinkedBlockingQueue, private val debeziumShutdownProcedure: DebeziumShutdownProcedure>, private val firstRecordWaitTime: Duration, - subsequentRecordWaitTime: Duration?) : AbstractIterator(), AutoCloseableIterator { + subsequentRecordWaitTime: Duration?) : AbstractIterator(), AutoCloseableIterator { private val heartbeatEventSourceField: MutableMap?>, Field?> = HashMap(1) private val subsequentRecordWaitTime: Duration = firstRecordWaitTime.dividedBy(2) private var receivedFirstRecord = false private var hasSnapshotFinished = true private var tsLastHeartbeat: LocalDateTime? = null - private var lastHeartbeatPosition: T = null + private var lastHeartbeatPosition: T? = null private var maxInstanceOfNoRecordsFound = 0 private var signalledDebeziumEngineShutdown = false @@ -108,7 +108,6 @@ class DebeziumRecordIterator(private val queue: LinkedBlockingQueue(private val queue: LinkedBlockingQueue): T? { + internal fun getHeartbeatPosition(heartbeatEvent: ChangeEvent): T { try { val eventClass: Class?> = heartbeatEvent.javaClass val f: Field? diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt index d382bec64a32..db7ab16ad604 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordPublisher.kt @@ -28,7 +28,7 @@ class DebeziumRecordPublisher(private val debeziumPropertiesManager: DebeziumPro fun start(queue: BlockingQueue>, offsetManager: AirbyteFileOffsetBackingStore, - schemaHistoryManager: Optional) { + schemaHistoryManager: Optional) { engine = DebeziumEngine.create(Json::class.java) .using(debeziumPropertiesManager.getDebeziumProperties(offsetManager, schemaHistoryManager)) .using(OffsetCommitPolicy.AlwaysCommitOffsetPolicy()) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index 9ed5df111d04..0144d08d8f29 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -13,6 +13,24 @@ import io.airbyte.cdk.db.JdbcCompatibleSourceOperations import io.airbyte.cdk.db.SqlDatabase import io.airbyte.cdk.db.factory.DataSourceFactory.close import io.airbyte.cdk.db.factory.DataSourceFactory.create +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_SIZE +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_DECIMAL_DIGITS +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_IS_NULLABLE +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_COLUMN_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_DATABASE_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_DATA_TYPE +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_SCHEMA_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_SIZE +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_TABLE_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE_NAME +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_DECIMAL_DIGITS +import io.airbyte.cdk.db.jdbc.JdbcConstants.JDBC_IS_NULLABLE +import io.airbyte.cdk.db.jdbc.JdbcConstants.KEY_SEQ import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.db.jdbc.JdbcUtils import io.airbyte.cdk.db.jdbc.JdbcUtils.getFullyQualifiedTableName @@ -60,7 +78,7 @@ import javax.sql.DataSource abstract class AbstractJdbcSource(driverClass: String?, protected val streamingQueryConfigProvider: Supplier, sourceOperations: JdbcCompatibleSourceOperations) : AbstractDbSource(driverClass), Source { - protected val sourceOperations: JdbcCompatibleSourceOperations + protected val sourceOperations: JdbcCompatibleSourceOperations override var quoteString: String? = null protected var dataSources: MutableCollection = ArrayList() @@ -74,7 +92,7 @@ abstract class AbstractJdbcSource(driverClass: String?, schemaName: String?, tableName: String, syncMode: SyncMode, - cursorField: Optional): AutoCloseableIterator? { + cursorField: Optional): AutoCloseableIterator? { LOGGER.info("Queueing query for table: {}", tableName) // This corresponds to the initial sync for in INCREMENTAL_MODE, where the ordering of the records // matters @@ -127,10 +145,10 @@ abstract class AbstractJdbcSource(driverClass: String?, .values .stream() .map>> { fields: List -> - TableInfo.builder>() - .nameSpace(fields[0].get(INTERNAL_SCHEMA_NAME).asText()) - .name(fields[0].get(INTERNAL_TABLE_NAME).asText()) - .fields(fields.stream() // read the column metadata Json object, and determine its type + TableInfo>( + nameSpace = fields[0].get(INTERNAL_SCHEMA_NAME).asText(), + name = fields[0].get(INTERNAL_TABLE_NAME).asText(), + fields = fields.stream() // read the column metadata Json object, and determine its type .map { f: JsonNode -> val datatype = sourceOperations.getDatabaseFieldType(f) val jsonType = getAirbyteType(datatype) @@ -143,9 +161,8 @@ abstract class AbstractJdbcSource(driverClass: String?, jsonType) object : CommonField(f.get(INTERNAL_COLUMN_NAME).asText(), datatype) {} } - .collect(Collectors.toList>())) - .cursorFields(extractCursorFields(fields)) - .build() + .collect(Collectors.toList>()), + cursorFields = extractCursorFields(fields)) } .collect(Collectors.toList>>()) } @@ -158,15 +175,15 @@ abstract class AbstractJdbcSource(driverClass: String?, } protected fun excludeNotAccessibleTables(internalSchemas: Set, - tablesWithSelectGrantPrivilege: Set?): Predicate { + tablesWithSelectGrantPrivilege: Set?): Predicate { return Predicate { jsonNode: JsonNode -> if (tablesWithSelectGrantPrivilege!!.isEmpty()) { return@Predicate isNotInternalSchema(jsonNode, internalSchemas) } (tablesWithSelectGrantPrivilege.stream() - .anyMatch { e: JdbcPrivilegeDto? -> e.getSchemaName() == jsonNode.get(INTERNAL_SCHEMA_NAME).asText() } + .anyMatch { e: JdbcPrivilegeDto -> e.schemaName == jsonNode.get(INTERNAL_SCHEMA_NAME).asText() } && tablesWithSelectGrantPrivilege.stream() - .anyMatch { e: JdbcPrivilegeDto? -> e.getTableName() == jsonNode.get(INTERNAL_TABLE_NAME).asText() } + .anyMatch { e: JdbcPrivilegeDto -> e.tableName == jsonNode.get(INTERNAL_TABLE_NAME).asText() } && !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText())) } } @@ -240,16 +257,16 @@ abstract class AbstractJdbcSource(driverClass: String?, return tableInfos.stream() .collect(Collectors.toMap>, String, MutableList>( Function>, String> { tableInfo: TableInfo> -> getFullyQualifiedTableName(tableInfo.nameSpace, tableInfo.name) }, - Function>, MutableList> { tableInfo: TableInfo> -> + Function>, MutableList> toMap@{ tableInfo: TableInfo> -> val streamName = getFullyQualifiedTableName(tableInfo.nameSpace, tableInfo.name) try { val primaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery( { connection: Connection -> connection.metaData.getPrimaryKeys(getCatalog(database), tableInfo.nameSpace, tableInfo.name) }, { r: ResultSet -> PrimaryKeyAttributesFromDb(streamName, r.getString(JDBC_COLUMN_COLUMN_NAME), r.getInt(KEY_SEQ)) })) - return@toMap primaryKeys.getOrDefault(streamName, emptyList()) + return@toMap primaryKeys.getOrDefault(streamName, mutableListOf()) } catch (e: SQLException) { LOGGER.error(String.format("Could not retrieve primary keys for %s: %s", streamName, e)) - return@toMap emptyList() + return@toMap mutableListOf() } })) } @@ -263,14 +280,14 @@ abstract class AbstractJdbcSource(driverClass: String?, schemaName: String?, tableName: String, cursorInfo: CursorInfo, - cursorFieldType: Datatype): AutoCloseableIterator? { + cursorFieldType: Datatype): AutoCloseableIterator? { LOGGER.info("Queueing query for table: {}", tableName) val airbyteStream = AirbyteStreamUtils.convertFromNameAndNamespace(tableName, schemaName) return AutoCloseableIterators.lazyIterator({ try { val stream = database.unsafeQuery( - CheckedFunction { connection: Connection -> + CheckedFunction { connection: Connection -> LOGGER.info("Preparing query for table: {}", tableName) val fullTableName = RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting(schemaName, tableName, quoteString) val quotedCursorField = RelationalDbQueryUtils.enquoteIdentifier(cursorInfo.cursorField, quoteString) @@ -306,7 +323,7 @@ abstract class AbstractJdbcSource(driverClass: String?, sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.cursor) preparedStatement }, - CheckedFunction { queryResult: ResultSet? -> sourceOperations.rowToJson(queryResult!!) }) + CheckedFunction { queryResult: ResultSet? -> sourceOperations.rowToJson(queryResult!!) }) return@lazyIterator AutoCloseableIterators.fromStream(stream, airbyteStream) } catch (e: SQLException) { throw RuntimeException(e) @@ -418,8 +435,7 @@ abstract class AbstractJdbcSource(driverClass: String?, protected fun identifyStreamsToSnapshot(catalog: ConfiguredAirbyteCatalog, stateManager: StateManager): List { val alreadySyncedStreams = stateManager.cdcStateManager.initialStreamsSynced - if (alreadySyncedStreams!!.isEmpty() && (stateManager.cdcStateManager.cdcState == null - || stateManager.cdcStateManager.cdcState.state == null)) { + if (alreadySyncedStreams!!.isEmpty() && (stateManager.cdcStateManager.cdcState?.state == null)) { return emptyList() } @@ -430,7 +446,7 @@ abstract class AbstractJdbcSource(driverClass: String?, return catalog.streams.stream() .filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL } .filter { stream: ConfiguredAirbyteStream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream)) } - .map { `object`: ConfiguredAirbyteStream? -> Jsons.clone(`object`) } + .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } .collect(Collectors.toList()) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSource.kt index b08c28c8e12c..135246cb6610 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSource.kt @@ -14,7 +14,7 @@ import org.slf4j.LoggerFactory import java.sql.JDBCType import java.util.function.Supplier -class JdbcSource : AbstractJdbcSource(DatabaseDriver.POSTGRESQL.driverClassName, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { +class JdbcSource : AbstractJdbcSource(DatabaseDriver.POSTGRESQL.driverClassName, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { // no-op for JdbcSource since the config it receives is designed to be use for JDBC. override fun toDatabaseConfig(config: JsonNode): JsonNode { return config diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 7bb9dd92dc12..a31f1fffee02 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.db.IncrementalUtils.getCursorFieldOptional import io.airbyte.cdk.db.IncrementalUtils.getCursorType import io.airbyte.cdk.db.jdbc.JdbcDatabase import io.airbyte.cdk.integrations.JdbcConnector +import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility.emitConfigErrorTrace import io.airbyte.cdk.integrations.base.Source import io.airbyte.cdk.integrations.base.errors.messages.ErrorMessage.getErrorMessage @@ -52,11 +53,6 @@ abstract class AbstractDbSource protecte // TODO: Remove when the flag is not use anymore protected var featureFlags: FeatureFlags = EnvVariableFeatureFlags() - @VisibleForTesting - fun setFeatureFlags(featureFlags: FeatureFlags) { - this.featureFlags = featureFlags - } - @Trace(operationName = CHECK_TRACE_OPERATION_NAME) @Throws(Exception::class) override fun check(config: JsonNode): AirbyteConnectionStatus? { @@ -143,11 +139,12 @@ abstract class AbstractDbSource protecte emittedAt) val iteratorList = Stream .of(incrementalIterators, fullRefreshIterators) - .flatMap(Function>, Stream>> { obj: List> -> obj.stream() }) + .flatMap(Collection>::stream) .collect(Collectors.toList()) return AutoCloseableIterators - .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList) { obj: AirbyteStreamStatusHolder -> obj.emitStreamStatusTrace() }) { + .appendOnClose(AutoCloseableIterators.concatWithEagerClose(iteratorList, AirbyteTraceMessageUtility::emitStreamStatusTrace)) + { LOGGER.info("Closing database connection pool.") Exceptions.toRuntime { this.close() } LOGGER.info("Closed database connection pool.") @@ -176,7 +173,7 @@ abstract class AbstractDbSource protecte if (cursorField.isEmpty) { continue } - val cursorType = table.fields.stream() + val cursorType = table.fields!!.stream() .filter { info: CommonField -> info.name == cursorField.get() } .map { obj: CommonField -> obj.type } .findFirst() @@ -242,7 +239,7 @@ abstract class AbstractDbSource protecte catalog: ConfiguredAirbyteCatalog?, tableNameToTable: Map>>, stateManager: StateManager?, - emittedAt: Instant): List> { + emittedAt: Instant): List> { return getSelectedIterators( database, catalog, @@ -257,7 +254,7 @@ abstract class AbstractDbSource protecte catalog: ConfiguredAirbyteCatalog?, tableNameToTable: Map>>, stateManager: StateManager?, - emittedAt: Instant): List> { + emittedAt: Instant): List> { return getSelectedIterators( database, catalog, @@ -284,8 +281,8 @@ abstract class AbstractDbSource protecte tableNameToTable: Map>>, stateManager: StateManager?, emittedAt: Instant, - syncMode: SyncMode): List> { - val iteratorList: MutableList> = ArrayList() + syncMode: SyncMode): List> { + val iteratorList: MutableList> = ArrayList() for (airbyteStream in catalog!!.streams) { if (airbyteStream.syncMode == syncMode) { val stream = airbyteStream.stream @@ -325,7 +322,7 @@ abstract class AbstractDbSource protecte airbyteStream: ConfiguredAirbyteStream, table: TableInfo>, stateManager: StateManager?, - emittedAt: Instant): AutoCloseableIterator { + emittedAt: Instant): AutoCloseableIterator { val streamName = airbyteStream.stream.name val namespace = airbyteStream.stream.namespace val pair = io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair(streamName, @@ -345,7 +342,7 @@ abstract class AbstractDbSource protecte val cursorInfo = stateManager!!.getCursorInfo(pair) val airbyteMessageIterator: AutoCloseableIterator - if (cursorInfo!!.map { obj: CursorInfo? -> obj.getCursor() }.isPresent) { + if (cursorInfo!!.map { it.cursor }.isPresent) { airbyteMessageIterator = getIncrementalStream( database, airbyteStream, @@ -365,11 +362,11 @@ abstract class AbstractDbSource protecte val messageProducer = CursorStateMessageProducer( stateManager, - cursorInfo.map { obj: CursorInfo? -> obj.getCursor() }) + cursorInfo.map { it.cursor }) iterator = AutoCloseableIterators.transform( - { autoCloseableIterator: AutoCloseableIterator -> - SourceStateIterator(autoCloseableIterator, airbyteStream, messageProducer, + { autoCloseableIterator: AutoCloseableIterator -> + SourceStateIterator(autoCloseableIterator, airbyteStream, messageProducer, StateEmitFrequency(stateEmissionFrequency.toLong(), Duration.ZERO)) }, @@ -455,7 +452,7 @@ abstract class AbstractDbSource protecte table: TableInfo>, emittedAt: Instant, syncMode: SyncMode, - cursorField: Optional): AutoCloseableIterator { + cursorField: Optional): AutoCloseableIterator { val queryStream = queryTableFullRefresh(database, selectedDatabaseFields, table.nameSpace, table.name, syncMode, cursorField) @@ -600,7 +597,7 @@ abstract class AbstractDbSource protecte schemaName: String?, tableName: String, syncMode: SyncMode, - cursorField: Optional): AutoCloseableIterator? + cursorField: Optional): AutoCloseableIterator? /** * Read incremental data from a table. Incremental read should return only records where cursor @@ -615,7 +612,7 @@ abstract class AbstractDbSource protecte schemaName: String?, tableName: String, cursorInfo: CursorInfo, - cursorFieldType: DataType): AutoCloseableIterator? + cursorFieldType: DataType): AutoCloseableIterator? protected val stateEmissionFrequency: Int /** @@ -650,7 +647,7 @@ abstract class AbstractDbSource protecte private val LOGGER: Logger = LoggerFactory.getLogger(AbstractDbSource::class.java) private fun getMessageIterator( - recordIterator: AutoCloseableIterator?, + recordIterator: AutoCloseableIterator?, streamName: String, namespace: String, emittedAt: Long): AutoCloseableIterator { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/CdcStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/CdcStateManager.kt index 2a482c73fde0..9e1465215d8d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/CdcStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/CdcStateManager.kt @@ -12,16 +12,15 @@ import org.slf4j.LoggerFactory import java.util.* class CdcStateManager(private val initialState: CdcState?, - initialStreamsSynced: Set?, + initialStreamsSynced: Set?, stateMessage: AirbyteStateMessage?) { - private val initialStreamsSynced: Set? + val initialStreamsSynced: Set? val rawStateMessage: AirbyteStateMessage? private var currentState: CdcState? init { this.currentState = initialState - this.initialStreamsSynced = initialStreamsSynced - + this.initialStreamsSynced = if (initialStreamsSynced != null) Collections.unmodifiableSet(initialStreamsSynced) else null this.rawStateMessage = stateMessage LOGGER.info("Initialized CDC state") } @@ -32,10 +31,6 @@ class CdcStateManager(private val initialState: CdcState?, this.currentState = state } - fun getInitialStreamsSynced(): Set? { - return if (initialStreamsSynced != null) Collections.unmodifiableSet(initialStreamsSynced) else null - } - override fun toString(): String { return "CdcStateManager{" + "initialState=" + initialState + diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt index ac542b4eec3d..795f2a66559e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt @@ -94,10 +94,9 @@ object DbSourceDiscoverUtil { t.name) val primaryKeys = fullyQualifiedTableNameToPrimaryKeys.getOrDefault( fullyQualifiedTableName, emptyList()) - TableInfo.builder().nameSpace(t.nameSpace).name(t.name) - .fields(fields).primaryKeys(primaryKeys) - .cursorFields(t.cursorFields) - .build() + TableInfo(nameSpace = t.nameSpace, name = t.name, + fields = fields, primaryKeys = primaryKeys, + cursorFields = t.cursorFields) } .collect(Collectors.toList()) @@ -105,8 +104,8 @@ object DbSourceDiscoverUtil { .map { tableInfo: TableInfo -> val primaryKeys = tableInfo.primaryKeys.stream() .filter { obj: String? -> Objects.nonNull(obj) } - .map(Function> { o: String? -> listOf(o) }) - .collect(Collectors.toList()) + .map{ listOf(it) } + .toList() CatalogHelpers .createAirbyteStream(tableInfo.name, tableInfo.nameSpace, tableInfo.fields) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InvalidCursorInfoUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InvalidCursorInfoUtil.kt index 5caab6e86471..74fc8323ef80 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InvalidCursorInfoUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/InvalidCursorInfoUtil.kt @@ -28,11 +28,6 @@ object InvalidCursorInfoUtil { val cause: String init { - this.streamName = streamName - this.primaryKey = primaryKey - this.keySequence = keySequence - this.syncCheckpointRecords = syncCheckpointRecords - this.syncCheckpointDuration = syncCheckpointDuration this.tableName = tableName this.cursorColumnName = cursorColumnName this.cursorSqlType = cursorSqlType diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt index 31e46ad3ba57..fdc5c76079c0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt @@ -94,15 +94,6 @@ object RelationalDbQueryUtils { val avgRowLength: Long init { - this.streamName = streamName - this.primaryKey = primaryKey - this.keySequence = keySequence - this.syncCheckpointRecords = syncCheckpointRecords - this.syncCheckpointDuration = syncCheckpointDuration - this.tableName = tableName - this.cursorColumnName = cursorColumnName - this.cursorSqlType = cursorSqlType - this.cause = cause this.tableSize = tableSize this.avgRowLength = avgRowLength } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt index f5eac21e370a..6ed73c6bfe2e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt @@ -19,7 +19,7 @@ object RelationalDbReadUtil { return catalog.streams.stream() .filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL } .filter { stream: ConfiguredAirbyteStream -> newlyAddedStreams.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream)) } - .map { `object`: ConfiguredAirbyteStream? -> Jsons.clone(`object`) } + .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } .collect(Collectors.toList()) } @@ -32,7 +32,7 @@ object RelationalDbReadUtil { return catalog.streams.stream() .filter { c: ConfiguredAirbyteStream -> c.syncMode == SyncMode.INCREMENTAL } .filter { stream: ConfiguredAirbyteStream -> !initialLoadStreamsNamespacePairs.contains(AirbyteStreamNameNamespacePair.fromAirbyteStream(stream.stream)) } - .map { `object`: ConfiguredAirbyteStream? -> Jsons.clone(`object`) } + .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } .collect(Collectors.toList()) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt index 7a0db7d92413..d955570961d4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/StateDecoratingIterator.kt @@ -158,7 +158,7 @@ class StateDecoratingIterator(private val messageIterator: Iterator + protected val intermediateMessage: Optional /** * Returns AirbyteStateMessage when in a ready state, a ready state means that it has satifies the * conditions of: @@ -172,8 +172,8 @@ class StateDecoratingIterator(private val messageIterator: Iterator latest: {} = {} (count {})", pair, - cursorInfo!!.map { obj: CursorInfo? -> obj.getOriginalCursorField() }.orElse(null), - cursorInfo.map { obj: CursorInfo? -> obj.getOriginalCursor() }.orElse(null), - cursorInfo.map { obj: CursorInfo? -> obj.getOriginalCursorRecordCount() }.orElse(null), - cursorInfo.map { obj: CursorInfo? -> obj.getCursorField() }.orElse(null), - cursorInfo.map { obj: CursorInfo? -> obj.getCursor() }.orElse(null), - cursorInfo.map { obj: CursorInfo? -> obj.getCursorRecordCount() }.orElse(null)) + cursorInfo.map { obj: CursorInfo -> obj.originalCursorField }.orElse(null), + cursorInfo.map { obj: CursorInfo -> obj.originalCursor }.orElse(null), + cursorInfo.map { obj: CursorInfo -> obj.originalCursorRecordCount }.orElse(null), + cursorInfo.map { obj: CursorInfo -> obj.cursorField }.orElse(null), + cursorInfo.map { obj: CursorInfo -> obj.cursor }.orElse(null), + cursorInfo.map { obj: CursorInfo -> obj.cursorRecordCount }.orElse(null)) } stateMessage?.withSourceStats(AirbyteStateStats().withRecordCount(recordCount.toDouble())) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/TableInfo.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/TableInfo.kt index 8d900fd477b2..9344aedceea2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/TableInfo.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/TableInfo.kt @@ -9,12 +9,10 @@ import lombok.Getter /** * This class encapsulates all externally relevant Table information. */ -@Getter -@Builder -class TableInfo { - private val nameSpace: String? = null - private val name: String? = null - private val fields: List? = null - private val primaryKeys: List? = null - private val cursorFields: List? = null -} +data class TableInfo( + val nameSpace: String, + val name: String, + val fields: List, + val primaryKeys: List = emptyList(), + val cursorFields: List +) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/AbstractStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/AbstractStateManager.kt index 627695d2d8f0..77b08db061d5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/AbstractStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/AbstractStateManager.kt @@ -18,22 +18,22 @@ import java.util.function.Supplier * @param The type associated with the state object managed by this manager. * @param The type associated with the state object stored in the state managed by this manager. */ -abstract class AbstractStateManager @JvmOverloads constructor(catalog: ConfiguredAirbyteCatalog?, - streamSupplier: Supplier>, - cursorFunction: Function?, - cursorFieldFunction: Function?>?, - cursorRecordCountFunction: Function?, - namespacePairFunction: Function?, +abstract class AbstractStateManager @JvmOverloads constructor(catalog: ConfiguredAirbyteCatalog?, + streamSupplier: Supplier>, + cursorFunction: Function?, + cursorFieldFunction: Function>?, + cursorRecordCountFunction: Function?, + namespacePairFunction: Function?, onlyIncludeIncrementalStreams: Boolean = false) : StateManager { /** * The [CursorManager] responsible for keeping track of the current cursor value for each * stream managed by this state manager. */ - private val cursorManager: CursorManager<*> = CursorManager(catalog, streamSupplier, cursorFunction, cursorFieldFunction, cursorRecordCountFunction, namespacePairFunction, + private val cursorManager: CursorManager<*> = CursorManager(catalog, streamSupplier, cursorFunction, cursorFieldFunction, cursorRecordCountFunction, namespacePairFunction, onlyIncludeIncrementalStreams) - override val pairToCursorInfoMap: Map? + override val pairToCursorInfoMap: Map get() = cursorManager.pairToCursorInfo - abstract override fun toState(pair: Optional): AirbyteStateMessage? + abstract override fun toState(pair: Optional): AirbyteStateMessage } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt index c17be70b9f4c..d889ca8a45dc 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManager.kt @@ -20,17 +20,18 @@ import java.util.stream.Collectors * @param The type that represents the stream object which holds the current cursor information * in the state. */ -class CursorManager(catalog: ConfiguredAirbyteCatalog?, +class CursorManager(catalog: ConfiguredAirbyteCatalog?, streamSupplier: Supplier>, - cursorFunction: Function?, - cursorFieldFunction: Function?>?, - cursorRecordCountFunction: Function?, + cursorFunction: Function?, + cursorFieldFunction: Function>?, + cursorRecordCountFunction: Function?, namespacePairFunction: Function?, onlyIncludeIncrementalStreams: Boolean) { /** * Map of streams (name/namespace tuple) to the current cursor information stored in the state. */ - private val pairToCursorInfo: Map + + val pairToCursorInfo: Map /** * Constructs a new [CursorManager] based on the configured connector and current state @@ -77,11 +78,11 @@ class CursorManager(catalog: ConfiguredAirbyteCatalog?, protected fun createCursorInfoMap( catalog: ConfiguredAirbyteCatalog?, streamSupplier: Supplier>, - cursorFunction: Function?, - cursorFieldFunction: Function?>?, - cursorRecordCountFunction: Function?, + cursorFunction: Function?, + cursorFieldFunction: Function>?, + cursorRecordCountFunction: Function?, namespacePairFunction: Function?, - onlyIncludeIncrementalStreams: Boolean): Map { + onlyIncludeIncrementalStreams: Boolean): Map { val allStreamNames = catalog!!.streams .stream() .filter { c: ConfiguredAirbyteStream -> @@ -95,7 +96,7 @@ class CursorManager(catalog: ConfiguredAirbyteCatalog?, .collect(Collectors.toSet()) allStreamNames.addAll(streamSupplier.get().stream().map(namespacePairFunction).filter { obj: AirbyteStreamNameNamespacePair? -> Objects.nonNull(obj) }.collect(Collectors.toSet())) - val localMap: MutableMap = ConcurrentHashMap() + val localMap: MutableMap = ConcurrentHashMap() val pairToState = streamSupplier.get() .stream() .collect(Collectors.toMap(namespacePairFunction, Function.identity())) @@ -108,7 +109,7 @@ class CursorManager(catalog: ConfiguredAirbyteCatalog?, localMap[pair] = createCursorInfoForStream(pair, stateOptional, streamOptional, cursorFunction, cursorFieldFunction, cursorRecordCountFunction) } - return localMap + return localMap.toMap() } /** @@ -129,12 +130,11 @@ class CursorManager(catalog: ConfiguredAirbyteCatalog?, * @return A [CursorInfo] object based on the data currently stored in the connector's state * for the given stream. */ - @VisibleForTesting - protected fun createCursorInfoForStream(pair: AirbyteStreamNameNamespacePair?, + internal fun createCursorInfoForStream(pair: AirbyteStreamNameNamespacePair?, stateOptional: Optional, streamOptional: Optional, - cursorFunction: Function?, - cursorFieldFunction: Function?>?, + cursorFunction: Function?, + cursorFieldFunction: Function>?, cursorRecordCountFunction: Function?): CursorInfo { val originalCursorField = stateOptional .map(cursorFieldFunction) @@ -193,14 +193,8 @@ class CursorManager(catalog: ConfiguredAirbyteCatalog?, return CursorInfo(originalCursorField, originalCursor, originalCursorRecordCount, cursorField, cursor, cursorRecordCount) } - /** - * Retrieves a copy of the stream name/namespace tuple to current cursor information map. - * - * @return A copy of the stream name/namespace tuple to current cursor information map. - */ - fun getPairToCursorInfo(): Map { - return java.util.Map.copyOf(pairToCursorInfo) - } + + /** * Retrieves an [Optional] possibly containing the current [CursorInfo] associated with diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt index 2955110bacf8..97da54ae580f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducer.kt @@ -15,8 +15,8 @@ import org.slf4j.LoggerFactory import java.util.* class CursorStateMessageProducer(private val stateManager: StateManager?, - private val initialCursor: Optional) : SourceStateMessageProducer { - private var currentMaxCursor: Optional + private val initialCursor: Optional) : SourceStateMessageProducer { + private var currentMaxCursor: Optional // We keep this field to mark `cursor_record_count` and also to control logging frequency. private var currentCursorRecordCount = 0 diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt index aed51f4be4ff..3ba524c3551d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt @@ -19,7 +19,7 @@ import java.util.stream.Collectors * This implementation generates a single, global state object for the state tracked by this * manager. */ -class GlobalStateManager(airbyteStateMessage: AirbyteStateMessage?, catalog: ConfiguredAirbyteCatalog?) : AbstractStateManager(catalog, +class GlobalStateManager(airbyteStateMessage: AirbyteStateMessage?, catalog: ConfiguredAirbyteCatalog?) : AbstractStateManager(catalog, getStreamsSupplier(airbyteStateMessage), StateGeneratorUtils.CURSOR_FUNCTION, StateGeneratorUtils.CURSOR_FIELD_FUNCTION, @@ -49,7 +49,7 @@ class GlobalStateManager(airbyteStateMessage: AirbyteStateMessage?, catalog: Con throw UnsupportedOperationException("Raw state retrieval not supported by global state manager.") } - override fun toState(pair: Optional): AirbyteStateMessage? { + override fun toState(pair: Optional): AirbyteStateMessage { // Populate global state val globalState = AirbyteGlobalState() globalState.sharedState = Jsons.jsonNode(cdcStateManager.cdcState) @@ -84,7 +84,7 @@ class GlobalStateManager(airbyteStateMessage: AirbyteStateMessage?, catalog: Con } } - private fun extractStreams(airbyteStateMessage: AirbyteStateMessage?): Set { + private fun extractStreams(airbyteStateMessage: AirbyteStateMessage?): Set { if (airbyteStateMessage!!.type == AirbyteStateMessage.AirbyteStateType.GLOBAL) { return airbyteStateMessage.global.streamStates.stream() .map { streamState: AirbyteStreamState -> @@ -97,7 +97,7 @@ class GlobalStateManager(airbyteStateMessage: AirbyteStateMessage?, catalog: Con } } - private fun extractNamespacePairsFromDbStreamState(streams: List): Set { + private fun extractNamespacePairsFromDbStreamState(streams: List): Set { return streams.stream().map { stream: DbStreamState -> val cloned = Jsons.clone(stream) AirbyteStreamNameNamespacePair(cloned.streamName, cloned.streamNamespace) @@ -113,7 +113,7 @@ class GlobalStateManager(airbyteStateMessage: AirbyteStateMessage?, catalog: Con * the initial state. * @return A [Supplier] that will be used to fetch the streams present in the initial state. */ - private fun getStreamsSupplier(airbyteStateMessage: AirbyteStateMessage?): Supplier> { + private fun getStreamsSupplier(airbyteStateMessage: AirbyteStateMessage?): Supplier> { /* * If the incoming message has the state type set to GLOBAL, it is using the new format. Therefore, * we can look for streams in the "global" field of the message. Otherwise, the message is still diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt index 4083cf9dd4d2..9e4ab07db58e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/LegacyStateManager.kt @@ -25,7 +25,7 @@ import java.util.function.Supplier */ @Deprecated("""This manager may be removed in the future if/once all connectors support per-stream state management.""") -class LegacyStateManager(dbState: DbState, catalog: ConfiguredAirbyteCatalog?) : AbstractStateManager(catalog, +class LegacyStateManager(dbState: DbState, catalog: ConfiguredAirbyteCatalog?) : AbstractStateManager(catalog, Supplier { dbState.streams }, CURSOR_FUNCTION, CURSOR_FIELD_FUNCTION, @@ -61,7 +61,7 @@ class LegacyStateManager(dbState: DbState, catalog: ConfiguredAirbyteCatalog?) : throw UnsupportedOperationException("Raw state retrieval not supported by global state manager.") } - override fun toState(pair: Optional): AirbyteStateMessage? { + override fun toState(pair: Optional): AirbyteStateMessage { val dbState = StateGeneratorUtils.generateDbState(pairToCursorInfoMap) .withCdc(isCdc) .withCdcState(cdcStateManager.cdcState) @@ -89,18 +89,18 @@ class LegacyStateManager(dbState: DbState, catalog: ConfiguredAirbyteCatalog?) : /** * [Function] that extracts the cursor from the stream state. */ - private val CURSOR_FUNCTION = Function { obj: DbStreamState? -> obj!!.cursor } + private val CURSOR_FUNCTION = DbStreamState::getCursor /** * [Function] that extracts the cursor field(s) from the stream state. */ - private val CURSOR_FIELD_FUNCTION = Function { obj: DbStreamState? -> obj!!.cursorField } + private val CURSOR_FIELD_FUNCTION = DbStreamState::getCursorField - private val CURSOR_RECORD_COUNT_FUNCTION = Function { stream: DbStreamState? -> Objects.requireNonNullElse(stream!!.cursorRecordCount, 0L) } + private val CURSOR_RECORD_COUNT_FUNCTION = Function { stream: DbStreamState -> Objects.requireNonNullElse(stream.cursorRecordCount, 0L) } /** * [Function] that creates an [AirbyteStreamNameNamespacePair] from the stream state. */ - private val NAME_NAMESPACE_PAIR_FUNCTION = Function { s: DbStreamState? -> AirbyteStreamNameNamespacePair(s!!.streamName, s.streamNamespace) } + private val NAME_NAMESPACE_PAIR_FUNCTION = Function { s: DbStreamState -> AirbyteStreamNameNamespacePair(s!!.streamName, s.streamNamespace) } } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt index f8240ca153ce..d51a3b3b5c4a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt @@ -13,10 +13,10 @@ import java.time.Duration import java.time.Instant import java.time.OffsetDateTime -class SourceStateIterator(private val messageIterator: Iterator, - private val stream: ConfiguredAirbyteStream, - private val sourceStateMessageProducer: SourceStateMessageProducer<*>, - private val stateEmitFrequency: StateEmitFrequency) : AbstractIterator(), MutableIterator { +open class SourceStateIterator(private val messageIterator: Iterator, + private val stream: ConfiguredAirbyteStream, + private val sourceStateMessageProducer: SourceStateMessageProducer, + private val stateEmitFrequency: StateEmitFrequency) : AbstractIterator(), MutableIterator { private var hasEmittedFinalState = false private var recordCount = 0L private var lastCheckpoint: Instant = Instant.now() diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateEmitFrequency.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateEmitFrequency.kt index 242ea0e104f8..6c2d0120cc6f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateEmitFrequency.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateEmitFrequency.kt @@ -10,9 +10,6 @@ class StateEmitFrequency(syncCheckpointRecords: Long, syncCheckpointDuration: Du val syncCheckpointDuration: Duration init { - this.streamName = streamName - this.primaryKey = primaryKey - this.keySequence = keySequence this.syncCheckpointRecords = syncCheckpointRecords this.syncCheckpointDuration = syncCheckpointDuration } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt index 8528da0bdeeb..342cf3be2613 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt @@ -29,7 +29,7 @@ object StateGeneratorUtils { /** * [Function] that extracts the cursor from the stream state. */ - val CURSOR_FUNCTION: Function = Function { stream: AirbyteStreamState -> + val CURSOR_FUNCTION: Function = Function { stream: AirbyteStreamState -> val dbStreamState = extractState(stream) dbStreamState.map { obj: DbStreamState -> obj.cursor }.orElse(null) } @@ -68,7 +68,7 @@ object StateGeneratorUtils { * @return The [AirbyteStreamState] representing the current state of the stream. */ fun generateStreamState(airbyteStreamNameNamespacePair: AirbyteStreamNameNamespacePair?, - cursorInfo: CursorInfo?): AirbyteStreamState { + cursorInfo: CursorInfo): AirbyteStreamState { return AirbyteStreamState() .withStreamDescriptor( StreamDescriptor().withName(airbyteStreamNameNamespacePair!!.name).withNamespace(airbyteStreamNameNamespacePair.namespace)) @@ -85,10 +85,10 @@ object StateGeneratorUtils { * @return The list of stream states derived from the state information extracted from the provided * map. */ - fun generateStreamStateList(pairToCursorInfoMap: Map?): List { + fun generateStreamStateList(pairToCursorInfoMap: Map?): List { return pairToCursorInfoMap!!.entries.stream() .sorted(java.util.Map.Entry.comparingByKey()) - .map { e: Map.Entry -> generateStreamState(e.key, e.value) } + .map { e: Map.Entry -> generateStreamState(e.key, e.value) } .filter { s: AirbyteStreamState -> isValidStreamDescriptor(s.streamDescriptor) } .collect(Collectors.toList()) } @@ -100,12 +100,12 @@ object StateGeneratorUtils { * information for that stream * @return The legacy [DbState]. */ - fun generateDbState(pairToCursorInfoMap: Map?): DbState { + fun generateDbState(pairToCursorInfoMap: Map?): DbState { return DbState() .withCdc(false) .withStreams(pairToCursorInfoMap!!.entries.stream() .sorted(java.util.Map.Entry.comparingByKey()) // sort by stream name then namespace for sanity. - .map { e: Map.Entry -> generateDbStreamState(e.key, e.value) } + .map { e: Map.Entry -> generateDbStreamState(e.key, e.value) } .collect(Collectors.toList())) } @@ -117,14 +117,14 @@ object StateGeneratorUtils { * @return The [DbStreamState]. */ fun generateDbStreamState(airbyteStreamNameNamespacePair: AirbyteStreamNameNamespacePair?, - cursorInfo: CursorInfo?): DbStreamState { + cursorInfo: CursorInfo): DbStreamState { val state = DbStreamState() .withStreamName(airbyteStreamNameNamespacePair!!.name) .withStreamNamespace(airbyteStreamNameNamespacePair.namespace) - .withCursorField(if (cursorInfo.getCursorField() == null) emptyList() else Lists.newArrayList(cursorInfo.getCursorField())) - .withCursor(cursorInfo.getCursor()) - if (cursorInfo.getCursorRecordCount() > 0L) { - state.cursorRecordCount = cursorInfo.getCursorRecordCount() + .withCursorField(if (cursorInfo.cursorField == null) emptyList() else Lists.newArrayList(cursorInfo.cursorField)) + .withCursor(cursorInfo.cursor) + if (cursorInfo.cursorRecordCount > 0L) { + state.cursorRecordCount = cursorInfo.cursorRecordCount } return state } @@ -224,9 +224,7 @@ object StateGeneratorUtils { .map { state: StateWrapper -> when (state.stateType) { StateType.GLOBAL -> java.util.List.of(convertStateMessage(state.global)) - StateType.STREAM -> state.stateMessages - .stream() - .map { obj: io.airbyte.protocol.models.AirbyteStateMessage? -> convertStateMessage() }.toList() + StateType.STREAM -> state.stateMessages.map{convertStateMessage(it)} else -> java.util.List.of(AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.LEGACY) .withData(state.legacyState)) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt index 41d83e58370a..6bc8d8b12bac 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManager.kt @@ -45,7 +45,7 @@ interface StateManager { * @return The map of stream name/namespace tuple to the current cursor information for that stream * as maintained by this state manager. */ - val pairToCursorInfoMap: Map? + val pairToCursorInfoMap: Map /** * Generates an [AirbyteStateMessage] that represents the current state contained in the state @@ -56,7 +56,7 @@ interface StateManager { * @return The [AirbyteStateMessage] that represents the current state contained in the state * manager. */ - fun toState(pair: Optional): AirbyteStateMessage? + fun toState(pair: Optional): AirbyteStateMessage /** * Retrieves an [Optional] possibly containing the cursor value tracked in the state @@ -127,7 +127,7 @@ interface StateManager { * @return An [AirbyteStateMessage] that represents the current state maintained by the state * manager. */ - fun emit(pair: Optional): AirbyteStateMessage? { + fun emit(pair: Optional): AirbyteStateMessage? { return toState(pair) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt index f599d6cec87b..3ecc65cff980 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt @@ -31,7 +31,7 @@ class StreamStateManager * [AirbyteStateMessage]s. * @param catalog The [ConfiguredAirbyteCatalog] for the connector associated with this state * manager. - */(private val rawAirbyteStateMessages: List, catalog: ConfiguredAirbyteCatalog?) : AbstractStateManager(catalog, + */(private val rawAirbyteStateMessages: List, catalog: ConfiguredAirbyteCatalog?) : AbstractStateManager(catalog, Supplier { rawAirbyteStateMessages.stream().map { obj: AirbyteStateMessage? -> obj!!.stream }.collect(Collectors.toList()) }, StateGeneratorUtils.CURSOR_FUNCTION, StateGeneratorUtils.CURSOR_FIELD_FUNCTION, @@ -45,7 +45,7 @@ class StreamStateManager override val rawStateMessages: List? get() = rawAirbyteStateMessages - override fun toState(pair: Optional): AirbyteStateMessage? { + override fun toState(pair: Optional): AirbyteStateMessage { if (pair.isPresent) { val pairToCursorInfoMap = pairToCursorInfoMap val cursorInfo = Optional.ofNullable(pairToCursorInfoMap!![pair.get()]) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt index c946f582d13f..380932fbb87d 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt @@ -9,49 +9,49 @@ import org.apache.kafka.connect.source.SourceRecord import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.mockito.Mockito +import org.mockito.Mockito.mock import java.time.Duration import java.util.* import java.util.concurrent.LinkedBlockingQueue import java.util.function.Supplier class DebeziumRecordIteratorTest { - @get:Test - val heartbeatPositionTest: Unit - get() { - val debeziumRecordIterator = DebeziumRecordIterator(Mockito.mock(LinkedBlockingQueue::class.java), - object : CdcTargetPosition { - override fun reachedTargetPosition(changeEventWithMetadata: ChangeEventWithMetadata): Boolean { - return false - } - - override fun extractPositionFromHeartbeatOffset(sourceOffset: Map): Long { - return sourceOffset["lsn"] as Long - } - }, - Supplier { false }, - Mockito.mock(DebeziumShutdownProcedure::class.java), - Duration.ZERO, - Duration.ZERO) - val lsn = debeziumRecordIterator.getHeartbeatPosition(object : ChangeEvent { - private val sourceRecord = SourceRecord(null, Collections.singletonMap("lsn", 358824993496L), null, null, null) - - override fun key(): String? { - return null - } - - override fun value(): String { - return "{\"ts_ms\":1667616934701}" - } - - override fun destination(): String { - return null - } - - fun sourceRecord(): SourceRecord { - return sourceRecord - } - }) - - Assertions.assertEquals(lsn, 358824993496L) - } + @Test + fun getHeartbeatPositionTest() { + val debeziumRecordIterator = DebeziumRecordIterator(mock(), + object : CdcTargetPosition { + override fun reachedTargetPosition(changeEventWithMetadata: ChangeEventWithMetadata?): Boolean { + return false + } + + override fun extractPositionFromHeartbeatOffset(sourceOffset: Map?): Long { + return sourceOffset!!["lsn"] as Long + } + }, + { false }, + mock(), + Duration.ZERO, + Duration.ZERO) + val lsn = debeziumRecordIterator.getHeartbeatPosition(object : ChangeEvent { + private val sourceRecord = SourceRecord(null, Collections.singletonMap("lsn", 358824993496L), null, null, null) + + override fun key(): String? { + return null + } + + override fun value(): String { + return "{\"ts_ms\":1667616934701}" + } + + override fun destination(): String? { + return null + } + + fun sourceRecord(): SourceRecord { + return sourceRecord + } + }) + + Assertions.assertEquals(lsn, 358824993496L) + } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractDbSourceForTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractDbSourceForTest.kt new file mode 100644 index 000000000000..e569ad688655 --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractDbSourceForTest.kt @@ -0,0 +1,12 @@ +package io.airbyte.cdk.integrations.source.jdbc + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.db.AbstractDatabase +import io.airbyte.cdk.integrations.source.relationaldb.AbstractDbSource +import io.airbyte.protocol.models.v0.AirbyteStateMessage + +abstract class AbstractDbSourceForTest(driverClassName: String) : AbstractDbSource(driverClassName) { + public override fun getSupportedStateType(config: JsonNode?): AirbyteStateMessage.AirbyteStateType { + return super.getSupportedStateType(config) + } +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt index fb08fbe5caeb..224b2c945e60 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcSourceAcceptanceTest.kt @@ -40,7 +40,7 @@ import java.util.stream.Stream */ internal class DefaultJdbcSourceAcceptanceTest - : JdbcSourceAcceptanceTest() { + : JdbcSourceAcceptanceTest() { override fun config(): JsonNode { return testdb!!.testConfigBuilder()!!.build() } @@ -69,7 +69,7 @@ internal class DefaultJdbcSourceAcceptanceTest .build()) } - class PostgresTestSource : AbstractJdbcSource(DRIVER_CLASS, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { + class PostgresTestSource : AbstractJdbcSource(DRIVER_CLASS, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { override fun toDatabaseConfig(config: JsonNode): JsonNode { val configBuilder = ImmutableMap.builder() .put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText()) @@ -85,11 +85,9 @@ internal class DefaultJdbcSourceAcceptanceTest return Jsons.jsonNode(configBuilder.build()) } - public override fun getExcludedInternalNameSpaces(): Set { - return setOf("information_schema", "pg_catalog", "pg_internal", "catalog_history") - } + override val excludedInternalNameSpaces = setOf("information_schema", "pg_catalog", "pg_internal", "catalog_history") - override fun getSupportedStateType(config: JsonNode): AirbyteStateMessage.AirbyteStateType { + override fun getSupportedStateType(config: JsonNode?): AirbyteStateMessage.AirbyteStateType { return AirbyteStateMessage.AirbyteStateType.STREAM } @@ -110,7 +108,7 @@ internal class DefaultJdbcSourceAcceptanceTest } class BareBonesTestDatabase - (container: PostgreSQLContainer<*>?) : TestDatabase?, BareBonesTestDatabase?, BareBonesConfigBuilder?>(container) { + (container: PostgreSQLContainer<*>) : TestDatabase, BareBonesTestDatabase, BareBonesConfigBuilder>(container) { override fun inContainerBootstrapCmd(): Stream?>? { val sql = Stream.of( String.format("CREATE DATABASE %s", databaseName), @@ -140,7 +138,7 @@ internal class DefaultJdbcSourceAcceptanceTest return BareBonesConfigBuilder(this) } - class BareBonesConfigBuilder(testDatabase: BareBonesTestDatabase) : ConfigBuilder(testDatabase) + class BareBonesConfigBuilder(testDatabase: BareBonesTestDatabase) : ConfigBuilder(testDatabase) } @Test @@ -157,18 +155,20 @@ internal class DefaultJdbcSourceAcceptanceTest } companion object { - private var PSQL_CONTAINER: PostgreSQLContainer<*>? = null + private lateinit var PSQL_CONTAINER: PostgreSQLContainer<*> + @JvmStatic @BeforeAll - fun init() { - PSQL_CONTAINER = PostgreSQLContainer("postgres:13-alpine") + fun init(): Unit { + PSQL_CONTAINER = PostgreSQLContainer("postgres:13-alpine") PSQL_CONTAINER!!.start() CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "CREATE TABLE %s (%s BIT(3) NOT NULL);" INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY = "INSERT INTO %s VALUES(B'101');" } + @JvmStatic @AfterAll - fun cleanUp() { + fun cleanUp(): Unit { PSQL_CONTAINER!!.close() } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcStressTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcStressTest.kt index bbedc8d651c4..8b9e33b60cbe 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcStressTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/DefaultJdbcStressTest.kt @@ -62,9 +62,7 @@ internal class DefaultJdbcStressTest : JdbcStressTest() { super.setup() } - override fun getDefaultSchemaName(): Optional { - return Optional.of("public") - } + override val defaultSchemaName = Optional.of("public") override fun getSource(): AbstractJdbcSource { return PostgresTestSource() @@ -74,11 +72,9 @@ internal class DefaultJdbcStressTest : JdbcStressTest() { return config!! } - override fun getDriverClass(): String { - return PostgresTestSource.DRIVER_CLASS - } + override val driverClass = PostgresTestSource.DRIVER_CLASS - private class PostgresTestSource : AbstractJdbcSource(DRIVER_CLASS, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { + private class PostgresTestSource : AbstractJdbcSource(DRIVER_CLASS, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { override fun toDatabaseConfig(config: JsonNode): JsonNode { val configBuilder = ImmutableMap.builder() .put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText()) @@ -94,9 +90,7 @@ internal class DefaultJdbcStressTest : JdbcStressTest() { return Jsons.jsonNode(configBuilder.build()) } - public override fun getExcludedInternalNameSpaces(): Set { - return setOf("information_schema", "pg_catalog", "pg_internal", "catalog_history") - } + public override val excludedInternalNameSpaces = setOf("information_schema", "pg_catalog", "pg_internal", "catalog_history") companion object { private val LOGGER: Logger = LoggerFactory.getLogger(PostgresTestSource::class.java) @@ -118,12 +112,14 @@ internal class DefaultJdbcStressTest : JdbcStressTest() { private var PSQL_DB: PostgreSQLContainer<*>? = null @BeforeAll + @JvmStatic fun init() { - PSQL_DB = PostgreSQLContainer("postgres:13-alpine") + PSQL_DB = PostgreSQLContainer("postgres:13-alpine") PSQL_DB!!.start() } @AfterAll + @JvmStatic fun cleanUp() { PSQL_DB!!.close() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSourceStressTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSourceStressTest.kt index 0ccf8ff0d84d..034824392aa6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSourceStressTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSourceStressTest.kt @@ -58,9 +58,7 @@ internal class JdbcSourceStressTest : JdbcStressTest() { super.setup() } - override fun getDefaultSchemaName(): Optional { - return Optional.of("public") - } + override val defaultSchemaName= Optional.of("public") override fun getSource(): AbstractJdbcSource { return PostgresTestSource() @@ -70,11 +68,9 @@ internal class JdbcSourceStressTest : JdbcStressTest() { return config!! } - override fun getDriverClass(): String { - return PostgresTestSource.DRIVER_CLASS - } + override val driverClass = PostgresTestSource.DRIVER_CLASS - private class PostgresTestSource : AbstractJdbcSource(DRIVER_CLASS, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { + private class PostgresTestSource : AbstractJdbcSource(DRIVER_CLASS, Supplier { AdaptiveStreamingQueryConfig() }, JdbcUtils.defaultSourceOperations), Source { override fun toDatabaseConfig(config: JsonNode): JsonNode { val configBuilder = ImmutableMap.builder() .put(JdbcUtils.USERNAME_KEY, config[JdbcUtils.USERNAME_KEY].asText()) @@ -90,9 +86,7 @@ internal class JdbcSourceStressTest : JdbcStressTest() { return Jsons.jsonNode(configBuilder.build()) } - public override fun getExcludedInternalNameSpaces(): Set { - return setOf("information_schema", "pg_catalog", "pg_internal", "catalog_history") - } + override val excludedInternalNameSpaces = setOf("information_schema", "pg_catalog", "pg_internal", "catalog_history") companion object { private val LOGGER: Logger = LoggerFactory.getLogger(PostgresTestSource::class.java) @@ -114,12 +108,14 @@ internal class JdbcSourceStressTest : JdbcStressTest() { private var PSQL_DB: PostgreSQLContainer<*>? = null @BeforeAll + @JvmStatic fun init() { - PSQL_DB = PostgreSQLContainer("postgres:13-alpine") + PSQL_DB = PostgreSQLContainer("postgres:13-alpine") PSQL_DB!!.start() } @AfterAll + @JvmStatic fun cleanUp() { PSQL_DB!!.close() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSourceTest.kt index 4e66391a6b68..902ba5745eac 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSourceTest.kt @@ -4,6 +4,8 @@ package io.airbyte.cdk.integrations.source.relationaldb import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.db.AbstractDatabase +import io.airbyte.cdk.integrations.source.jdbc.AbstractDbSourceForTest import io.airbyte.cdk.integrations.source.relationaldb.state.* import io.airbyte.commons.json.Jsons import io.airbyte.commons.resources.MoreResources @@ -28,7 +30,7 @@ class AbstractDbSourceTest { @Test @Throws(IOException::class) fun testDeserializationOfLegacyState() { - val dbSource = Mockito.mock(AbstractDbSource::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) + val dbSource = Mockito.mock(AbstractDbSourceForTest::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) val config = Mockito.mock(JsonNode::class.java) val legacyStateJson = MoreResources.readResource("states/legacy.json") @@ -43,7 +45,7 @@ class AbstractDbSourceTest { @Test @Throws(IOException::class) fun testDeserializationOfGlobalState() { - val dbSource = Mockito.mock(AbstractDbSource::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) + val dbSource = Mockito.mock(AbstractDbSourceForTest::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) val config = Mockito.mock(JsonNode::class.java) val globalStateJson = MoreResources.readResource("states/global.json") @@ -58,7 +60,7 @@ class AbstractDbSourceTest { @Test @Throws(IOException::class) fun testDeserializationOfStreamState() { - val dbSource = Mockito.mock(AbstractDbSource::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) + val dbSource = Mockito.mock(AbstractDbSourceForTest::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) val config = Mockito.mock(JsonNode::class.java) val streamStateJson = MoreResources.readResource("states/per_stream.json") @@ -73,7 +75,7 @@ class AbstractDbSourceTest { @Test @Throws(IOException::class) fun testDeserializationOfNullState() { - val dbSource = Mockito.mock(AbstractDbSource::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) + val dbSource = Mockito.mock(AbstractDbSourceForTest::class.java, Mockito.withSettings().useConstructor("").defaultAnswer(Mockito.CALLS_REAL_METHODS)) val config = Mockito.mock(JsonNode::class.java) val result = StateGeneratorUtils.deserializeInitialState(null, dbSource.getSupportedStateType(config)) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManagerTest.kt index f1a8e293eff3..617b0cf3be06 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorManagerTest.kt @@ -123,7 +123,7 @@ class CursorManagerTest { private fun createCursorManager(cursorField: String?, cursor: String?, - nameNamespacePair: AirbyteStreamNameNamespacePair?): CursorManager { + nameNamespacePair: AirbyteStreamNameNamespacePair?): CursorManager { val dbStreamState = StateTestConstants.getState(cursorField, cursor).get() return CursorManager( StateTestConstants.getCatalog(cursorField).orElse(null), @@ -136,7 +136,7 @@ class CursorManagerTest { } companion object { - private val CURSOR_RECORD_COUNT_FUNCTION = Function { stream: DbStreamState? -> + private val CURSOR_RECORD_COUNT_FUNCTION = Function { stream: DbStreamState -> if (stream!!.cursorRecordCount != null) { return@Function stream.cursorRecordCount } else { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducerTest.kt index 84fb1130ee99..132689d73a7b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/CursorStateMessageProducerTest.kt @@ -22,7 +22,7 @@ import java.util.List internal class CursorStateMessageProducerTest { private fun createExceptionIterator(): Iterator { - return object : MutableIterator { + return object : Iterator { val internalMessageIterator: Iterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2, RECORD_MESSAGE_2, RECORD_MESSAGE_3) @@ -64,7 +64,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.empty()) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_1, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -83,7 +83,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.of(RECORD_VALUE_5)) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_1, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -101,7 +101,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.empty()) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageStream, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageStream, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) Assertions.assertEquals(recordMessage, iterator.next()) // null because no records with a cursor field were replicated for the stream. @@ -117,7 +117,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.of(RECORD_VALUE_1)) - val iterator: SourceStateIterator<*> = SourceStateIterator(exceptionIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(exceptionIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_1, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -140,7 +140,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.of(RECORD_VALUE_1)) - val iterator: SourceStateIterator<*> = SourceStateIterator(exceptionIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(exceptionIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_1, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -157,7 +157,7 @@ internal class CursorStateMessageProducerTest { Optional.empty()) val iterator: SourceStateIterator<*> = - SourceStateIterator(Collections.emptyIterator(), STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) + SourceStateIterator(Collections.emptyIterator(), STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) Assertions.assertEquals(EMPTY_STATE_MESSAGE, iterator.next()) Assertions.assertFalse(iterator.hasNext()) @@ -175,7 +175,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.empty()) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(0, Duration.ZERO)) Assertions.assertEquals(recordMessageWithNull, iterator.next()) Assertions.assertEquals(createStateMessage(RECORD_VALUE_1, 1, 1.0), iterator.next()) @@ -190,7 +190,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.empty()) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_1, iterator.next()) // should emit state 1, but it is unclear whether there will be more @@ -218,7 +218,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.empty()) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(2, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(2, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_1, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -241,7 +241,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.of(RECORD_VALUE_1)) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_3, iterator.next()) @@ -294,7 +294,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.of(RECORD_VALUE_1)) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(1, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -326,7 +326,7 @@ internal class CursorStateMessageProducerTest { stateManager, Optional.of(RECORD_VALUE_1)) - val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(10, Duration.ZERO)) + val iterator: SourceStateIterator<*> = SourceStateIterator(messageIterator, STREAM, producer, StateEmitFrequency(10, Duration.ZERO)) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) Assertions.assertEquals(RECORD_MESSAGE_2, iterator.next()) @@ -420,6 +420,6 @@ internal class CursorStateMessageProducerTest { .withSourceStats(AirbyteStateStats().withRecordCount(statsRecordCount))) } - private var messageIterator: Iterator? = null + private lateinit var messageIterator: Iterator } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt index 374f11da880d..673945594ba5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManagerTest.kt @@ -32,8 +32,8 @@ class GlobalStateManagerTest { GlobalStateManager(AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.GLOBAL).withGlobal(globalState), catalog) Assertions.assertNotNull(stateManager.cdcStateManager) Assertions.assertEquals(cdcState, stateManager.cdcStateManager.cdcState) - Assertions.assertEquals(1, stateManager.cdcStateManager.initialStreamsSynced.size) - Assertions.assertTrue(stateManager.cdcStateManager.initialStreamsSynced.contains(AirbyteStreamNameNamespacePair("name", "namespace"))) + Assertions.assertEquals(1, stateManager.cdcStateManager.initialStreamsSynced!!.size) + Assertions.assertTrue(stateManager.cdcStateManager.initialStreamsSynced!!.contains(AirbyteStreamNameNamespacePair("name", "namespace"))) } @Test @@ -203,7 +203,7 @@ class GlobalStateManagerTest { val airbyteStateMessage = stateManager.toState(Optional.empty()) Assertions.assertNotNull(airbyteStateMessage) - Assertions.assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, airbyteStateMessage.type) + Assertions.assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, airbyteStateMessage!!.type) Assertions.assertEquals(0, airbyteStateMessage.global.streamStates.size) } @@ -218,7 +218,7 @@ class GlobalStateManagerTest { val stateManager: StateManager = GlobalStateManager(AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.LEGACY).withData(Jsons.jsonNode(dbState)), catalog) Assertions.assertNotNull(stateManager.cdcStateManager) - Assertions.assertEquals(1, stateManager.cdcStateManager.initialStreamsSynced.size) - Assertions.assertTrue(stateManager.cdcStateManager.initialStreamsSynced.contains(AirbyteStreamNameNamespacePair("name", "namespace"))) + Assertions.assertEquals(1, stateManager.cdcStateManager.initialStreamsSynced!!.size) + Assertions.assertTrue(stateManager.cdcStateManager.initialStreamsSynced!!.contains(AirbyteStreamNameNamespacePair("name", "namespace"))) } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorForTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorForTest.kt new file mode 100644 index 000000000000..e418cbc9990b --- /dev/null +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorForTest.kt @@ -0,0 +1,11 @@ +package io.airbyte.cdk.integrations.source.relationaldb.state + +import io.airbyte.protocol.models.v0.AirbyteMessage +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream + +class SourceStateIteratorForTest(messageIterator: Iterator, + stream: ConfiguredAirbyteStream, + sourceStateMessageProducer: SourceStateMessageProducer, + stateEmitFrequency: StateEmitFrequency): SourceStateIterator(messageIterator, stream, sourceStateMessageProducer, stateEmitFrequency) { + public override fun computeNext(): AirbyteMessage? = super.computeNext() +} diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.kt index 8607359ec4c8..51977aea30ec 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIteratorTest.kt @@ -9,22 +9,23 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.ArgumentMatchers import org.mockito.Mockito +import org.mockito.Mockito.mock import java.time.Duration class SourceStateIteratorTest { - var mockProducer: SourceStateMessageProducer<*>? = null - var messageIterator: Iterator? = null - var stream: ConfiguredAirbyteStream? = null + lateinit var mockProducer: SourceStateMessageProducer + lateinit var messageIterator: Iterator + lateinit var stream: ConfiguredAirbyteStream - var sourceStateIterator: SourceStateIterator<*>? = null + var sourceStateIterator: SourceStateIteratorForTest<*>? = null @BeforeEach fun setup() { - mockProducer = Mockito.mock(SourceStateMessageProducer::class.java) - stream = Mockito.mock(ConfiguredAirbyteStream::class.java) - messageIterator = Mockito.mock>(MutableIterator::class.java) + mockProducer = mock() + stream = mock() + messageIterator = mock() val stateEmitFrequency = StateEmitFrequency(1L, Duration.ofSeconds(100L)) - sourceStateIterator = SourceStateIterator(messageIterator, stream, mockProducer, stateEmitFrequency) + sourceStateIterator = SourceStateIteratorForTest(messageIterator, stream, mockProducer, stateEmitFrequency) } // Provides a way to generate a record message and will verify corresponding spied functions have @@ -33,7 +34,7 @@ class SourceStateIteratorTest { Mockito.doReturn(true).`when`(messageIterator).hasNext() Mockito.doReturn(false).`when`(mockProducer).shouldEmitStateMessage(ArgumentMatchers.eq(stream)) val message = AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(AirbyteRecordMessage()) - Mockito.doReturn(message).`when`(mockProducer).processRecordMessage(ArgumentMatchers.eq(stream), ArgumentMatchers.any()) + Mockito.doReturn(message).`when`(mockProducer).processRecordMessage(ArgumentMatchers.eq(stream), ArgumentMatchers.any()) Mockito.doReturn(message).`when`(messageIterator).next() Assert.assertEquals(message, sourceStateIterator!!.computeNext()) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateTestConstants.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateTestConstants.kt index 90eb8d487540..38f16534a4dd 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateTestConstants.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateTestConstants.kt @@ -27,7 +27,7 @@ object StateTestConstants { const val CURSOR: String = "2000" const val CURSOR_RECORD_COUNT: Long = 19L - fun getState(cursorField: String?, cursor: String?): Optional { + fun getState(cursorField: String?, cursor: String?): Optional { return Optional.of(DbStreamState() .withStreamName(STREAM_NAME1) .withCursorField(Lists.newArrayList(cursorField)) @@ -42,12 +42,12 @@ object StateTestConstants { .withCursorRecordCount(cursorRecordCount)) } - fun getCatalog(cursorField: String?): Optional { + fun getCatalog(cursorField: String?): Optional { return Optional.of(ConfiguredAirbyteCatalog() .withStreams(List.of(getStream(cursorField).orElse(null)))) } - fun getStream(cursorField: String?): Optional { + fun getStream(cursorField: String?): Optional { return Optional.of(ConfiguredAirbyteStream() .withStream(AirbyteStream().withName(STREAM_NAME1)) .withCursorField(if (cursorField == null) emptyList() else Lists.newArrayList(cursorField))) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/test/utils/DatabaseConnectionHelperTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/test/utils/DatabaseConnectionHelperTest.kt index f9d93767af0d..61eaa613ed2a 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/test/utils/DatabaseConnectionHelperTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/test/utils/DatabaseConnectionHelperTest.kt @@ -36,8 +36,9 @@ internal class DatabaseConnectionHelperTest { protected var container: PostgreSQLContainer<*>? = null @BeforeAll + @JvmStatic fun dbSetup() { - container = PostgreSQLContainer("postgres:13-alpine") + container = PostgreSQLContainer("postgres:13-alpine") .withDatabaseName(DATABASE_NAME) .withUsername("docker") .withPassword("docker") @@ -45,6 +46,7 @@ internal class DatabaseConnectionHelperTest { } @AfterAll + @JvmStatic fun dbDown() { container!!.close() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 539539453cec..a664070cba50 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -24,8 +24,8 @@ import java.util.function.Consumer import java.util.stream.Collectors import java.util.stream.Stream -abstract class CdcSourceTest?> { - protected var testdb: T? = null +abstract class CdcSourceTest> { + protected lateinit var testdb: T protected fun createTableSqlFmt(): String { return "CREATE TABLE %s.%s(%s);" diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index cbaf441c3b9a..e56d7164f32f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -36,8 +36,8 @@ import java.util.stream.Collectors * Tests that should be run on all Sources that extend the AbstractJdbcSource. */ @SuppressFBWarnings(value = ["MS_SHOULD_BE_FINAL"], justification = "The static variables are updated in subclasses for convenience, and cannot be final.") -abstract class JdbcSourceAcceptanceTest?> { - protected var testdb: T? = null +abstract class JdbcSourceAcceptanceTest> { + protected lateinit var testdb: T protected fun streamName(): String { return TABLE_NAME @@ -227,6 +227,7 @@ abstract class JdbcSourceAcceptanceTest?> // clickhouse and mysql do not have a concept of schemas, so this test does not make sense for them. when (testdb!!.databaseDriver) { DatabaseDriver.MYSQL, DatabaseDriver.CLICKHOUSE, DatabaseDriver.TERADATA -> return + else ->{} } // add table and data to a separate schema. testdb!!.with("CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)", @@ -252,8 +253,8 @@ abstract class JdbcSourceAcceptanceTest?> expected.streams = catalogStreams // sort streams by name so that we are comparing lists with the same order. val schemaTableCompare = Comparator.comparing { stream: AirbyteStream -> stream.namespace + "." + stream.name } - expected.streams.sort(schemaTableCompare) - actual!!.streams.sort(schemaTableCompare) + expected.streams.sortWith(schemaTableCompare) + actual!!.streams.sortWith(schemaTableCompare) Assertions.assertEquals(expected, filterOutOtherSchemas(actual)) } @@ -289,7 +290,7 @@ abstract class JdbcSourceAcceptanceTest?> protected val airbyteMessagesReadOneColumn: List get() { val expectedMessages = testMessages.stream() - .map { `object`: AirbyteMessage? -> Jsons.clone(`object`) } + .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } .peek { m: AirbyteMessage -> (m.record.data as ObjectNode).remove(COL_NAME) (m.record.data as ObjectNode).remove(COL_UPDATED_AT) @@ -336,7 +337,7 @@ abstract class JdbcSourceAcceptanceTest?> protected fun getAirbyteMessagesSecondSync(streamName: String?): List { return testMessages .stream() - .map { `object`: AirbyteMessage? -> Jsons.clone(`object`) } + .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } .peek { m: AirbyteMessage -> m.record.stream = streamName m.record.namespace = defaultNamespace @@ -372,7 +373,7 @@ abstract class JdbcSourceAcceptanceTest?> protected fun getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces: ConfiguredAirbyteStream): List { return testMessages .stream() - .map { `object`: AirbyteMessage? -> Jsons.clone(`object`) } + .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } .peek { m: AirbyteMessage -> m.record.stream = streamForTableWithSpaces.stream.name (m.record.data as ObjectNode).set(COL_LAST_NAME_WITH_SPACE, @@ -630,7 +631,7 @@ abstract class JdbcSourceAcceptanceTest?> protected fun getAirbyteMessagesSecondStreamWithNamespace(streamName2: String?): List { return testMessages .stream() - .map { `object`: AirbyteMessage? -> Jsons.clone(`object`) } + .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } .peek { m: AirbyteMessage -> m.record.stream = streamName2 (m.record.data as ObjectNode).remove(COL_UPDATED_AT) @@ -944,7 +945,7 @@ abstract class JdbcSourceAcceptanceTest?> Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaType.STRING)) } - fun getFullyQualifiedTableName(tableName: String?): String { + fun getFullyQualifiedTableName(tableName: String): String { return RelationalDbQueryUtils.getFullyQualifiedTableName(defaultSchemaName, tableName) } @@ -1041,7 +1042,7 @@ abstract class JdbcSourceAcceptanceTest?> } companion object { - protected var SCHEMA_NAME: String = "jdbc_integration_test1" + @JvmStatic protected var SCHEMA_NAME: String = "jdbc_integration_test1" protected var SCHEMA_NAME2: String = "jdbc_integration_test2" protected var TEST_SCHEMAS: Set = java.util.Set.of(SCHEMA_NAME, SCHEMA_NAME2) @@ -1075,8 +1076,8 @@ abstract class JdbcSourceAcceptanceTest?> protected var COLUMN_CLAUSE_WITHOUT_PK: String = "id INTEGER, name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL" protected var COLUMN_CLAUSE_WITH_COMPOSITE_PK: String = "first_name VARCHAR(200) NOT NULL, last_name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL" - protected var CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY: String = "CREATE TABLE %s (%s bit NOT NULL);" - protected var INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY: String = "INSERT INTO %s VALUES(0);" + @JvmField var CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY: String = "CREATE TABLE %s (%s bit NOT NULL);" + @JvmField var INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY: String = "INSERT INTO %s VALUES(0);" protected var CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY: String = "CREATE TABLE %s (%s VARCHAR(20));" protected var INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY: String = "INSERT INTO %s VALUES('Hello world :)');" protected var INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY: String = "INSERT INTO %s (name, timestamp) VALUES ('%s', '%s')" diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt index 5395e454a762..8149e0992ee7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcStressTest.kt @@ -37,7 +37,7 @@ import java.util.* @SuppressFBWarnings(value = ["MS_SHOULD_BE_FINAL"], justification = "The static variables are updated in sub classes for convenience, and cannot be final.") abstract class JdbcStressTest { private var bitSet: BitSet? = null - private var config: JsonNode? = null + private lateinit var config: JsonNode private var source: AbstractJdbcSource<*>? = null /** @@ -56,7 +56,7 @@ abstract class JdbcStressTest { * * @return config */ - abstract fun getConfig(): JsonNode? + abstract fun getConfig(): JsonNode /** * Full qualified class name of the JDBC driver for the database. @@ -101,7 +101,7 @@ abstract class JdbcStressTest { val batchCount = TOTAL_RECORDS / BATCH_SIZE LOGGER.info("writing {} batches of {}", batchCount, BATCH_SIZE) for (i in 0 until batchCount) { - if (i % 1000 == 0) LOGGER.info("writing batch: $i") + if (i % 1000 == 0L) LOGGER.info("writing batch: $i") val insert: MutableList = ArrayList() for (j in 0 until BATCH_SIZE) { val recordNumber = (i * BATCH_SIZE + j).toInt() diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt index 28f1fc794ae2..2079df7ea02e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt @@ -50,13 +50,10 @@ abstract class AbstractSourceConnectorTest { protected var localRoot: Path? = null private var processFactory: ProcessFactory? = null - protected abstract val imageName: String? /** * Name of the docker image that the tests will run against. - * - * @return docker image name */ - get + protected abstract val imageName: String @get:Throws(Exception::class) protected abstract val config: JsonNode? @@ -89,9 +86,9 @@ abstract class AbstractSourceConnectorTest { @Throws(Exception::class) protected abstract fun tearDown(testEnv: TestDestinationEnv?) - private var mAirbyteApiClient: AirbyteApiClient? = null + private lateinit var mAirbyteApiClient: AirbyteApiClient - private var mSourceApi: SourceApi? = null + private lateinit var mSourceApi: SourceApi private var mConnectorConfigUpdater: ConnectorConfigUpdater? = null diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index c00a38187714..4a17f5c91e4c 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -103,7 +103,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { // testDataHolders should be initialized using the `addDataTypeTestData` function testDataHolders.forEach(Consumer { testDataHolder: TestDataHolder -> val airbyteStream = streams[testDataHolder.nameWithTestPrefix] - val jsonSchemaTypeMap = Jsons.deserialize>( + val jsonSchemaTypeMap = Jsons.deserialize( airbyteStream!!.jsonSchema["properties"][testColumnName].toString(), MutableMap::class.java) as Map Assertions.assertEquals(testDataHolder.airbyteType.jsonSchemaTypeMap, jsonSchemaTypeMap, "Expected column type for " + testDataHolder.nameWithTestPrefix) @@ -171,22 +171,22 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { val errorsByStream: MutableMap> = HashMap() for (streamName in unexpectedValuesByStream.keys) { errorsByStream.putIfAbsent(streamName, ArrayList()) - val test = testByName[streamName] + val test = testByName.getValue(streamName) val unexpectedValues: List = unexpectedValuesByStream[streamName]!! for (unexpectedValue in unexpectedValues) { errorsByStream[streamName]!!.add( - "The stream '%s' checking type '%s' initialized at %s got unexpected values: %s".formatted(streamName, test.getSourceType(), + "The stream '%s' checking type '%s' initialized at %s got unexpected values: %s".formatted(streamName, test.sourceType, test!!.declarationLocation, unexpectedValue)) } } for (streamName in missedValuesByStream.keys) { errorsByStream.putIfAbsent(streamName, ArrayList()) - val test = testByName[streamName] + val test = testByName.getValue(streamName) val missedValues: List = missedValuesByStream[streamName]!! for (missedValue in missedValues) { errorsByStream[streamName]!!.add( - "The stream '%s' checking type '%s' initialized at %s is missing values: %s".formatted(streamName, test.getSourceType(), + "The stream '%s' checking type '%s' initialized at %s is missing values: %s".formatted(streamName, test.sourceType, test!!.declarationLocation, missedValue)) } } @@ -223,7 +223,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { protected fun createTables() { for (test in testDataHolders) { database!!.query { ctx: DSLContext? -> - ctx.fetch(test.createSqlQuery) + ctx!!.fetch(test.createSqlQuery) LOGGER.info("Table {} is created.", test.nameWithTestPrefix) null } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt index c0d7764c6334..aa2a7e33a9de 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt @@ -58,6 +58,9 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() { }) } + override val imageName: String + get() = IMAGE_NAME + @Throws(Exception::class) override fun setupEnvironment(environment: TestDestinationEnv?) { testRoot = Files.createTempDirectory(Files.createDirectories(Path.of("/tmp/standard_test")), "pytest") @@ -132,8 +135,7 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() { private val LOGGER: Logger = LoggerFactory.getLogger(PythonSourceAcceptanceTest::class.java) private const val OUTPUT_FILENAME = "output.json" - protected var imageName: String? = null - get() = Companion.field + lateinit var IMAGE_NAME: String var PYTHON_CONTAINER_NAME: String? = null } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt index 57d6be275819..1e047df3ca37 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt @@ -7,7 +7,7 @@ import io.airbyte.protocol.models.JsonSchemaType import java.util.* class TestDataHolder internal constructor(val sourceType: String?, - val airbyteType: JsonSchemaType?, + val airbyteType: JsonSchemaType, val values: List, val expectedValues: MutableList, private val createTablePatternSql: String, @@ -18,11 +18,12 @@ class TestDataHolder internal constructor(val sourceType: String?, private var idColumnName: String? = null private var testColumnName: String? = null - private var declarationLocation: Array + var declarationLocation: String = "" + private set class TestDataHolderBuilder internal constructor() { private var sourceType: String? = null - private var airbyteType: JsonSchemaType? = null + private lateinit var airbyteType: JsonSchemaType private val values: MutableList = ArrayList() private val expectedValues: MutableList = ArrayList() private var createTablePatternSql: String @@ -56,7 +57,7 @@ class TestDataHolder internal constructor(val sourceType: String?, * @param airbyteType Airbyte data type * @return builder */ - fun airbyteType(airbyteType: JsonSchemaType?): TestDataHolderBuilder { + fun airbyteType(airbyteType: JsonSchemaType): TestDataHolderBuilder { this.airbyteType = airbyteType return this } @@ -108,7 +109,7 @@ class TestDataHolder internal constructor(val sourceType: String?, * @param insertValue test value * @return builder */ - fun addInsertValues(vararg insertValue: String?): TestDataHolderBuilder { + fun addInsertValues(vararg insertValue: String): TestDataHolderBuilder { values.addAll(Arrays.asList(*insertValue)) return this } @@ -162,11 +163,7 @@ class TestDataHolder internal constructor(val sourceType: String?, fullSourceDataType) fun setDeclarationLocation(declarationLocation: Array) { - this.declarationLocation = declarationLocation - } - - fun getDeclarationLocation(): String { - return Arrays.asList(*declarationLocation).subList(2, 3).toString() + this.declarationLocation = Arrays.asList(*declarationLocation).subList(2, 3).toString() } val insertSqlQueries: List diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/fs/ExecutableTestSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/fs/ExecutableTestSource.kt index 1d1b538e7646..7477b44ae916 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/fs/ExecutableTestSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/fs/ExecutableTestSource.kt @@ -22,7 +22,7 @@ class ExecutableTestSource : SourceAcceptanceTest() { override val spec: ConnectorSpecification get() = Jsons.deserialize(IOs.readFile(TEST_CONFIG!!.specPath), ConnectorSpecification::class.java) - override val imageName: String? + override val imageName: String get() = TEST_CONFIG!!.imageName override val config: JsonNode? diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceBasePerformanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceBasePerformanceTest.kt index a89538094b00..440fb8613324 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceBasePerformanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceBasePerformanceTest.kt @@ -11,27 +11,25 @@ import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv * tests. */ abstract class AbstractSourceBasePerformanceTest : AbstractSourceConnectorTest() { + /** + * The column name will be used for a test column in the test tables. Override it if default name is + * not valid for your source. + */ + protected val testColumnName + get() = TEST_COLUMN_NAME + /** + * The stream name template will be used for a test tables. Override it if default name is not valid + * for your source. + */ + protected val testStreamNameTemplate + get() = TEST_STREAM_NAME_TEMPLATE @Throws(Exception::class) override fun setupEnvironment(environment: TestDestinationEnv?) { // DO NOTHING. Mandatory to override. DB will be setup as part of each test } companion object { - protected val testColumnName: String = "test_column" - /** - * The column name will be used for a test column in the test tables. Override it if default name is - * not valid for your source. - * - * @return Test column name - */ - get() = Companion.field - protected val testStreamNameTemplate: String = "test_%S" - /** - * The stream name template will be used for a test tables. Override it if default name is not valid - * for your source. - * - * @return Test steam name template - */ - get() = Companion.field + protected const val TEST_COLUMN_NAME: String = "test_column" + protected const val TEST_STREAM_NAME_TEMPLATE: String = "test_%S" } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt index 5fdf8418dd76..485de256c571 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourceFillDbWithTestData.kt @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.standardtest.source.performancetest import io.airbyte.cdk.db.Database import org.jooq.DSLContext import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.TestInstance import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource @@ -17,6 +18,7 @@ import java.util.stream.Stream /** * This abstract class contains common methods for Fill Db scripts. */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) abstract class AbstractSourceFillDbWithTestData : AbstractSourceBasePerformanceTest() { /** * Setup the test database. All tables and data described in the registered tests will be put there. diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt index cde09b731521..e861725eebb3 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/performancetest/AbstractSourcePerformanceTest.kt @@ -10,6 +10,7 @@ import io.airbyte.protocol.models.Field import io.airbyte.protocol.models.JsonSchemaType import io.airbyte.protocol.models.v0.* import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.TestInstance import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments import org.junit.jupiter.params.provider.MethodSource @@ -22,8 +23,15 @@ import java.util.stream.Stream /** * This abstract class contains common methods for Performance tests. */ +@TestInstance(TestInstance.Lifecycle.PER_CLASS) abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest() { override var config: JsonNode? = null + /** + * The column name will be used for a PK column in the test tables. Override it if default name is + * not valid for your source. + */ + protected val idColumnName: String = "id" + /** * Setup the test database. All tables and data described in the registered tests will be put there. @@ -83,8 +91,8 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest } protected fun prepareMapWithExpectedRecords(streamNumber: Int, - expectedRecordsNumberInEachStream: Int): Map { - val resultMap: MutableMap = HashMap() // streamName&expected records in stream + expectedRecordsNumberInEachStream: Int): MutableMap { + val resultMap: MutableMap = HashMap() // streamName&expected records in stream for (currentStream in 0 until streamNumber) { val streamName = String.format(testStreamNameTemplate, currentStream) @@ -135,13 +143,5 @@ abstract class AbstractSourcePerformanceTest : AbstractSourceBasePerformanceTest companion object { protected val c: Logger = LoggerFactory.getLogger(AbstractSourcePerformanceTest::class.java) - protected val idColumnName: String = "id" - /** - * The column name will be used for a PK column in the test tables. Override it if default name is - * not valid for your source. - * - * @return Id column name - */ - get() = Companion.field } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt index 9206b899d43d..0f1efe16423f 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt @@ -130,7 +130,7 @@ class RecordDiffer @SafeVarargs constructor(private val rawRecordColumnNames: Ma // Start with a noop comparator for convenience var comp = Comparator.comparing { record: JsonNode -> 0 } for ((key, value) in identifyingColumns) { - comp = comp.thenComparing(Comparator { record: JsonNode -> extract(record, key, value) }) + //comp = comp.thenComparing(Comparator { record: JsonNode -> extract(record, key, value) }) } comp = comp.thenComparing { record: JsonNode -> asTimestampWithTimezone(record[getMetadataColumnName(columnNames, "_airbyte_extracted_at")]) } return comp