Skip to content

Commit

Permalink
fix compiler warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 25, 2024
1 parent abe39af commit f6db47e
Show file tree
Hide file tree
Showing 58 changed files with 367 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object DSLContextFactory {
driverClassName: String,
jdbcConnectionString: String?,
dialect: SQLDialect?,
connectionProperties: Map<String?, String?>?,
connectionProperties: Map<String, String>?,
connectionTimeout: Duration?
): DSLContext {
return DSL.using(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object DataSourceFactory {
password: String?,
driverClassName: String,
jdbcConnectionString: String?,
connectionProperties: Map<String?, String?>?,
connectionProperties: Map<String, String>?,
connectionTimeout: Duration?
): DataSource {
return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
Expand Down Expand Up @@ -100,7 +100,7 @@ object DataSourceFactory {
port: Int,
database: String?,
driverClassName: String,
connectionProperties: Map<String?, String?>?
connectionProperties: Map<String, String>?
): DataSource {
return DataSourceBuilder(username, password, driverClassName, host, port, database)
.withConnectionProperties(connectionProperties)
Expand Down Expand Up @@ -152,7 +152,7 @@ object DataSourceFactory {
private var password: String?,
private var driverClassName: String
) {
private var connectionProperties: Map<String?, String?> = java.util.Map.of()
private var connectionProperties: Map<String, String> = java.util.Map.of()
private var database: String? = null
private var host: String? = null
private var jdbcUrl: String? = null
Expand Down Expand Up @@ -185,7 +185,7 @@ object DataSourceFactory {
}

fun withConnectionProperties(
connectionProperties: Map<String?, String?>?
connectionProperties: Map<String, String>?
): DataSourceBuilder {
if (connectionProperties != null) {
this.connectionProperties = connectionProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory

/** Implementation of source operations with standard JDBC types. */
class JdbcSourceOperations :
AbstractJdbcCompatibleSourceOperations<JDBCType?>(), SourceOperations<ResultSet, JDBCType?> {
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
return try {
JDBCType.valueOf(columnTypeInt)
Expand Down Expand Up @@ -147,7 +147,7 @@ class JdbcSourceOperations :
return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type)
}

override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType {
override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType {
return when (jdbcType) {
JDBCType.BIT,
JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.*

abstract class JdbcConnector
protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() {
protected fun getConnectionTimeout(connectionProperties: Map<String?, String?>): Duration {
protected fun getConnectionTimeout(connectionProperties: Map<String, String>): Duration {
return getConnectionTimeout(connectionProperties, driverClassName)
}

Expand All @@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
* @return DataSourceBuilder class used to create dynamic fields for DataSource
*/
fun getConnectionTimeout(
connectionProperties: Map<String?, String?>,
connectionProperties: Map<String, String>,
driverClassName: String?
): Duration {
val parsedConnectionTimeout =
Expand Down
5 changes: 5 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ java {
}
}

compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false


// Convert yaml to java: relationaldb.models
jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
reportQueueUtilization()
return super.poll()
}

companion object {
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
}
}

fun getIncrementalIterators(debeziumPropertiesManager: DebeziumPropertiesManager,
Expand All @@ -70,12 +66,12 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,
val offsetManager: AirbyteFileOffsetBackingStore = AirbyteFileOffsetBackingStore.Companion.initializeState(
cdcSavedInfoFetcher.savedOffset,
if (addDbNameToOffsetState) Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText()) else Optional.empty<String>())
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?> = if (trackSchemaHistory
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> = if (trackSchemaHistory
) Optional.of<AirbyteSchemaHistoryStorage?>(AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
cdcSavedInfoFetcher.savedSchemaHistory, cdcStateHandler.compressSchemaHistoryForState()))
else Optional.empty<AirbyteSchemaHistoryStorage>()
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> = CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize)
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> = CapacityReportingBlockingQueue(queueSize)
publisher.start(queue, offsetManager, schemaHistoryManager)
// handle state machine around pub/sub logic.
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> = DebeziumRecordIterator(
Expand Down Expand Up @@ -106,6 +102,7 @@ class AirbyteDebeziumHandler<T>(private val config: JsonNode,

companion object {
private val LOGGER: Logger = LoggerFactory.getLogger(AirbyteDebeziumHandler::class.java)
private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)

/**
* We use 10000 as capacity cause the default queue size and batch size of debezium is :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ import java.util.*
interface CdcSavedInfoFetcher {
val savedOffset: JsonNode?

val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode?>?>?
val savedSchemaHistory: AirbyteSchemaHistoryStorage.SchemaHistory<Optional<JsonNode>>?
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.airbyte.protocol.models.v0.AirbyteMessage
* which suits them. Also, it adds some utils to verify CDC event status.
*/
interface CdcStateHandler {
fun saveState(offset: Map<String?, String?>?, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory<String?>?): AirbyteMessage?
fun saveState(offset: Map<String?, String?>?, dbHistory: AirbyteSchemaHistoryStorage.SchemaHistory<String>?): AirbyteMessage?

fun saveStateAfterCompletionOfSnapshotOfNewStreams(): AirbyteMessage?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class AirbyteFileOffsetBackingStore(private val offsetFilePath: Path, private va
}

fun persist(cdcState: JsonNode?) {
val mapAsString: Map<String, String?> =
if (cdcState != null) Jsons.`object`<Map<*, *>>(cdcState, MutableMap::class.java) else emptyMap<String, String>()
val mapAsString: Map<String, String> =
if (cdcState != null) Jsons.`object`(cdcState, MutableMap::class.java) as Map<String, String> else emptyMap()

val updatedMap = updateStateForDebezium2_1(mapAsString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc
val isCompressed: Boolean

init {
this.streamName = streamName
this.primaryKey = primaryKey
this.keySequence = keySequence
this.syncCheckpointRecords = syncCheckpointRecords
this.syncCheckpointDuration = syncCheckpointDuration
this.tableName = tableName
this.cursorColumnName = cursorColumnName
this.cursorSqlType = cursorSqlType
this.cause = cause
this.tableSize = tableSize
this.avgRowLength = avgRowLength
this.schema = schema
this.isCompressed = isCompressed
}
Expand Down Expand Up @@ -140,7 +129,7 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc
}
}

private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode?>?>?) {
private fun persist(schemaHistory: SchemaHistory<Optional<JsonNode>>?) {
if (schemaHistory!!.schema!!.isEmpty) {
return
}
Expand Down Expand Up @@ -223,7 +212,7 @@ class AirbyteSchemaHistoryStorage(private val path: Path, private val compressSc
return string.toByteArray(StandardCharsets.UTF_8).size.toDouble() / (ONE_MB)
}

fun initializeDBHistory(schemaHistory: SchemaHistory<Optional<JsonNode?>?>?,
fun initializeDBHistory(schemaHistory: SchemaHistory<Optional<JsonNode>>?,
compressSchemaHistoryForState: Boolean): AirbyteSchemaHistoryStorage {
val dbHistoryWorkingDir: Path
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ abstract class DebeziumPropertiesManager(private val properties: Properties,

fun getDebeziumProperties(
offsetManager: AirbyteFileOffsetBackingStore,
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?>): Properties {
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>): Properties {
val props = Properties()
props.putAll(properties)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ class DebeziumRecordIterator<T>(private val queue: LinkedBlockingQueue<ChangeEve
private val publisherStatusSupplier: Supplier<Boolean>,
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
private val firstRecordWaitTime: Duration,
subsequentRecordWaitTime: Duration?) : AbstractIterator<ChangeEventWithMetadata?>(), AutoCloseableIterator<ChangeEventWithMetadata?> {
subsequentRecordWaitTime: Duration?) : AbstractIterator<ChangeEventWithMetadata?>(), AutoCloseableIterator<ChangeEventWithMetadata> {
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> = HashMap(1)
private val subsequentRecordWaitTime: Duration = firstRecordWaitTime.dividedBy(2)

private var receivedFirstRecord = false
private var hasSnapshotFinished = true
private var tsLastHeartbeat: LocalDateTime? = null
private var lastHeartbeatPosition: T = null
private var lastHeartbeatPosition: T? = null
private var maxInstanceOfNoRecordsFound = 0
private var signalledDebeziumEngineShutdown = false

Expand Down Expand Up @@ -108,7 +108,6 @@ class DebeziumRecordIterator<T>(private val queue: LinkedBlockingQueue<ChangeEve
requestClose("Closing: Change event reached target position")
}
this.tsLastHeartbeat = null
this.lastHeartbeatPosition = null
this.receivedFirstRecord = true
this.maxInstanceOfNoRecordsFound = 0
return changeEventWithMetadata
Expand Down Expand Up @@ -192,7 +191,7 @@ class DebeziumRecordIterator<T>(private val queue: LinkedBlockingQueue<ChangeEve
* reflection to setAccessible for each event
*/
@VisibleForTesting
protected fun getHeartbeatPosition(heartbeatEvent: ChangeEvent<String?, String?>): T? {
internal fun getHeartbeatPosition(heartbeatEvent: ChangeEvent<String?, String?>): T {
try {
val eventClass: Class<out ChangeEvent<*, *>?> = heartbeatEvent.javaClass
val f: Field?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DebeziumRecordPublisher(private val debeziumPropertiesManager: DebeziumPro

fun start(queue: BlockingQueue<ChangeEvent<String?, String?>>,
offsetManager: AirbyteFileOffsetBackingStore,
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage?>) {
schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>) {
engine = DebeziumEngine.create(Json::class.java)
.using(debeziumPropertiesManager.getDebeziumProperties(offsetManager, schemaHistoryManager))
.using(OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
Expand Down
Loading

0 comments on commit f6db47e

Please sign in to comment.