From 24b428f9e88971011c9ae1b6c80db067102371f9 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Fri, 9 Feb 2024 13:37:23 -0800 Subject: [PATCH 01/11] set subsequent record wait time to 1/2 of initial wait time as a workaround --- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 4e6078a6b2f3..6788b1e83f44 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -81,7 +81,9 @@ public List> createCdcIterators( final MongoDbSourceConfig config) { final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(config.rawConfig()); - final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(config.rawConfig()); + // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow making + // subsequent wait time configurable. 
+ final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); final boolean isEnforceSchema = config.getEnforceSchema(); From ab037a91b62c8038752e25fd0c7a21a049038952 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Sun, 11 Feb 2024 21:52:05 -0800 Subject: [PATCH 02/11] initial_waiting_seconds not read correctly from spec --- .../integrations/source/mongodb/MongoConstants.java | 2 ++ .../source/mongodb/MongoDbSourceConfig.java | 10 ++++++++++ .../source/mongodb/cdc/MongoDbCdcInitializer.java | 3 ++- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java index 09f645c94e25..430fa9f9c409 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java @@ -34,6 +34,8 @@ public class MongoConstants { public static final String USERNAME_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY; public static final String SCHEMA_ENFORCED_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.SCHEMA_ENFORCED_CONFIGURATION_KEY; public static final String SCHEMALESS_MODE_DATA_FIELD = Configuration.SCHEMALESS_MODE_DATA_FIELD; + public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds"; + public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300; private MongoConstants() {} diff --git 
a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java index 3591286490b0..9cb5852bf700 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java @@ -11,7 +11,9 @@ import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_AUTH_SOURCE; import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE; +import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC; import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY; +import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC; import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY; @@ -87,4 +89,12 @@ public boolean getEnforceSchema() { : true; } + public Integer getInitialWaitingTimeSeconds() { + if (rawConfig.has(INITIAL_RECORD_WAITING_TIME_SEC)) { + return rawConfig.get(INITIAL_RECORD_WAITING_TIME_SEC).asInt(DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC); + } else { + return DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC; + } + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java 
b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 6788b1e83f44..00527d6f56b1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -80,10 +80,11 @@ public List> createCdcIterators( final Instant emittedAt, final MongoDbSourceConfig config) { - final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(config.rawConfig()); + final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow making // subsequent wait time configurable. final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); + LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); final boolean isEnforceSchema = config.getEnforceSchema(); From d9dbb836d9fcebbe6d2fa77c5fbc42aa70043f61 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Sun, 11 Feb 2024 23:40:58 -0800 Subject: [PATCH 03/11] temp --- .../connectors/source-mongodb-v2/metadata.yaml | 2 +- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 14ba912480e3..7151f5e74f33 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: 
database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.7 + dockerImageTag: 1.2.8 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 00527d6f56b1..d8e3e977427b 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -80,10 +80,12 @@ public List> createCdcIterators( final Instant emittedAt, final MongoDbSourceConfig config) { + LOGGER.info("*** config {}", config.rawConfig()); final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow making // subsequent wait time configurable. 
- final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); +// final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); + final Duration subsequentRecordWaitTime = firstRecordWaitTime; LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); From 330ff55dd189974e16d9b35335cff44488989e70 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Mon, 12 Feb 2024 00:08:04 -0800 Subject: [PATCH 04/11] temp --- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index d8e3e977427b..721b4752be82 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -9,7 +9,6 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoDatabase; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; -import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -82,9 +81,10 @@ public List> createCdcIterators( LOGGER.info("*** config {}", config.rawConfig()); final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); - // #35059: debezium heartbeats are not sent on the expected 
interval. this is a worksaround to allow making + // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow + // making // subsequent wait time configurable. -// final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); + // final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); final Duration subsequentRecordWaitTime = firstRecordWaitTime; LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); From d88c53b7224e6b55a5e4075be2fb5a04d673b652 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Mon, 12 Feb 2024 00:22:02 -0800 Subject: [PATCH 05/11] temp --- .../integrations/source/mongodb/cdc/MongoDbCdcInitializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 721b4752be82..58827f3c0aa9 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -79,7 +79,7 @@ public List> createCdcIterators( final Instant emittedAt, final MongoDbSourceConfig config) { - LOGGER.info("*** config {}", config.rawConfig()); +// LOGGER.info("*** config {}", config.rawConfig()); final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); // #35059: debezium heartbeats are not sent on the expected interval. 
this is a worksaround to allow // making From 150c912d991bb5089f7b5f03662952afb4a42c41 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Mon, 12 Feb 2024 22:03:45 -0800 Subject: [PATCH 06/11] temp --- .../source/mongodb/MongoConnectionUtils.java | 2 +- .../source/mongodb/MongoDbSourceConfig.java | 35 +++++++++---------- .../mongodb/cdc/MongoDbCdcInitializer.java | 11 +++--- 3 files changed, 22 insertions(+), 26 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java index 40cf79819a5b..7e48aa7468a4 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java @@ -50,7 +50,7 @@ public static MongoClient createMongoClient(final MongoDbSourceConfig config) { } private static String buildConnectionString(final MongoDbSourceConfig config) { - return MongoDbDebeziumPropertiesManager.buildConnectionString(config.rawConfig(), true); + return MongoDbDebeziumPropertiesManager.buildConnectionString(config.getDatabaseConfig(), true); } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java index 9cb5852bf700..e09e2ea6c662 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java +++ 
b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java @@ -28,34 +28,31 @@ * @param rawConfig The underlying JSON configuration provided by the connector framework. */ public record MongoDbSourceConfig(JsonNode rawConfig) { - - /** - * Constructs a new {@link MongoDbSourceConfig} from the provided raw configuration. - * - * @param rawConfig The underlying JSON configuration provided by the connector framework. - * @throws IllegalArgumentException if the raw configuration does not contain the - * {@link MongoConstants#DATABASE_CONFIG_CONFIGURATION_KEY} key. - */ - public MongoDbSourceConfig(final JsonNode rawConfig) { - if (rawConfig.has(DATABASE_CONFIG_CONFIGURATION_KEY)) { - this.rawConfig = rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY); - } else { + public MongoDbSourceConfig { + if (rawConfig == null) { + throw new IllegalArgumentException("MongoDbSourceConfig cannot accept a null config."); + } + if ( !rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) { throw new IllegalArgumentException("Database configuration is missing required '" + DATABASE_CONFIG_CONFIGURATION_KEY + "' property."); } } + public JsonNode getDatabaseConfig() { + return rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY); + } + public String getAuthSource() { - return rawConfig.has(AUTH_SOURCE_CONFIGURATION_KEY) ? rawConfig.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE) + return getDatabaseConfig().has(AUTH_SOURCE_CONFIGURATION_KEY) ? getDatabaseConfig().get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE) : DEFAULT_AUTH_SOURCE; } public Integer getCheckpointInterval() { - return rawConfig.has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? rawConfig.get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL) + return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? 
getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL) : CHECKPOINT_INTERVAL; } public String getDatabaseName() { - return rawConfig.has(DATABASE_CONFIGURATION_KEY) ? rawConfig.get(DATABASE_CONFIGURATION_KEY).asText() : null; + return getDatabaseConfig().has(DATABASE_CONFIGURATION_KEY) ? getDatabaseConfig().get(DATABASE_CONFIGURATION_KEY).asText() : null; } public OptionalInt getQueueSize() { @@ -65,15 +62,15 @@ public OptionalInt getQueueSize() { } public String getPassword() { - return rawConfig.has(PASSWORD_CONFIGURATION_KEY) ? rawConfig.get(PASSWORD_CONFIGURATION_KEY).asText() : null; + return getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY) ? getDatabaseConfig().get(PASSWORD_CONFIGURATION_KEY).asText() : null; } public String getUsername() { - return rawConfig.has(USERNAME_CONFIGURATION_KEY) ? rawConfig.get(USERNAME_CONFIGURATION_KEY).asText() : null; + return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) ? getDatabaseConfig().get(USERNAME_CONFIGURATION_KEY).asText() : null; } public boolean hasAuthCredentials() { - return rawConfig.has(USERNAME_CONFIGURATION_KEY) && rawConfig.has(PASSWORD_CONFIGURATION_KEY); + return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) && getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY); } public Integer getSampleSize() { @@ -85,7 +82,7 @@ public Integer getSampleSize() { } public boolean getEnforceSchema() { - return rawConfig.has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? rawConfig.get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true) + return getDatabaseConfig().has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? 
getDatabaseConfig().get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true) : true; } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 58827f3c0aa9..af9c2008e32c 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -84,8 +84,7 @@ public List> createCdcIterators( // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow // making // subsequent wait time configurable. - // final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); - final Duration subsequentRecordWaitTime = firstRecordWaitTime; + final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); @@ -102,7 +101,7 @@ public List> createCdcIterators( Jsons.clone(defaultDebeziumProperties), catalog, cdcState.state(), - config.rawConfig(), + config.getDatabaseConfig(), mongoClient); // We should always be able to extract offset out of state if it's not null @@ -136,12 +135,12 @@ public List> createCdcIterators( initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName), cdcMetadataInjector, emittedAt, config.getCheckpointInterval(), isEnforceSchema); - final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(config.rawConfig(), + final AirbyteDebeziumHandler handler = new 
AirbyteDebeziumHandler<>(config.getDatabaseConfig(), new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false); final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed); - final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.rawConfig(), catalog); - final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.rawConfig()); + final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog); + final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig()); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler); From df82289f0d96965b2a7d0424aa61a176b9944bf9 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Mon, 12 Feb 2024 22:06:14 -0800 Subject: [PATCH 07/11] temp --- .../integrations/source/mongodb/MongoDbSourceConfig.java | 6 ++++-- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java index e09e2ea6c662..a03647bb9386 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java +++ 
b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java @@ -28,11 +28,12 @@ * @param rawConfig The underlying JSON configuration provided by the connector framework. */ public record MongoDbSourceConfig(JsonNode rawConfig) { + public MongoDbSourceConfig { if (rawConfig == null) { throw new IllegalArgumentException("MongoDbSourceConfig cannot accept a null config."); } - if ( !rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) { + if (!rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) { throw new IllegalArgumentException("Database configuration is missing required '" + DATABASE_CONFIG_CONFIGURATION_KEY + "' property."); } } @@ -47,7 +48,8 @@ public String getAuthSource() { } public Integer getCheckpointInterval() { - return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL) + return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) + ? 
getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL) : CHECKPOINT_INTERVAL; } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index af9c2008e32c..2fe9aed7fdb1 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -79,12 +79,12 @@ public List> createCdcIterators( final Instant emittedAt, final MongoDbSourceConfig config) { -// LOGGER.info("*** config {}", config.rawConfig()); + // LOGGER.info("*** config {}", config.rawConfig()); final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow // making // subsequent wait time configurable. 
- final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); + final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); From 912561847c23fd3de57348cce0a550f42fec2295 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Mon, 12 Feb 2024 23:03:55 -0800 Subject: [PATCH 08/11] fix tests --- .../source/mongodb/MongoDbSourceConfigTest.java | 11 ++++++----- .../integrations/source/mongodb/MongoUtilTest.java | 6 +++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java index 9e8665cf65d7..846fa0a71c57 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java @@ -39,13 +39,14 @@ void testCreatingMongoDbSourceConfig() { final String username = "username"; final boolean isSchemaEnforced = false; final JsonNode rawConfig = Jsons.jsonNode( - Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of( + Map.of( + DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, sampleSize, + QUEUE_SIZE_CONFIGURATION_KEY, queueSize, + DATABASE_CONFIG_CONFIGURATION_KEY, Map.of( AUTH_SOURCE_CONFIGURATION_KEY, authSource, CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval, DATABASE_CONFIGURATION_KEY, database, - DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, sampleSize, PASSWORD_CONFIGURATION_KEY, password, - QUEUE_SIZE_CONFIGURATION_KEY, 
queueSize, USERNAME_CONFIGURATION_KEY, username, SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced))); final MongoDbSourceConfig sourceConfig = new MongoDbSourceConfig(rawConfig); @@ -55,7 +56,7 @@ void testCreatingMongoDbSourceConfig() { assertEquals(database, sourceConfig.getDatabaseName()); assertEquals(password, sourceConfig.getPassword()); assertEquals(OptionalInt.of(queueSize), sourceConfig.getQueueSize()); - assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig()); + assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig()); assertEquals(sampleSize, sourceConfig.getSampleSize()); assertEquals(username, sourceConfig.getUsername()); assertEquals(isSchemaEnforced, sourceConfig.getEnforceSchema()); @@ -76,7 +77,7 @@ void testDefaultValues() { assertEquals(null, sourceConfig.getDatabaseName()); assertEquals(null, sourceConfig.getPassword()); assertEquals(OptionalInt.empty(), sourceConfig.getQueueSize()); - assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig()); + assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig()); assertEquals(DEFAULT_DISCOVER_SAMPLE_SIZE, sourceConfig.getSampleSize()); assertEquals(null, sourceConfig.getUsername()); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java index 4f8388fa4dc3..832c9c7af936 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java @@ -312,11 +312,11 @@ void testGetAuthorizedCollectionsMongoSecurityException() { void testGetDebeziumEventQueueSize() { final int 
queueSize = 5000; final MongoDbSourceConfig validQueueSizeConfiguration = new MongoDbSourceConfig( - Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize)))); + Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); final MongoDbSourceConfig tooSmallQueueSizeConfiguration = new MongoDbSourceConfig( - Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE)))); + Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); final MongoDbSourceConfig tooLargeQueueSizeConfiguration = new MongoDbSourceConfig( - Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE)))); + Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); final MongoDbSourceConfig missingQueueSizeConfiguration = new MongoDbSourceConfig(Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); From c17f3c84dc2933f31ed251d73e14fa98efac4f89 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Tue, 13 Feb 2024 09:38:46 -0800 Subject: [PATCH 09/11] sanity --- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 8 +++----- .../source/mongodb/MongoDbSourceConfigTest.java | 12 ++++++------ 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 2fe9aed7fdb1..62b618f2a356 100644 --- 
a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -79,13 +79,11 @@ public List> createCdcIterators( final Instant emittedAt, final MongoDbSourceConfig config) { - // LOGGER.info("*** config {}", config.rawConfig()); final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); - // #35059: debezium heartbeats are not sent on the expected interval. this is a worksaround to allow - // making - // subsequent wait time configurable. + // #35059: debezium heartbeats are not sent on the expected interval. this is + // a worksaround to allow making subsequent wait time configurable. final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); - LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime); + LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); final boolean isEnforceSchema = config.getEnforceSchema(); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java index 846fa0a71c57..3ca4825b8dfd 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java @@ -43,12 +43,12 @@ void testCreatingMongoDbSourceConfig() { DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, sampleSize, 
QUEUE_SIZE_CONFIGURATION_KEY, queueSize, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of( - AUTH_SOURCE_CONFIGURATION_KEY, authSource, - CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval, - DATABASE_CONFIGURATION_KEY, database, - PASSWORD_CONFIGURATION_KEY, password, - USERNAME_CONFIGURATION_KEY, username, - SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced))); + AUTH_SOURCE_CONFIGURATION_KEY, authSource, + CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval, + DATABASE_CONFIGURATION_KEY, database, + PASSWORD_CONFIGURATION_KEY, password, + USERNAME_CONFIGURATION_KEY, username, + SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced))); final MongoDbSourceConfig sourceConfig = new MongoDbSourceConfig(rawConfig); assertNotNull(sourceConfig); assertEquals(authSource, sourceConfig.getAuthSource()); From cf2fe44e81b9c2b1809b438d3045c04f3300b393 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Tue, 13 Feb 2024 09:52:43 -0800 Subject: [PATCH 10/11] make subsequent wait time equal to initial --- .../integrations/source/mongodb/cdc/MongoDbCdcInitializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 62b618f2a356..ab322e37b816 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -82,7 +82,7 @@ public List> createCdcIterators( final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); // #35059: debezium heartbeats are not sent on the expected interval. 
this is // a worksaround to allow making subsequent wait time configurable. - final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2); + final Duration subsequentRecordWaitTime = firstRecordWaitTime; LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); From 268604bebf390e9b418b2cc1f3147b23a9101d63 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Tue, 13 Feb 2024 13:12:21 -0800 Subject: [PATCH 11/11] bump version --- .../connectors/source-mongodb-v2/metadata.yaml | 2 +- docs/integrations/sources/mongodb-v2.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 7151f5e74f33..f9729994caf9 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.8 + dockerImageTag: 1.2.9 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 530a1544c1a3..72873c05fc0b 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | 
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| -| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | +| 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent CDC record wait time to match the initial wait time. Bug fixes. | +| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | | 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. | | 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. | | 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . |