convert CDK db-sources submodules to kotlin (airbytehq#36438)

fix compiler warnings

stephane-airbyte authored and nurikk-sa committed Apr 4, 2024
1 parent 430ad73 commit ca4f115
Showing 190 changed files with 16,433 additions and 13,625 deletions.
@@ -67,7 +67,7 @@ object DSLContextFactory {
         driverClassName: String,
         jdbcConnectionString: String?,
         dialect: SQLDialect?,
-        connectionProperties: Map<String?, String?>?,
+        connectionProperties: Map<String, String>?,
         connectionTimeout: Duration?
     ): DSLContext {
         return DSL.using(

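Note on the recurring type change: `Map<String?, String?>?` forces every consumer to null-check keys and values individually, while `Map<String, String>?` keeps only the map itself optional. A minimal sketch of what the tighter type buys callers — `buildJdbcUrl` is a hypothetical helper, not the CDK's code:

```kotlin
// Sketch: with Map<String, String>?, only the map is optional; entries are
// guaranteed non-null, so no per-entry null checks are needed.
fun buildJdbcUrl(base: String, connectionProperties: Map<String, String>?): String {
    val params = connectionProperties
        ?.map { (k, v) -> "$k=$v" }
        ?.joinToString("&")
        .orEmpty()
    return if (params.isEmpty()) base else "$base?$params"
}

fun main() {
    println(buildJdbcUrl("jdbc:postgresql://localhost:5432/db", mapOf("ssl" to "true")))
    println(buildJdbcUrl("jdbc:postgresql://localhost:5432/db", null))
}
```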
@@ -50,7 +50,7 @@ object DataSourceFactory {
         password: String?,
         driverClassName: String,
         jdbcConnectionString: String?,
-        connectionProperties: Map<String?, String?>?,
+        connectionProperties: Map<String, String>?,
         connectionTimeout: Duration?
     ): DataSource {
         return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
@@ -100,7 +100,7 @@ object DataSourceFactory {
         port: Int,
         database: String?,
         driverClassName: String,
-        connectionProperties: Map<String?, String?>?
+        connectionProperties: Map<String, String>?
     ): DataSource {
         return DataSourceBuilder(username, password, driverClassName, host, port, database)
             .withConnectionProperties(connectionProperties)
@@ -152,7 +152,7 @@ object DataSourceFactory {
         private var password: String?,
         private var driverClassName: String
     ) {
-        private var connectionProperties: Map<String?, String?> = java.util.Map.of()
+        private var connectionProperties: Map<String, String> = java.util.Map.of()
         private var database: String? = null
         private var host: String? = null
         private var jdbcUrl: String? = null
@@ -185,7 +185,7 @@ object DataSourceFactory {
         }
 
         fun withConnectionProperties(
-            connectionProperties: Map<String?, String?>?
+            connectionProperties: Map<String, String>?
         ): DataSourceBuilder {
             if (connectionProperties != null) {
                 this.connectionProperties = connectionProperties

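The builder keeps a Java-style default (`java.util.Map.of()`). A sketch of the same null-guard with the Kotlin-idiomatic `emptyMap()` default — an illustration of the pattern, not the commit's code:

```kotlin
class DataSourceBuilderSketch {
    // emptyMap() is the idiomatic Kotlin equivalent of java.util.Map.of()
    private var connectionProperties: Map<String, String> = emptyMap()

    fun withConnectionProperties(connectionProperties: Map<String, String>?): DataSourceBuilderSketch {
        // ?.let replaces the explicit null check used in the original builder
        connectionProperties?.let { this.connectionProperties = it }
        return this
    }
}
```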
@@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory
 
 /** Implementation of source operations with standard JDBC types. */
 class JdbcSourceOperations :
-    AbstractJdbcCompatibleSourceOperations<JDBCType?>(), SourceOperations<ResultSet, JDBCType?> {
+    AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
     protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
         return try {
             JDBCType.valueOf(columnTypeInt)
@@ -80,12 +80,12 @@ class JdbcSourceOperations :
             JDBCType.TINYINT,
             JDBCType.SMALLINT -> setShortInt(preparedStatement, parameterIndex, value!!)
             JDBCType.INTEGER -> setInteger(preparedStatement, parameterIndex, value!!)
-            JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value)
+            JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value!!)
             JDBCType.FLOAT,
             JDBCType.DOUBLE -> setDouble(preparedStatement, parameterIndex, value!!)
             JDBCType.REAL -> setReal(preparedStatement, parameterIndex, value!!)
             JDBCType.NUMERIC,
-            JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value)
+            JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value!!)
             JDBCType.CHAR,
             JDBCType.NCHAR,
             JDBCType.NVARCHAR,
@@ -147,7 +147,7 @@ class JdbcSourceOperations :
         return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type)
     }
 
-    override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType {
+    override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType {
         return when (jdbcType) {
             JDBCType.BIT,
             JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN

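The `!!` assertions added for BIGINT and DECIMAL make those branches consistent with their neighbors: a null cursor value now fails fast with a NullPointerException at the call site instead of propagating into the setter. A minimal sketch of the semantics — the println calls stand in for the CDK's setters:

```kotlin
import java.sql.JDBCType

// Stand-in for the CDK's prepared-statement binding, illustrating value!!.
fun bindSketch(type: JDBCType, value: String?) {
    when (type) {
        // value!! throws NullPointerException immediately if value is null
        JDBCType.BIGINT -> println(value!!.toBigInteger())
        JDBCType.DECIMAL -> println(value!!.toBigDecimal())
        else -> println(value ?: "(no value)")
    }
}

fun main() {
    bindSketch(JDBCType.BIGINT, "42")   // prints 42
    bindSketch(JDBCType.VARCHAR, null)  // prints (no value)
}
```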
@@ -11,7 +11,7 @@ import java.util.*
 
 abstract class JdbcConnector
 protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() {
-    protected fun getConnectionTimeout(connectionProperties: Map<String?, String?>): Duration {
+    protected fun getConnectionTimeout(connectionProperties: Map<String, String>): Duration {
         return getConnectionTimeout(connectionProperties, driverClassName)
     }
 
@@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
      * @return DataSourceBuilder class used to create dynamic fields for DataSource
      */
     fun getConnectionTimeout(
-        connectionProperties: Map<String?, String?>,
+        connectionProperties: Map<String, String>,
         driverClassName: String?
     ): Duration {
         val parsedConnectionTimeout =

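With the map's entries now non-null, parsing a timeout out of the connection properties needs no key or value null-handling beyond the lookup itself. A hypothetical sketch — the "connectTimeout" key and the 60-second fallback are assumptions for illustration, not taken from the CDK:

```kotlin
import java.time.Duration

// Hypothetical: the key name and default below are illustrative only.
fun connectionTimeoutSketch(connectionProperties: Map<String, String>): Duration {
    val seconds = connectionProperties["connectTimeout"]?.toLongOrNull()
    return if (seconds != null && seconds > 0) Duration.ofSeconds(seconds)
    else Duration.ofSeconds(60)
}
```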
@@ -42,7 +42,7 @@ interface Source : Integration {
     @Throws(Exception::class)
     fun read(
         config: JsonNode,
-        catalog: ConfiguredAirbyteCatalog?,
+        catalog: ConfiguredAirbyteCatalog,
         state: JsonNode?
     ): AutoCloseableIterator<AirbyteMessage>
 
@@ -65,7 +65,7 @@ interface Source : Integration {
     @Throws(Exception::class)
     fun readStreams(
         config: JsonNode,
-        catalog: ConfiguredAirbyteCatalog?,
+        catalog: ConfiguredAirbyteCatalog,
         state: JsonNode?
     ): Collection<AutoCloseableIterator<AirbyteMessage>>? {
         return List.of(read(config, catalog, state))

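The same `ConfiguredAirbyteCatalog?` to `ConfiguredAirbyteCatalog` tightening repeats in the implementations below (SpecModifyingSource, SshWrappedSource), so overrides can forward `catalog` without `!!` or `?.`. A minimal sketch of the interface shape, with plain stand-in types instead of JsonNode, ConfiguredAirbyteCatalog, and AutoCloseableIterator:

```kotlin
// Stand-in types only; illustrates the non-null catalog parameter.
interface SourceSketch {
    fun read(config: String, catalog: String, state: String?): Iterator<String>

    // Mirrors the CDK default: wrap the single read() iterator in a collection.
    fun readStreams(config: String, catalog: String, state: String?): Collection<Iterator<String>> =
        listOf(read(config, catalog, state))
}
```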
@@ -35,7 +35,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
     @Throws(Exception::class)
     override fun read(
         config: JsonNode,
-        catalog: ConfiguredAirbyteCatalog?,
+        catalog: ConfiguredAirbyteCatalog,
         state: JsonNode?
     ): AutoCloseableIterator<AirbyteMessage> {
         return source.read(config, catalog, state)
@@ -44,7 +44,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
     @Throws(Exception::class)
     override fun readStreams(
         config: JsonNode,
-        catalog: ConfiguredAirbyteCatalog?,
+        catalog: ConfiguredAirbyteCatalog,
         state: JsonNode?
     ): Collection<AutoCloseableIterator<AirbyteMessage>>? {
         return source.readStreams(config, catalog, state)

@@ -76,7 +76,7 @@ class SshWrappedSource : Source {
     @Throws(Exception::class)
     override fun read(
         config: JsonNode,
-        catalog: ConfiguredAirbyteCatalog?,
+        catalog: ConfiguredAirbyteCatalog,
         state: JsonNode?
     ): AutoCloseableIterator<AirbyteMessage> {
         val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
@@ -97,7 +97,7 @@ class SshWrappedSource : Source {
     @Throws(Exception::class)
     override fun readStreams(
         config: JsonNode,
-        catalog: ConfiguredAirbyteCatalog?,
+        catalog: ConfiguredAirbyteCatalog,
         state: JsonNode?
     ): Collection<AutoCloseableIterator<AirbyteMessage>>? {
         val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)

@@ -1 +1 @@
-version=0.28.10
+version=0.28.11

6 changes: 6 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
@@ -11,6 +11,11 @@ java {
     }
 }
 
+compileKotlin.compilerOptions.allWarningsAsErrors = false
+compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
+compileTestKotlin.compilerOptions.allWarningsAsErrors = false
+
+
 // Convert yaml to java: relationaldb.models
 jsonSchema2Pojo {
     sourceType = SourceType.YAMLSCHEMA
@@ -53,4 +58,5 @@ dependencies {
     testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres'))
 
     testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
+    testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
 }

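Two things happen in this build file: the three `allWarningsAsErrors = false` lines opt the module's Kotlin compilations out of warnings-as-errors (in line with the "fix compiler warnings" note in the commit message), and mockito-kotlin is added for the converted tests. A sketch of the kind of test code the new dependency enables — `Greeter` is a hypothetical type, not part of the CDK:

```kotlin
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever

interface Greeter { fun greet(name: String): String }

fun main() {
    // mock<T>() and whenever() are mockito-kotlin's reified wrappers
    // around Mockito.mock / Mockito.when.
    val greeter = mock<Greeter>()
    whenever(greeter.greet("db-sources")).thenReturn("hello, db-sources")
    println(greeter.greet("db-sources"))  // prints: hello, db-sources
}
```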
@@ -1,149 +1,3 @@
 /*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
-
-package io.airbyte.cdk.integrations.debezium;
-
-import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.*;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import io.airbyte.cdk.db.jdbc.JdbcUtils;
-import io.airbyte.cdk.integrations.debezium.internals.*;
-import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
-import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
-import io.airbyte.commons.util.AutoCloseableIterator;
-import io.airbyte.commons.util.AutoCloseableIterators;
-import io.airbyte.protocol.models.v0.AirbyteMessage;
-import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
-import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
-import io.airbyte.protocol.models.v0.SyncMode;
-import io.debezium.engine.ChangeEvent;
-import io.debezium.engine.DebeziumEngine;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.util.Optional;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
- * to use debezium for CDC, it should use this class
- */
-public class AirbyteDebeziumHandler<T> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class);
-  /**
-   * We use 10000 as capacity cause the default queue size and batch size of debezium is :
-   * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE}is 2048
-   * {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
-   */
-  public static final int QUEUE_CAPACITY = 10_000;
-
-  private final JsonNode config;
-  private final CdcTargetPosition<T> targetPosition;
-  private final boolean trackSchemaHistory;
-  private final Duration firstRecordWaitTime, subsequentRecordWaitTime;
-  private final int queueSize;
-  private final boolean addDbNameToOffsetState;
-
-  public AirbyteDebeziumHandler(final JsonNode config,
-                                final CdcTargetPosition<T> targetPosition,
-                                final boolean trackSchemaHistory,
-                                final Duration firstRecordWaitTime,
-                                final Duration subsequentRecordWaitTime,
-                                final int queueSize,
-                                final boolean addDbNameToOffsetState) {
-    this.config = config;
-    this.targetPosition = targetPosition;
-    this.trackSchemaHistory = trackSchemaHistory;
-    this.firstRecordWaitTime = firstRecordWaitTime;
-    this.subsequentRecordWaitTime = subsequentRecordWaitTime;
-    this.queueSize = queueSize;
-    this.addDbNameToOffsetState = addDbNameToOffsetState;
-  }
-
-  class CapacityReportingBlockingQueue<E> extends LinkedBlockingQueue<E> {
-
-    private static Duration REPORT_DURATION = Duration.of(10, ChronoUnit.SECONDS);
-    private Instant lastReport;
-
-    CapacityReportingBlockingQueue(final int capacity) {
-      super(capacity);
-    }
-
-    private void reportQueueUtilization() {
-      if (lastReport == null || Duration.between(lastReport, Instant.now()).compareTo(REPORT_DURATION) > 0) {
-        LOGGER.info("CDC events queue size: {}. remaining {}", this.size(), this.remainingCapacity());
-        synchronized (this) {
-          lastReport = Instant.now();
-        }
-      }
-    }
-
-    @Override
-    public void put(final E e) throws InterruptedException {
-      reportQueueUtilization();
-      super.put(e);
-    }
-
-    @Override
-    public E poll() {
-      reportQueueUtilization();
-      return super.poll();
-    }
-
-  }
-
-  public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final DebeziumPropertiesManager debeziumPropertiesManager,
-                                                                       final DebeziumEventConverter eventConverter,
-                                                                       final CdcSavedInfoFetcher cdcSavedInfoFetcher,
-                                                                       final CdcStateHandler cdcStateHandler) {
-    LOGGER.info("Using CDC: {}", true);
-    LOGGER.info("Using DBZ version: {}", DebeziumEngine.class.getPackage().getImplementationVersion());
-    final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
-        cdcSavedInfoFetcher.getSavedOffset(),
-        addDbNameToOffsetState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
-    final var schemaHistoryManager = trackSchemaHistory
-        ? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(
-            cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()))
-        : Optional.<AirbyteSchemaHistoryStorage>empty();
-    final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager);
-    final var queue = new CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize);
-    publisher.start(queue, offsetManager, schemaHistoryManager);
-    // handle state machine around pub/sub logic.
-    final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
-        queue,
-        targetPosition,
-        publisher::hasClosed,
-        new DebeziumShutdownProcedure<>(queue, publisher::close, publisher::hasClosed),
-        firstRecordWaitTime,
-        subsequentRecordWaitTime);
-
-    final Duration syncCheckpointDuration = config.has(SYNC_CHECKPOINT_DURATION_PROPERTY)
-        ? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
-        : SYNC_CHECKPOINT_DURATION;
-    final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY)
-        ? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
-        : SYNC_CHECKPOINT_RECORDS;
-
-    DebeziumMessageProducer messageProducer = new DebeziumMessageProducer(cdcStateHandler,
-        targetPosition,
-        eventConverter,
-        offsetManager,
-        schemaHistoryManager);
-
-    // Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
-    // at all thus we will pass in null.
-    SourceStateIterator iterator =
-        new SourceStateIterator<>(eventIterator, null, messageProducer, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration));
-    return AutoCloseableIterators.fromIterator(iterator);
-  }
-
-  public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
-    return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
-        .anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
-  }
-
-}

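The Java body above is removed as part of the Kotlin conversion (the commit's converted file is not shown in this excerpt). One plausible Kotlin rendering of the deleted `CapacityReportingBlockingQueue` inner class, as a sketch of the conversion pattern rather than the actual code added by this commit; logging is reduced to `println` to keep the sketch self-contained:

```kotlin
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.LinkedBlockingQueue

class CapacityReportingBlockingQueue<E>(capacity: Int) : LinkedBlockingQueue<E>(capacity) {
    private var lastReport: Instant? = null

    private fun reportQueueUtilization() {
        // Throttle reporting to at most once per REPORT_DURATION window.
        val last = lastReport
        if (last == null || Duration.between(last, Instant.now()) > REPORT_DURATION) {
            println("CDC events queue size: $size. remaining ${remainingCapacity()}")
            synchronized(this) { lastReport = Instant.now() }
        }
    }

    @Throws(InterruptedException::class)
    override fun put(e: E) {
        reportQueueUtilization()
        super.put(e)
    }

    override fun poll(): E? {
        reportQueueUtilization()
        return super.poll()
    }

    companion object {
        private val REPORT_DURATION: Duration = Duration.of(10, ChronoUnit.SECONDS)
    }
}
```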
@@ -1,56 +1,3 @@
 /*
- * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  */
-
-package io.airbyte.cdk.integrations.debezium;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-/**
- * This interface is used to add metadata to the records fetched from the database. For instance, in
- * Postgres we add the lsn to the records. In MySql we add the file name and position to the
- * records.
- */
-public interface CdcMetadataInjector<T> {
-
-  /**
-   * A debezium record contains multiple pieces. Ref :
-   * https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
-   *
-   * @param event is the actual record which contains data and would be written to the destination
-   * @param source contains the metadata about the record and we need to extract that metadata and add
-   *        it to the event before writing it to destination
-   */
-  void addMetaData(ObjectNode event, JsonNode source);
-
-  // TODO : Remove this - it is deprecated.
-  default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record, final String transactionTimestamp, final T metadataToAdd) {
-    throw new RuntimeException("Not Supported");
-  }
-
-  default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record) {
-    throw new RuntimeException("Not Supported");
-  }
-
-  /**
-   * As part of Airbyte record we need to add the namespace (schema name)
-   *
-   * @param source part of debezium record and contains the metadata about the record. We need to
-   *        extract namespace out of this metadata and return Ref :
-   *        https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
-   * @return the stream namespace extracted from the change event source.
-   */
-  String namespace(JsonNode source);
-
-  /**
-   * As part of Airbyte record we need to add the name (e.g. table name)
-   *
-   * @param source part of debezium record and contains the metadata about the record. We need to
-   *        extract namespace out of this metadata and return Ref :
-   *        https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
-   * @return The stream name extracted from the change event source.
-   */
-  String name(JsonNode source);
-
-}

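Likewise, a plausible Kotlin transliteration of the deleted `CdcMetadataInjector` interface — a sketch of the conversion, not the commit's actual file. Java default methods become Kotlin default implementations:

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode

interface CdcMetadataInjector<T> {
    /** Adds source metadata (e.g. the Postgres LSN or MySQL binlog file/position) to the event. */
    fun addMetaData(event: ObjectNode, source: JsonNode)

    // TODO: remove - deprecated.
    fun addMetaDataToRowsFetchedOutsideDebezium(
        record: ObjectNode,
        transactionTimestamp: String?,
        metadataToAdd: T
    ) {
        throw RuntimeException("Not Supported")
    }

    fun addMetaDataToRowsFetchedOutsideDebezium(record: ObjectNode) {
        throw RuntimeException("Not Supported")
    }

    /** The stream namespace (schema name) extracted from the change event source. */
    fun namespace(source: JsonNode): String

    /** The stream name (e.g. table name) extracted from the change event source. */
    fun name(source: JsonNode): String
}
```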