[Source-mongodb-v2] : Adding logs for resume token timestamps + reducing min document discovery size further #34364

Merged on Jan 19, 2024 (10 commits)
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
@@ -166,6 +166,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| 0.13.2 | 2024-01-18 | [\#34364](https://github.com/airbytehq/airbyte/pull/34364) | Better logging in mongo db source connector |
| 0.13.1 | 2024-01-18 | [\#34236](https://github.com/airbytehq/airbyte/pull/34236) | Add postCreateTable hook in destination JdbcSqlGenerator |
| 0.13.0 | 2024-01-16 | [\#34177](https://github.com/airbytehq/airbyte/pull/34177) | Add `useExpensiveSafeCasting` param in JdbcSqlGenerator methods; add JdbcTypingDedupingTest fixture; other DV2-related changes |
| 0.12.1 | 2024-01-11 | [\#34186](https://github.com/airbytehq/airbyte/pull/34186) | Add hook for additional destination specific checks to JDBC destination check method |
@@ -1 +1 @@
-version=0.13.1
+version=0.13.2
@@ -56,7 +56,8 @@ public class MongoDbDebeziumStateUtil implements DebeziumStateUtil {
*/
public JsonNode constructInitialDebeziumState(final BsonDocument resumeToken, final MongoClient mongoClient, final String serverId) {
final String replicaSet = getReplicaSetName(mongoClient);
-LOGGER.info("Initial resume token '{}' constructed", ResumeTokens.getData(resumeToken).asString().getValue());
+LOGGER.info("Initial resume token '{}' constructed, corresponding to timestamp (seconds after epoch) {}",
+ResumeTokens.getData(resumeToken).asString().getValue(), ResumeTokens.getTimestamp(resumeToken).getTime());
final JsonNode state = formatState(serverId, replicaSet, ((BsonString) ResumeTokens.getData(resumeToken)).getValue());
LOGGER.info("Initial Debezium state constructed: {}", state);
return state;
@@ -113,12 +114,14 @@ public boolean isValidResumeToken(final BsonDocument savedOffset, final MongoCli
final ChangeStreamIterable<BsonDocument> stream = mongoClient.watch(BsonDocument.class);
stream.resumeAfter(savedOffset);
try (final var ignored = stream.cursor()) {
-LOGGER.info("Valid resume token '{}' present. Incremental sync will be performed for up-to-date streams.",
-ResumeTokens.getData(savedOffset).asString().getValue());
+LOGGER.info("Valid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Incremental sync will be performed for "
++ "up-to-date streams.",
+ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime());
return true;
} catch (final MongoCommandException | MongoChangeStreamException e) {
-LOGGER.info("Invalid resume token '{}' present. Initial snapshot will be performed for all streams.",
-ResumeTokens.getData(savedOffset).asString().getValue());
+LOGGER.info("Invalid resume token '{}' present, corresponding to timestamp (seconds after epoch) : {}. Initial snapshot will be performed for "
++ "all streams.",
+ResumeTokens.getData(savedOffset).asString().getValue(), ResumeTokens.getTimestamp(savedOffset).getTime());
return false;
}
}
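
The new log lines report the resume token time as raw seconds since the epoch, which is hard to read when checking how far behind a saved offset is. A minimal sketch of turning such a logged value into a readable UTC instant and an age follows. The class `ResumeTokenTimestamps` and its methods are hypothetical helpers for illustration, not part of the connector or the CDK, and the sample value is made up.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the connector: interprets the
// "seconds after epoch" value printed by the log lines above.
final class ResumeTokenTimestamps {

    private ResumeTokenTimestamps() {}

    // Converts a seconds-since-epoch value to a UTC instant.
    static Instant toInstant(final long epochSeconds) {
        return Instant.ofEpochSecond(epochSeconds);
    }

    // Returns how far the token time lies behind `now`.
    // A token time in the future is clamped to zero.
    static Duration age(final long epochSeconds, final Instant now) {
        final Duration elapsed = Duration.between(toInstant(epochSeconds), now);
        return elapsed.isNegative() ? Duration.ZERO : elapsed;
    }

    public static void main(final String[] args) {
        final long logged = 1_705_536_000L; // example value only
        System.out.println(toInstant(logged)); // prints 2024-01-18T00:00:00Z
        System.out.println(age(logged, Instant.now()));
    }
}
```

Passing `now` explicitly keeps the age calculation deterministic, which makes it easy to check against a known pair of timestamps.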
@@ -5,7 +5,7 @@ plugins {
}

airbyteJavaConnector {
-cdkVersionRequired = '0.7.9'
+cdkVersionRequired = '0.13.2'
features = ['db-sources']
useLocalCdk = false
}
@@ -164,7 +164,7 @@
"description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.",
"default": 10000,
"order": 10,
-"minimum": 100,
+"minimum": 10,
"maximum": 100000,
"group": "advanced"
}
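
To make the effect of the lowered bound concrete: JSON Schema treats `minimum` and `maximum` as inclusive, so after this change 10 is accepted and 9 is rejected. The sketch below mirrors the spec values in plain Java. The class `DiscoverSampleSize` is a hypothetical illustration and does not reflect how the connector actually validates its configuration.

```java
// Hypothetical illustration of the spec bounds above; not connector code.
final class DiscoverSampleSize {

    static final int MINIMUM = 10;       // lowered from 100 in this change
    static final int MAXIMUM = 100_000;
    static final int DEFAULT = 10_000;

    private DiscoverSampleSize() {}

    // Both bounds are inclusive, matching JSON Schema semantics.
    static boolean isValid(final int value) {
        return value >= MINIMUM && value <= MAXIMUM;
    }

    // Falls back to the default when the option is not set.
    static int resolve(final Integer configured) {
        if (configured == null) {
            return DEFAULT;
        }
        if (!isValid(configured)) {
            throw new IllegalArgumentException(
                "sample size must be between " + MINIMUM + " and " + MAXIMUM + ", got " + configured);
        }
        return configured;
    }
}
```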
@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
-dockerImageTag: 1.2.2
+dockerImageTag: 1.2.3
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
@@ -92,9 +92,9 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final boolean isEnforceSchema = config.getEnforceSchema();
final Properties defaultDebeziumProperties = MongoDbCdcProperties.getDebeziumProperties();
logOplogInfo(mongoClient);
-final BsonDocument resumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient);
+final BsonDocument initialResumeToken = MongoDbResumeTokenHelper.getMostRecentResumeToken(mongoClient);
final JsonNode initialDebeziumState =
-mongoDbDebeziumStateUtil.constructInitialDebeziumState(resumeToken, mongoClient, databaseName);
+mongoDbDebeziumStateUtil.constructInitialDebeziumState(initialResumeToken, mongoClient, databaseName);
final MongoDbCdcState cdcState = (stateManager.getCdcState() == null || stateManager.getCdcState().state() == null)
? new MongoDbCdcState(initialDebeziumState, isEnforceSchema)
: new MongoDbCdcState(Jsons.clone(stateManager.getCdcState().state()), stateManager.getCdcState().schema_enforced());
@@ -137,7 +137,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
emittedAt, config.getCheckpointInterval(), isEnforceSchema);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.rawConfig(),
-new MongoDbCdcTargetPosition(resumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
+new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);

@@ -164,8 +164,8 @@ private void logOplogInfo(final MongoClient mongoClient) {
final Document command = new Document("collStats", "oplog.rs");
final Document result = localDatabase.runCommand(command);
if (result != null) {
-LOGGER.info("Max oplog size is {} bytes", result.getInteger("maxSize"));
-LOGGER.info("Free space in oplog is {} bytes", result.getInteger("freeStorageSize"));
+LOGGER.info("Max oplog size is {} bytes", result.getLong("maxSize"));
+LOGGER.info("Free space in oplog is {} bytes", result.getLong("freeStorageSize"));
}
} catch (final Exception e) {
LOGGER.warn("Unable to query for op log stats, exception: {}", e.getMessage());
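
The switch from `getInteger` to `getLong` in this hunk matters because oplog sizes can exceed the 32-bit integer range. One way to avoid depending on the exact numeric type the server returns is to read the field through `Number`. The sketch below assumes the stats result can be treated as a `Map<String, Object>` (which `org.bson.Document` implements). The class `OplogStats` and method `readLong` are hypothetical names for illustration, not connector code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: reads a numeric stats field regardless of whether
// the value arrived as an Integer, a Long, or a Double.
final class OplogStats {

    private OplogStats() {}

    static OptionalLong readLong(final Map<String, Object> stats, final String key) {
        final Object value = stats.get(key);
        if (value instanceof Number) {
            return OptionalLong.of(((Number) value).longValue());
        }
        // Missing or non-numeric field: report absence instead of throwing.
        return OptionalLong.empty();
    }

    public static void main(final String[] args) {
        final Map<String, Object> stats = new HashMap<>();
        stats.put("maxSize", 5_000_000_000L);  // larger than Integer.MAX_VALUE
        stats.put("freeStorageSize", 4096);    // stored as an Integer
        System.out.println(readLong(stats, "maxSize").getAsLong());
        System.out.println(readLong(stats, "freeStorageSize").getAsLong());
        System.out.println(readLong(stats, "absent").isPresent());
    }
}
```

Returning `OptionalLong` leaves the decision about missing fields to the caller, which suits a diagnostic log statement that should never fail the sync.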
@@ -164,7 +164,7 @@
"description": "The maximum number of documents to sample when attempting to discover the unique fields for a collection.",
"default": 10000,
"order": 10,
-"minimum": 100,
+"minimum": 10,
"maximum": 100000,
"group": "advanced"
}
1 change: 1 addition & 0 deletions docs/integrations/sources/mongodb-v2.md
@@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
+| 1.2.3 | 2024-01-18 | [34364](https://github.com/airbytehq/airbyte/pull/34364) | Add additional logging for resume token + reduce discovery size to 10. |
| 1.2.2 | 2024-01-16 | [34314](https://github.com/airbytehq/airbyte/pull/34314) | Reduce minimum document discovery size to 100. |
| 1.2.1 | 2023-12-18 | [33549](https://github.com/airbytehq/airbyte/pull/33549) | Add logging to understand op log size. |
| 1.2.0 | 2023-12-18 | [33438](https://github.com/airbytehq/airbyte/pull/33438) | Remove LEGACY state flag |