From c116d3321ad3c4781001494e843bd4540e1adba0 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 16 Jan 2024 17:04:54 -0800 Subject: [PATCH 1/7] [Source-mongo] : Relax minimum document discovery size to 100 --- .../source-mongodb-v2/integration_tests/expected_spec.json | 2 +- airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml | 2 +- .../connectors/source-mongodb-v2/src/main/resources/spec.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json index 54e3d7aa189c..8de6fd3bc30e 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json @@ -164,7 +164,7 @@ "description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.", "default": 10000, "order": 10, - "minimum": 1000, + "minimum": 100, "maximum": 100000, "group": "advanced" } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index fb5be28de49d..086852451f42 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.1 + dockerImageTag: 1.2.2 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json index ae6b822110ca..4acd5c67d25f 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json @@ -164,7 +164,7 @@ "description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.", "default": 10000, "order": 10, - "minimum": 1000, + "minimum": 100, "maximum": 100000, "group": "advanced" } From 06d5a0e32e66902c1eea014e111be577b1d527b2 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 16 Jan 2024 17:07:03 -0800 Subject: [PATCH 2/7] changelog --- docs/integrations/sources/mongodb-v2.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index bc9ffb1820e2..c7821542174b 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.2.2 | 2024-01-16 | [34314](https://github.com/airbytehq/airbyte/pull/34314) | Reduce minimum document discovery size to 100. | | 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. | | 1.2.0 | 2023-12-18 | [33438](https://github.com/airbytehq/airbyte/pull/33438) | Remove LEGACY state flag | | 1.1.0 | 2023-12-14 | [32328](https://github.com/airbytehq/airbyte/pull/32328) | Schema less mode in mongodb. | From 8d8ba8b3af84ac2d0edd0baa7425492552fc0be1 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 18 Jan 2024 13:50:37 -0800 Subject: [PATCH 3/7] [Source-mongodb-v2] : Additional logging of resume token start --- .../internals/mongodb/MongoDbDebeziumStateUtil.java | 13 ++++++++----- .../connectors/source-mongodb-v2/build.gradle | 2 +- .../integration_tests/expected_spec.json | 2 +- .../source/mongodb/cdc/MongoDbCdcInitializer.java | 10 +++++----- .../source-mongodb-v2/src/main/resources/spec.json | 2 +- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java index 1bceb20940e6..dcbcc96a783e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java @@ -56,7 +56,8 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil { */ public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) { final String replicaSet = getReplicaSetName(mongoClient); - LOGGER.info("Initial resume token '{}' constructed", ResumeTokens.getData(resumeToken).asString().getValue()); + LOGGER.info("Initial resume token '{}' constructed, corresponding to timestamp (seconds after epoch) {}", + ResumeTokens.getData(resumeToken).asString().getValue(), ResumeTokens.getTimestamp(resumeToken).getTime()); final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue()); LOGGER.info("Initial Debezium state constructed: {}", state); return state; @@ -113,12 +114,14 @@ public boolean isValidResumeToken(final BsonDocument savedOffset, final MongoCli final ChangeStreamIterable stream = mongoClient.watch(BsonDocument.class); stream.resumeAfter(savedOffset); try (final var ignored = stream.cursor()) { - LOGGER.info("Valid resume token '{}' present. Incremental sync will be performed for up-to-date streams.", - ResumeTokens.getData(savedOffset).asString().getValue()); + LOGGER.info("Valid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Incremental sync will be performed for " + + "up-to-date streams.", + ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime()); return true; } catch (final MongoCommandException | MongoChangeStreamException e) { - LOGGER.info("Invalid resume token '{}' present. Initial snapshot will be performed for all streams.", - ResumeTokens.getData(savedOffset).asString().getValue()); + LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Initial snapshot will be performed for " + + "all streams.", + ResumeTokens.getData(savedOffset).asString().getValue(),ResumeTokens.getTimestamp(savedOffset).getTime()); return false; } } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index 0c079534e552..fe98dce77cad 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -7,7 +7,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.7.9' features = ['db-sources'] - useLocalCdk = false + useLocalCdk = true } airbyteJavaConnector.addCdkDependencies() diff --git a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json index 8de6fd3bc30e..e77e9d632780 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/integration_tests/expected_spec.json @@ -164,7 +164,7 @@ "description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.", "default": 10000, "order": 10, - "minimum": 100, + "minimum": 10, "maximum": 100000, "group": "advanced" } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java index ce979bc13353..bc8957c98179 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/cdc/MongoDbCdcInitializer.java @@ -92,9 +92,9 @@ public List> createCdcIterators( final boolean isEnforceSchema = config.getEnforceSchema(); final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties(); logOplogInfo(mongoClient); - final BsonDocument resumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient); + final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient); final JsonNode initialDebeziumState = - mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeToken, mongoClient, databaseName); + mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName); final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null) ? new MongoDbCdcState(initialDebeziumState, isEnforceSchema) : new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced()); @@ -137,7 +137,7 @@ public List> createCdcIterators( emittedAt, config.getCheckpointInterval(), isEnforceSchema); final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(config.rawConfig(), - new MongoDbCdcTargetPosition(resumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize); + new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize); final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager); final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed); @@ -164,8 +164,8 @@ private void logOplogInfo(final MongoClient mongoClient) { final Document command = new Document("collStats", "oplog.rs"); final Document result = localDatabase.runCommand(command); if (result != null) { - LOGGER.info("Max oplog size is {} bytes", result.getInteger("maxSize")); - LOGGER.info("Free space in oplog is {} bytes", result.getInteger("freeStorageSize")); + LOGGER.info("Max oplog size is {} bytes", result.getLong("maxSize")); + LOGGER.info("Free space in oplog is {} bytes", result.getLong("freeStorageSize")); } } catch (final Exception e) { LOGGER.warn("Unable to query for op log stats, exception: {}" + e.getMessage()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json index 4acd5c67d25f..9c4af4f046c9 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/resources/spec.json @@ -164,7 +164,7 @@ "description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.", "default": 10000, "order": 10, - "minimum": 100, + "minimum": 10, "maximum": 100000, "group": "advanced" } From fb08c827008e1bbb2086b94627aae7ac3cdac2aa Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 18 Jan 2024 14:33:45 -0800 Subject: [PATCH 4/7] Formatting --- .../internals/mongodb/MongoDbDebeziumStateUtil.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java index dcbcc96a783e..6e646d53e7c0 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/java/io/airbyte/cdk/integrations/debezium/internals/mongodb/MongoDbDebeziumStateUtil.java @@ -115,13 +115,13 @@ public boolean isValidResumeToken(final BsonDocument savedOffset, final MongoCli stream.resumeAfter(savedOffset); try (final var ignored = stream.cursor()) { LOGGER.info("Valid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Incremental sync will be performed for " - + "up-to-date streams.", + + "up-to-date streams.", ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime()); return true; } catch (final MongoCommandException | MongoChangeStreamException e) { LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Initial snapshot will be performed for " - + "all streams.", - ResumeTokens.getData(savedOffset).asString().getValue(),ResumeTokens.getTimestamp(savedOffset).getTime()); + + "all streams.", + ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime()); return false; } } From 28ea0ff8f19335ce5edbc88a67c1e2c52665f5de Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 18 Jan 2024 14:43:05 -0800 Subject: [PATCH 5/7] bump version.prop --- airbyte-cdk/java/airbyte-cdk/README.md | 1 + .../java/airbyte-cdk/core/src/main/resources/version.properties | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 47539a8cc70f..077fb7b472cb 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,6 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.13.1 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | | 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | | 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method | | 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index f6cee2374148..aa31273d9042 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.13.0 +version=0.13.1 From ce94886bc0460b6ffc8321eb867002772e1f12af Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 18 Jan 2024 15:43:49 -0800 Subject: [PATCH 6/7] Bump versions + docs --- airbyte-cdk/java/airbyte-cdk/README.md | 2 +- .../airbyte-cdk/core/src/main/resources/version.properties | 2 +- .../connectors/source-mongodb-v2/build.gradle | 4 ++-- docs/integrations/sources/mongodb-v2.md | 1 + 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index 077fb7b472cb..2e2f12bd429d 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -166,7 +166,7 @@ MavenLocal debugging steps: | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 0.13.1 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | +| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector | | 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes | | 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method | | 0.12.0 | 2024-01-10 | [\#33875](https://github.com/airbytehq/airbyte/pull/33875) | Upgrade sshd-mina to 2.11.1 | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index aa31273d9042..b18dfa7feb69 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.13.1 +version=0.13.2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle index fe98dce77cad..ed08fabcc683 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/build.gradle +++ b/airbyte-integrations/connectors/source-mongodb-v2/build.gradle @@ -5,9 +5,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.7.9' + cdkVersionRequired = '0.13.2' features = ['db-sources'] - useLocalCdk = true + useLocalCdk = false } airbyteJavaConnector.addCdkDependencies() diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index c7821542174b..4ff1a9a2f0fb 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.2.3 | 2024-01-18 | [34364](https://github.com/airbytehq/airbyte/pull/34364) | Add additional logging for resume token + reduce discovery size to 10. | | 1.2.2 | 2024-01-16 | [34314](https://github.com/airbytehq/airbyte/pull/34314) | Reduce minimum document discovery size to 100. | | 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. | | 1.2.0 | 2023-12-18 | [33438](https://github.com/airbytehq/airbyte/pull/33438) | Remove LEGACY state flag | From 4918b28807ccdc92a3720a3b264571a8ab5252a9 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Thu, 18 Jan 2024 16:17:24 -0800 Subject: [PATCH 7/7] Bump more version --- airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 086852451f42..3bba900cd503 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.2 + dockerImageTag: 1.2.3 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2