From 0385a64200e9dae3eb304b33c61a24af0c55c5f5 Mon Sep 17 00:00:00 2001 From: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com> Date: Tue, 13 Feb 2024 14:04:23 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20=20Set=20cdc=20record=20subseque?= =?UTF-8?q?nt=20record=20wait=20time=20to=20initial=20wait=20time=20as=20a?= =?UTF-8?q?=20workaround=20(#35114)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../source-mongodb-v2/metadata.yaml | 2 +- .../source/mongodb/MongoConnectionUtils.java | 2 +- .../source/mongodb/MongoConstants.java | 2 + .../source/mongodb/MongoDbSourceConfig.java | 45 +++++++++++-------- .../mongodb/cdc/MongoDbCdcInitializer.java | 16 ++++--- .../mongodb/MongoDbSourceConfigTest.java | 19 ++++---- .../source/mongodb/MongoUtilTest.java | 6 +-- docs/integrations/sources/mongodb-v2.md | 3 +- 8 files changed, 55 insertions(+), 40 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 7151f5e74f33..f9729994caf9 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.8 + dockerImageTag: 1.2.9 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java index 40cf79819a5b..7e48aa7468a4 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConnectionUtils.java @@ -50,7 +50,7 @@ public static MongoClient createMongoClient(final MongoDbSourceConfig config) { } private static String buildConnectionString(final MongoDbSourceConfig config) { - return MongoDbDebeziumPropertiesManager.buildConnectionString(config.rawConfig(), true); + return MongoDbDebeziumPropertiesManager.buildConnectionString(config.getDatabaseConfig(), true); } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java index 09f645c94e25..430fa9f9c409 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoConstants.java @@ -34,6 +34,8 @@ public class MongoConstants { public static final String USERNAME_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY; public static final String SCHEMA_ENFORCED_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.SCHEMA_ENFORCED_CONFIGURATION_KEY; public static final String SCHEMALESS_MODE_DATA_FIELD = Configuration.SCHEMALESS_MODE_DATA_FIELD; + public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds"; + public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300; private MongoConstants() {} diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java index 3591286490b0..a03647bb9386 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfig.java @@ -11,7 +11,9 @@ import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_AUTH_SOURCE; import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE; +import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC; import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY; +import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC; import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY; import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY; @@ -27,33 +29,32 @@ */ public record MongoDbSourceConfig(JsonNode rawConfig) { - /** - * Constructs a new {@link MongoDbSourceConfig} from the provided raw configuration. - * - * @param rawConfig The underlying JSON configuration provided by the connector framework. - * @throws IllegalArgumentException if the raw configuration does not contain the - * {@link MongoConstants#DATABASE_CONFIG_CONFIGURATION_KEY} key. - */ - public MongoDbSourceConfig(final JsonNode rawConfig) { - if (rawConfig.has(DATABASE_CONFIG_CONFIGURATION_KEY)) { - this.rawConfig = rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY); - } else { + public MongoDbSourceConfig { + if (rawConfig == null) { + throw new IllegalArgumentException("MongoDbSourceConfig cannot accept a null config."); + } + if (!rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) { throw new IllegalArgumentException("Database configuration is missing required '" + DATABASE_CONFIG_CONFIGURATION_KEY + "' property."); } } + public JsonNode getDatabaseConfig() { + return rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY); + } + public String getAuthSource() { - return rawConfig.has(AUTH_SOURCE_CONFIGURATION_KEY) ? rawConfig.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE) + return getDatabaseConfig().has(AUTH_SOURCE_CONFIGURATION_KEY) ? getDatabaseConfig().get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE) : DEFAULT_AUTH_SOURCE; } public Integer getCheckpointInterval() { - return rawConfig.has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? rawConfig.get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL) + return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) + ? getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL) : CHECKPOINT_INTERVAL; } public String getDatabaseName() { - return rawConfig.has(DATABASE_CONFIGURATION_KEY) ? rawConfig.get(DATABASE_CONFIGURATION_KEY).asText() : null; + return getDatabaseConfig().has(DATABASE_CONFIGURATION_KEY) ? getDatabaseConfig().get(DATABASE_CONFIGURATION_KEY).asText() : null; } public OptionalInt getQueueSize() { @@ -63,15 +64,15 @@ public OptionalInt getQueueSize() { } public String getPassword() { - return rawConfig.has(PASSWORD_CONFIGURATION_KEY) ? rawConfig.get(PASSWORD_CONFIGURATION_KEY).asText() : null; + return getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY) ? getDatabaseConfig().get(PASSWORD_CONFIGURATION_KEY).asText() : null; } public String getUsername() { - return rawConfig.has(USERNAME_CONFIGURATION_KEY) ? rawConfig.get(USERNAME_CONFIGURATION_KEY).asText() : null; + return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) ? getDatabaseConfig().get(USERNAME_CONFIGURATION_KEY).asText() : null; } public boolean hasAuthCredentials() { - return rawConfig.has(USERNAME_CONFIGURATION_KEY) && rawConfig.has(PASSWORD_CONFIGURATION_KEY); + return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) && getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY); } public Integer getSampleSize() { @@ -83,8 +84,16 @@ public Integer getSampleSize() { } public boolean getEnforceSchema() { - return rawConfig.has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? rawConfig.get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true) + return getDatabaseConfig().has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? getDatabaseConfig().get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true) : true; } + public Integer getInitialWaitingTimeSeconds() { + if (rawConfig.has(INITIAL_RECORD_WAITING_TIME_SEC)) { + return rawConfig.get(INITIAL_RECORD_WAITING_TIME_SEC).asInt(DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC); + } else { + return DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC; + } + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index 4e6078a6b2f3..ab322e37b816 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -9,7 +9,6 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoDatabase; import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler; -import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; @@ -80,8 +79,11 @@ public List> createCdcIterators( final Instant emittedAt, final MongoDbSourceConfig config) { - final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(config.rawConfig()); - final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(config.rawConfig()); + final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds()); + // #35059: debezium heartbeats are not sent on the expected interval. this is + // a worksaround to allow making subsequent wait time configurable. + final Duration subsequentRecordWaitTime = firstRecordWaitTime; + LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime); final int queueSize = MongoUtil.getDebeziumEventQueueSize(config); final String databaseName = config.getDatabaseName(); final boolean isEnforceSchema = config.getEnforceSchema(); @@ -97,7 +99,7 @@ public List> createCdcIterators( Jsons.clone(defaultDebeziumProperties), catalog, cdcState.state(), - config.rawConfig(), + config.getDatabaseConfig(), mongoClient); // We should always be able to extract offset out of state if it's not null @@ -131,12 +133,12 @@ public List> createCdcIterators( initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName), cdcMetadataInjector, emittedAt, config.getCheckpointInterval(), isEnforceSchema); - final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(config.rawConfig(), + final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(), new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false); final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed); - final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.rawConfig(), catalog); - final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.rawConfig()); + final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog); + final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig()); final Supplier> incrementalIteratorSupplier = () -> handler.getIncrementalIterators( propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java index 9e8665cf65d7..3ca4825b8dfd 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbSourceConfigTest.java @@ -39,15 +39,16 @@ void testCreatingMongoDbSourceConfig() { final String username = "username"; final boolean isSchemaEnforced = false; final JsonNode rawConfig = Jsons.jsonNode( - Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of( - AUTH_SOURCE_CONFIGURATION_KEY, authSource, - CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval, - DATABASE_CONFIGURATION_KEY, database, + Map.of( DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, sampleSize, - PASSWORD_CONFIGURATION_KEY, password, QUEUE_SIZE_CONFIGURATION_KEY, queueSize, - USERNAME_CONFIGURATION_KEY, username, - SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced))); + DATABASE_CONFIG_CONFIGURATION_KEY, Map.of( + AUTH_SOURCE_CONFIGURATION_KEY, authSource, + CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval, + DATABASE_CONFIGURATION_KEY, database, + PASSWORD_CONFIGURATION_KEY, password, + USERNAME_CONFIGURATION_KEY, username, + SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced))); final MongoDbSourceConfig sourceConfig = new MongoDbSourceConfig(rawConfig); assertNotNull(sourceConfig); assertEquals(authSource, sourceConfig.getAuthSource()); @@ -55,7 +56,7 @@ void testCreatingMongoDbSourceConfig() { assertEquals(database, sourceConfig.getDatabaseName()); assertEquals(password, sourceConfig.getPassword()); assertEquals(OptionalInt.of(queueSize), sourceConfig.getQueueSize()); - assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig()); + assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig()); assertEquals(sampleSize, sourceConfig.getSampleSize()); assertEquals(username, sourceConfig.getUsername()); assertEquals(isSchemaEnforced, sourceConfig.getEnforceSchema()); @@ -76,7 +77,7 @@ void testDefaultValues() { assertEquals(null, sourceConfig.getDatabaseName()); assertEquals(null, sourceConfig.getPassword()); assertEquals(OptionalInt.empty(), sourceConfig.getQueueSize()); - assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig()); + assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig()); assertEquals(DEFAULT_DISCOVER_SAMPLE_SIZE, sourceConfig.getSampleSize()); assertEquals(null, sourceConfig.getUsername()); } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java index 4f8388fa4dc3..832c9c7af936 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoUtilTest.java @@ -312,11 +312,11 @@ void testGetAuthorizedCollectionsMongoSecurityException() { void testGetDebeziumEventQueueSize() { final int queueSize = 5000; final MongoDbSourceConfig validQueueSizeConfiguration = new MongoDbSourceConfig( - Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize)))); + Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); final MongoDbSourceConfig tooSmallQueueSizeConfiguration = new MongoDbSourceConfig( - Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE)))); + Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); final MongoDbSourceConfig tooLargeQueueSizeConfiguration = new MongoDbSourceConfig( - Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE)))); + Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); final MongoDbSourceConfig missingQueueSizeConfiguration = new MongoDbSourceConfig(Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of()))); diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index 530a1544c1a3..72873c05fc0b 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| -| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | +| 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes | +| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 | | 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. | | 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. | | 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . |