Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert CDK db-sources submodules to kotlin #36438

Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object DSLContextFactory {
driverClassName: String,
jdbcConnectionString: String?,
dialect: SQLDialect?,
connectionProperties: Map<String?, String?>?,
connectionProperties: Map<String, String>?,
connectionTimeout: Duration?
): DSLContext {
return DSL.using(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object DataSourceFactory {
password: String?,
driverClassName: String,
jdbcConnectionString: String?,
connectionProperties: Map<String?, String?>?,
connectionProperties: Map<String, String>?,
connectionTimeout: Duration?
): DataSource {
return DataSourceBuilder(username, password, driverClassName, jdbcConnectionString)
Expand Down Expand Up @@ -100,7 +100,7 @@ object DataSourceFactory {
port: Int,
database: String?,
driverClassName: String,
connectionProperties: Map<String?, String?>?
connectionProperties: Map<String, String>?
): DataSource {
return DataSourceBuilder(username, password, driverClassName, host, port, database)
.withConnectionProperties(connectionProperties)
Expand Down Expand Up @@ -152,7 +152,7 @@ object DataSourceFactory {
private var password: String?,
private var driverClassName: String
) {
private var connectionProperties: Map<String?, String?> = java.util.Map.of()
private var connectionProperties: Map<String, String> = java.util.Map.of()
private var database: String? = null
private var host: String? = null
private var jdbcUrl: String? = null
Expand Down Expand Up @@ -185,7 +185,7 @@ object DataSourceFactory {
}

fun withConnectionProperties(
connectionProperties: Map<String?, String?>?
connectionProperties: Map<String, String>?
): DataSourceBuilder {
if (connectionProperties != null) {
this.connectionProperties = connectionProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory

/** Implementation of source operations with standard JDBC types. */
class JdbcSourceOperations :
AbstractJdbcCompatibleSourceOperations<JDBCType?>(), SourceOperations<ResultSet, JDBCType?> {
AbstractJdbcCompatibleSourceOperations<JDBCType>(), SourceOperations<ResultSet, JDBCType> {
protected fun safeGetJdbcType(columnTypeInt: Int): JDBCType {
return try {
JDBCType.valueOf(columnTypeInt)
Expand Down Expand Up @@ -80,12 +80,12 @@ class JdbcSourceOperations :
JDBCType.TINYINT,
JDBCType.SMALLINT -> setShortInt(preparedStatement, parameterIndex, value!!)
JDBCType.INTEGER -> setInteger(preparedStatement, parameterIndex, value!!)
JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value)
JDBCType.BIGINT -> setBigInteger(preparedStatement, parameterIndex, value!!)
JDBCType.FLOAT,
JDBCType.DOUBLE -> setDouble(preparedStatement, parameterIndex, value!!)
JDBCType.REAL -> setReal(preparedStatement, parameterIndex, value!!)
JDBCType.NUMERIC,
JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value)
JDBCType.DECIMAL -> setDecimal(preparedStatement, parameterIndex, value!!)
JDBCType.CHAR,
JDBCType.NCHAR,
JDBCType.NVARCHAR,
Expand Down Expand Up @@ -147,7 +147,7 @@ class JdbcSourceOperations :
return JdbcUtils.ALLOWED_CURSOR_TYPES.contains(type)
}

override fun getAirbyteType(jdbcType: JDBCType?): JsonSchemaType {
override fun getAirbyteType(jdbcType: JDBCType): JsonSchemaType {
return when (jdbcType) {
JDBCType.BIT,
JDBCType.BOOLEAN -> JsonSchemaType.BOOLEAN
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.*

abstract class JdbcConnector
protected constructor(@JvmField protected val driverClassName: String) : BaseConnector() {
protected fun getConnectionTimeout(connectionProperties: Map<String?, String?>): Duration {
protected fun getConnectionTimeout(connectionProperties: Map<String, String>): Duration {
return getConnectionTimeout(connectionProperties, driverClassName)
}

Expand All @@ -37,7 +37,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
* @return DataSourceBuilder class used to create dynamic fields for DataSource
*/
fun getConnectionTimeout(
connectionProperties: Map<String?, String?>,
connectionProperties: Map<String, String>,
driverClassName: String?
): Duration {
val parsedConnectionTimeout =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ interface Source : Integration {
@Throws(Exception::class)
fun read(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): AutoCloseableIterator<AirbyteMessage>

Expand All @@ -65,7 +65,7 @@ interface Source : Integration {
@Throws(Exception::class)
fun readStreams(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
return List.of(read(config, catalog, state))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
@Throws(Exception::class)
override fun read(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): AutoCloseableIterator<AirbyteMessage> {
return source.read(config, catalog, state)
Expand All @@ -44,7 +44,7 @@ abstract class SpecModifyingSource(private val source: Source) : Source {
@Throws(Exception::class)
override fun readStreams(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
return source.readStreams(config, catalog, state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class SshWrappedSource : Source {
@Throws(Exception::class)
override fun read(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): AutoCloseableIterator<AirbyteMessage> {
val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
Expand All @@ -97,7 +97,7 @@ class SshWrappedSource : Source {
@Throws(Exception::class)
override fun readStreams(
config: JsonNode,
catalog: ConfiguredAirbyteCatalog?,
catalog: ConfiguredAirbyteCatalog,
state: JsonNode?
): Collection<AutoCloseableIterator<AirbyteMessage>>? {
val tunnel: SshTunnel = SshTunnel.Companion.getInstance(config, hostKey, portKey)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.10
version=0.28.11
6 changes: 6 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/db-sources/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ java {
}
}

compileKotlin.compilerOptions.allWarningsAsErrors = false
compileTestFixturesKotlin.compilerOptions.allWarningsAsErrors = false
compileTestKotlin.compilerOptions.allWarningsAsErrors = false


// Convert yaml to java: relationaldb.models
jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
Expand Down Expand Up @@ -53,4 +58,5 @@ dependencies {
testImplementation testFixtures(project(':airbyte-cdk:java:airbyte-cdk:datastore-postgres'))

testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1'
testImplementation 'org.mockito.kotlin:mockito-kotlin:5.2.1'
}
Original file line number Diff line number Diff line change
@@ -1,149 +1,3 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debezium;

import static io.airbyte.cdk.integrations.debezium.DebeziumIteratorConstants.*;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.debezium.internals.*;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class acts as the bridge between Airbyte DB connectors and debezium. If a DB connector wants
* to use debezium for CDC, it should use this class
*/
public class AirbyteDebeziumHandler<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(AirbyteDebeziumHandler.class);
/**
* We use 10000 as capacity cause the default queue size and batch size of debezium is :
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_BATCH_SIZE}is 2048
* {@link io.debezium.config.CommonConnectorConfig#DEFAULT_MAX_QUEUE_SIZE} is 8192
*/
public static final int QUEUE_CAPACITY = 10_000;

private final JsonNode config;
private final CdcTargetPosition<T> targetPosition;
private final boolean trackSchemaHistory;
private final Duration firstRecordWaitTime, subsequentRecordWaitTime;
private final int queueSize;
private final boolean addDbNameToOffsetState;

public AirbyteDebeziumHandler(final JsonNode config,
final CdcTargetPosition<T> targetPosition,
final boolean trackSchemaHistory,
final Duration firstRecordWaitTime,
final Duration subsequentRecordWaitTime,
final int queueSize,
final boolean addDbNameToOffsetState) {
this.config = config;
this.targetPosition = targetPosition;
this.trackSchemaHistory = trackSchemaHistory;
this.firstRecordWaitTime = firstRecordWaitTime;
this.subsequentRecordWaitTime = subsequentRecordWaitTime;
this.queueSize = queueSize;
this.addDbNameToOffsetState = addDbNameToOffsetState;
}

class CapacityReportingBlockingQueue<E> extends LinkedBlockingQueue<E> {

private static Duration REPORT_DURATION = Duration.of(10, ChronoUnit.SECONDS);
private Instant lastReport;

CapacityReportingBlockingQueue(final int capacity) {
super(capacity);
}

private void reportQueueUtilization() {
if (lastReport == null || Duration.between(lastReport, Instant.now()).compareTo(REPORT_DURATION) > 0) {
LOGGER.info("CDC events queue size: {}. remaining {}", this.size(), this.remainingCapacity());
synchronized (this) {
lastReport = Instant.now();
}
}
}

@Override
public void put(final E e) throws InterruptedException {
reportQueueUtilization();
super.put(e);
}

@Override
public E poll() {
reportQueueUtilization();
return super.poll();
}

}

public AutoCloseableIterator<AirbyteMessage> getIncrementalIterators(final DebeziumPropertiesManager debeziumPropertiesManager,
final DebeziumEventConverter eventConverter,
final CdcSavedInfoFetcher cdcSavedInfoFetcher,
final CdcStateHandler cdcStateHandler) {
LOGGER.info("Using CDC: {}", true);
LOGGER.info("Using DBZ version: {}", DebeziumEngine.class.getPackage().getImplementationVersion());
final AirbyteFileOffsetBackingStore offsetManager = AirbyteFileOffsetBackingStore.initializeState(
cdcSavedInfoFetcher.getSavedOffset(),
addDbNameToOffsetState ? Optional.ofNullable(config.get(JdbcUtils.DATABASE_KEY).asText()) : Optional.empty());
final var schemaHistoryManager = trackSchemaHistory
? Optional.of(AirbyteSchemaHistoryStorage.initializeDBHistory(
cdcSavedInfoFetcher.getSavedSchemaHistory(), cdcStateHandler.compressSchemaHistoryForState()))
: Optional.<AirbyteSchemaHistoryStorage>empty();
final var publisher = new DebeziumRecordPublisher(debeziumPropertiesManager);
final var queue = new CapacityReportingBlockingQueue<ChangeEvent<String, String>>(queueSize);
publisher.start(queue, offsetManager, schemaHistoryManager);
// handle state machine around pub/sub logic.
final AutoCloseableIterator<ChangeEventWithMetadata> eventIterator = new DebeziumRecordIterator<>(
queue,
targetPosition,
publisher::hasClosed,
new DebeziumShutdownProcedure<>(queue, publisher::close, publisher::hasClosed),
firstRecordWaitTime,
subsequentRecordWaitTime);

final Duration syncCheckpointDuration = config.has(SYNC_CHECKPOINT_DURATION_PROPERTY)
? Duration.ofSeconds(config.get(SYNC_CHECKPOINT_DURATION_PROPERTY).asLong())
: SYNC_CHECKPOINT_DURATION;
final Long syncCheckpointRecords = config.has(SYNC_CHECKPOINT_RECORDS_PROPERTY)
? config.get(SYNC_CHECKPOINT_RECORDS_PROPERTY).asLong()
: SYNC_CHECKPOINT_RECORDS;

DebeziumMessageProducer messageProducer = new DebeziumMessageProducer(cdcStateHandler,
targetPosition,
eventConverter,
offsetManager,
schemaHistoryManager);

// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is not used
// at all thus we will pass in null.
SourceStateIterator iterator =
new SourceStateIterator<>(eventIterator, null, messageProducer, new StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration));
return AutoCloseableIterators.fromIterator(iterator);
}

public static boolean isAnyStreamIncrementalSyncMode(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode)
.anyMatch(syncMode -> syncMode == SyncMode.INCREMENTAL);
}

}
Original file line number Diff line number Diff line change
@@ -1,56 +1,3 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.integrations.debezium;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* This interface is used to add metadata to the records fetched from the database. For instance, in
* Postgres we add the lsn to the records. In MySql we add the file name and position to the
* records.
*/
public interface CdcMetadataInjector<T> {

/**
* A debezium record contains multiple pieces. Ref :
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
*
* @param event is the actual record which contains data and would be written to the destination
* @param source contains the metadata about the record and we need to extract that metadata and add
* it to the event before writing it to destination
*/
void addMetaData(ObjectNode event, JsonNode source);

// TODO : Remove this - it is deprecated.
default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record, final String transactionTimestamp, final T metadataToAdd) {
throw new RuntimeException("Not Supported");
}

default void addMetaDataToRowsFetchedOutsideDebezium(final ObjectNode record) {
throw new RuntimeException("Not Supported");
}

/**
* As part of Airbyte record we need to add the namespace (schema name)
*
* @param source part of debezium record and contains the metadata about the record. We need to
* extract namespace out of this metadata and return Ref :
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
* @return the stream namespace extracted from the change event source.
*/
String namespace(JsonNode source);

/**
* As part of Airbyte record we need to add the name (e.g. table name)
*
* @param source part of debezium record and contains the metadata about the record. We need to
* extract namespace out of this metadata and return Ref :
* https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-create-events
* @return The stream name extracted from the change event source.
*/
String name(JsonNode source);

}
Loading
Loading