Skip to content

Commit

Permalink
[Source-mongodb-v2] : Adding logs for resume token timestamps + reduc…
Browse files Browse the repository at this point in the history
…ing min document discovery size further (airbytehq#34364)
  • Loading branch information
akashkulk authored and jatinyadav-cc committed Feb 26, 2024
1 parent 7cec13d commit 9e42e80
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 15 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector |
| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator |
| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes |
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.13.1
version=0.13.2
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil {
*/
public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) {
final String replicaSet = getReplicaSetName(mongoClient);
LOGGER.info("Initial resume token '{}' constructed", ResumeTokens.getData(resumeToken).asString().getValue());
LOGGER.info("Initial resume token '{}' constructed, corresponding to timestamp (seconds after epoch) {}",
ResumeTokens.getData(resumeToken).asString().getValue(), ResumeTokens.getTimestamp(resumeToken).getTime());
final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue());
LOGGER.info("Initial Debezium state constructed: {}", state);
return state;
Expand Down Expand Up @@ -113,12 +114,14 @@ public boolean isValidResumeToken(final BsonDocument savedOffset, final MongoCli
final ChangeStreamIterable<BsonDocument> stream = mongoClient.watch(BsonDocument.class);
stream.resumeAfter(savedOffset);
try (final var ignored = stream.cursor()) {
LOGGER.info("Valid resume token '{}' present. Incremental sync will be performed for up-to-date streams.",
ResumeTokens.getData(savedOffset).asString().getValue());
LOGGER.info("Valid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Incremental sync will be performed for "
+ "up-to-date streams.",
ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime());
return true;
} catch (final MongoCommandException | MongoChangeStreamException e) {
LOGGER.info("Invalid resume token '{}' present. Initial snapshot will be performed for all streams.",
ResumeTokens.getData(savedOffset).asString().getValue());
LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Initial snapshot will be performed for "
+ "all streams.",
ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime());
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.9'
cdkVersionRequired = '0.13.2'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
"description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.",
"default": 10000,
"order": 10,
"minimum": 100,
"minimum": 10,
"maximum": 100000,
"group": "advanced"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.2
dockerImageTag: 1.2.3
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final boolean isEnforceSchema = config.getEnforceSchema();
final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties();
logOplogInfo(mongoClient);
final BsonDocument resumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient);
final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient);
final JsonNode initialDebeziumState =
mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeToken, mongoClient, databaseName);
mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName);
final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null)
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
: new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced());
Expand Down Expand Up @@ -137,7 +137,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
emittedAt, config.getCheckpointInterval(), isEnforceSchema);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.rawConfig(),
new MongoDbCdcTargetPosition(resumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);

Expand All @@ -164,8 +164,8 @@ private void logOplogInfo(final MongoClient mongoClient) {
final Document command = new Document("collStats", "oplog.rs");
final Document result = localDatabase.runCommand(command);
if (result != null) {
LOGGER.info("Max oplog size is {} bytes", result.getInteger("maxSize"));
LOGGER.info("Free space in oplog is {} bytes", result.getInteger("freeStorageSize"));
LOGGER.info("Max oplog size is {} bytes", result.getLong("maxSize"));
LOGGER.info("Free space in oplog is {} bytes", result.getLong("freeStorageSize"));
}
} catch (final Exception e) {
LOGGER.warn("Unable to query for op log stats, exception: {}" + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
"description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.",
"default": 10000,
"order": 10,
"minimum": 100,
"minimum": 10,
"maximum": 100000,
"group": "advanced"
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.3 | 2024-01-18 | [34364](https://github.com/airbytehq/airbyte/pull/34364) | Add additional logging for resume token + reduce discovery size to 10. |
| 1.2.2 | 2024-01-16 | [34314](https://github.com/airbytehq/airbyte/pull/34314) | Reduce minimum document discovery size to 100. |
| 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. |
| 1.2.0 | 2023-12-18 | [33438](https://github.com/airbytehq/airbyte/pull/33438) | Remove LEGACY state flag |
Expand Down

0 comments on commit 9e42e80

Please sign in to comment.