Skip to content

Commit

Permalink
🐛 Set cdc record subsequent record wait time to initial wait time as …
Browse files Browse the repository at this point in the history
…a workaround (#35114)
  • Loading branch information
rodireich authored Feb 13, 2024
1 parent edcd5ed commit 0385a64
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.2.8
dockerImageTag: 1.2.9
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static MongoClient createMongoClient(final MongoDbSourceConfig config) {
}

private static String buildConnectionString(final MongoDbSourceConfig config) {
return MongoDbDebeziumPropertiesManager.buildConnectionString(config.rawConfig(), true);
return MongoDbDebeziumPropertiesManager.buildConnectionString(config.getDatabaseConfig(), true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class MongoConstants {
public static final String USERNAME_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY;
public static final String SCHEMA_ENFORCED_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.SCHEMA_ENFORCED_CONFIGURATION_KEY;
public static final String SCHEMALESS_MODE_DATA_FIELD = Configuration.SCHEMALESS_MODE_DATA_FIELD;
public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds";
public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300;

private MongoConstants() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_AUTH_SOURCE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY;
Expand All @@ -27,33 +29,32 @@
*/
public record MongoDbSourceConfig(JsonNode rawConfig) {

/**
* Constructs a new {@link MongoDbSourceConfig} from the provided raw configuration.
*
* @param rawConfig The underlying JSON configuration provided by the connector framework.
* @throws IllegalArgumentException if the raw configuration does not contain the
* {@link MongoConstants#DATABASE_CONFIG_CONFIGURATION_KEY} key.
*/
public MongoDbSourceConfig(final JsonNode rawConfig) {
if (rawConfig.has(DATABASE_CONFIG_CONFIGURATION_KEY)) {
this.rawConfig = rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
} else {
public MongoDbSourceConfig {
if (rawConfig == null) {
throw new IllegalArgumentException("MongoDbSourceConfig cannot accept a null config.");
}
if (!rawConfig.hasNonNull(DATABASE_CONFIG_CONFIGURATION_KEY)) {
throw new IllegalArgumentException("Database configuration is missing required '" + DATABASE_CONFIG_CONFIGURATION_KEY + "' property.");
}
}

public JsonNode getDatabaseConfig() {
return rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY);
}

public String getAuthSource() {
return rawConfig.has(AUTH_SOURCE_CONFIGURATION_KEY) ? rawConfig.get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE)
return getDatabaseConfig().has(AUTH_SOURCE_CONFIGURATION_KEY) ? getDatabaseConfig().get(AUTH_SOURCE_CONFIGURATION_KEY).asText(DEFAULT_AUTH_SOURCE)
: DEFAULT_AUTH_SOURCE;
}

public Integer getCheckpointInterval() {
return rawConfig.has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY) ? rawConfig.get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL)
return getDatabaseConfig().has(CHECKPOINT_INTERVAL_CONFIGURATION_KEY)
? getDatabaseConfig().get(CHECKPOINT_INTERVAL_CONFIGURATION_KEY).asInt(CHECKPOINT_INTERVAL)
: CHECKPOINT_INTERVAL;
}

public String getDatabaseName() {
return rawConfig.has(DATABASE_CONFIGURATION_KEY) ? rawConfig.get(DATABASE_CONFIGURATION_KEY).asText() : null;
return getDatabaseConfig().has(DATABASE_CONFIGURATION_KEY) ? getDatabaseConfig().get(DATABASE_CONFIGURATION_KEY).asText() : null;
}

public OptionalInt getQueueSize() {
Expand All @@ -63,15 +64,15 @@ public OptionalInt getQueueSize() {
}

public String getPassword() {
return rawConfig.has(PASSWORD_CONFIGURATION_KEY) ? rawConfig.get(PASSWORD_CONFIGURATION_KEY).asText() : null;
return getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY) ? getDatabaseConfig().get(PASSWORD_CONFIGURATION_KEY).asText() : null;
}

public String getUsername() {
return rawConfig.has(USERNAME_CONFIGURATION_KEY) ? rawConfig.get(USERNAME_CONFIGURATION_KEY).asText() : null;
return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) ? getDatabaseConfig().get(USERNAME_CONFIGURATION_KEY).asText() : null;
}

public boolean hasAuthCredentials() {
return rawConfig.has(USERNAME_CONFIGURATION_KEY) && rawConfig.has(PASSWORD_CONFIGURATION_KEY);
return getDatabaseConfig().has(USERNAME_CONFIGURATION_KEY) && getDatabaseConfig().has(PASSWORD_CONFIGURATION_KEY);
}

public Integer getSampleSize() {
Expand All @@ -83,8 +84,16 @@ public Integer getSampleSize() {
}

public boolean getEnforceSchema() {
return rawConfig.has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? rawConfig.get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true)
return getDatabaseConfig().has(SCHEMA_ENFORCED_CONFIGURATION_KEY) ? getDatabaseConfig().get(SCHEMA_ENFORCED_CONFIGURATION_KEY).asBoolean(true)
: true;
}

public Integer getInitialWaitingTimeSeconds() {
if (rawConfig.has(INITIAL_RECORD_WAITING_TIME_SEC)) {
return rawConfig.get(INITIAL_RECORD_WAITING_TIME_SEC).asInt(DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC);
} else {
return DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -80,8 +79,11 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final Instant emittedAt,
final MongoDbSourceConfig config) {

final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(config.rawConfig());
final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(config.rawConfig());
final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds());
// #35059: debezium heartbeats are not sent on the expected interval. this is
// a worksaround to allow making subsequent wait time configurable.
final Duration subsequentRecordWaitTime = firstRecordWaitTime;
LOGGER.info("Subsequent cdc record wait time: {} seconds", subsequentRecordWaitTime);
final int queueSize = MongoUtil.getDebeziumEventQueueSize(config);
final String databaseName = config.getDatabaseName();
final boolean isEnforceSchema = config.getEnforceSchema();
Expand All @@ -97,7 +99,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
Jsons.clone(defaultDebeziumProperties),
catalog,
cdcState.state(),
config.rawConfig(),
config.getDatabaseConfig(),
mongoClient);

// We should always be able to extract offset out of state if it's not null
Expand Down Expand Up @@ -131,12 +133,12 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
initialSnapshotHandler.getIterators(initialSnapshotStreams, stateManager, mongoClient.getDatabase(databaseName), cdcMetadataInjector,
emittedAt, config.getCheckpointInterval(), isEnforceSchema);

final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.rawConfig(),
final AirbyteDebeziumHandler<BsonTimestamp> handler = new AirbyteDebeziumHandler<>(config.getDatabaseConfig(),
new MongoDbCdcTargetPosition(initialResumeToken), false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize, false);
final MongoDbCdcStateHandler mongoDbCdcStateHandler = new MongoDbCdcStateHandler(stateManager);
final MongoDbCdcSavedInfoFetcher cdcSavedInfoFetcher = new MongoDbCdcSavedInfoFetcher(stateToBeUsed);
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.rawConfig(), catalog);
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.rawConfig());
final var propertiesManager = new MongoDbDebeziumPropertiesManager(defaultDebeziumProperties, config.getDatabaseConfig(), catalog);
final var eventConverter = new MongoDbDebeziumEventConverter(cdcMetadataInjector, catalog, emittedAt, config.getDatabaseConfig());

final Supplier<AutoCloseableIterator<AirbyteMessage>> incrementalIteratorSupplier = () -> handler.getIncrementalIterators(
propertiesManager, eventConverter, cdcSavedInfoFetcher, mongoDbCdcStateHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,24 @@ void testCreatingMongoDbSourceConfig() {
final String username = "username";
final boolean isSchemaEnforced = false;
final JsonNode rawConfig = Jsons.jsonNode(
Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(
AUTH_SOURCE_CONFIGURATION_KEY, authSource,
CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval,
DATABASE_CONFIGURATION_KEY, database,
Map.of(
DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY, sampleSize,
PASSWORD_CONFIGURATION_KEY, password,
QUEUE_SIZE_CONFIGURATION_KEY, queueSize,
USERNAME_CONFIGURATION_KEY, username,
SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced)));
DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(
AUTH_SOURCE_CONFIGURATION_KEY, authSource,
CHECKPOINT_INTERVAL_CONFIGURATION_KEY, checkpointInterval,
DATABASE_CONFIGURATION_KEY, database,
PASSWORD_CONFIGURATION_KEY, password,
USERNAME_CONFIGURATION_KEY, username,
SCHEMA_ENFORCED_CONFIGURATION_KEY, isSchemaEnforced)));
final MongoDbSourceConfig sourceConfig = new MongoDbSourceConfig(rawConfig);
assertNotNull(sourceConfig);
assertEquals(authSource, sourceConfig.getAuthSource());
assertEquals(checkpointInterval, sourceConfig.getCheckpointInterval());
assertEquals(database, sourceConfig.getDatabaseName());
assertEquals(password, sourceConfig.getPassword());
assertEquals(OptionalInt.of(queueSize), sourceConfig.getQueueSize());
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig());
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig());
assertEquals(sampleSize, sourceConfig.getSampleSize());
assertEquals(username, sourceConfig.getUsername());
assertEquals(isSchemaEnforced, sourceConfig.getEnforceSchema());
Expand All @@ -76,7 +77,7 @@ void testDefaultValues() {
assertEquals(null, sourceConfig.getDatabaseName());
assertEquals(null, sourceConfig.getPassword());
assertEquals(OptionalInt.empty(), sourceConfig.getQueueSize());
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.rawConfig());
assertEquals(rawConfig.get(DATABASE_CONFIG_CONFIGURATION_KEY), sourceConfig.getDatabaseConfig());
assertEquals(DEFAULT_DISCOVER_SAMPLE_SIZE, sourceConfig.getSampleSize());
assertEquals(null, sourceConfig.getUsername());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,11 @@ void testGetAuthorizedCollectionsMongoSecurityException() {
void testGetDebeziumEventQueueSize() {
final int queueSize = 5000;
final MongoDbSourceConfig validQueueSizeConfiguration = new MongoDbSourceConfig(
Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize))));
Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, queueSize, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
final MongoDbSourceConfig tooSmallQueueSizeConfiguration = new MongoDbSourceConfig(
Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE))));
Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MIN_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
final MongoDbSourceConfig tooLargeQueueSizeConfiguration = new MongoDbSourceConfig(
Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE))));
Jsons.jsonNode(Map.of(MongoConstants.QUEUE_SIZE_CONFIGURATION_KEY, Integer.MAX_VALUE, DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));
final MongoDbSourceConfig missingQueueSizeConfiguration =
new MongoDbSourceConfig(Jsons.jsonNode(Map.of(DATABASE_CONFIG_CONFIGURATION_KEY, Map.of())));

Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 |
| 1.2.9 | 2024-02-13 | [35114](https://github.com/airbytehq/airbyte/pull/35114) | Extend subsequent cdc record wait time to the duration of initial. Bug Fixes |
| 1.2.8 | 2024-02-08 | [34748](https://github.com/airbytehq/airbyte/pull/34748) | Adopt CDK 0.19.0 |
| 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. |
| 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. |
| 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . |
Expand Down

0 comments on commit 0385a64

Please sign in to comment.