🐛 Set cdc record subsequent record wait time to initial wait time as a workaround #35114

@@ -34,6 +34,8 @@ public class MongoConstants {
public static final String USERNAME_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.USERNAME_CONFIGURATION_KEY;
public static final String SCHEMA_ENFORCED_CONFIGURATION_KEY = MongoDbDebeziumConstants.Configuration.SCHEMA_ENFORCED_CONFIGURATION_KEY;
public static final String SCHEMALESS_MODE_DATA_FIELD = Configuration.SCHEMALESS_MODE_DATA_FIELD;
public static final String INITIAL_RECORD_WAITING_TIME_SEC = "initial_waiting_seconds";
Contributor
do we not define initial waiting time in the spec already?

Contributor
image

Contributor
For mongo cdc, I think we'll always be assured that initial wait time in seconds is populated, so we can remove the defaults

Contributor Author
> do we not define initial waiting time in the spec already?

I'm not sure I follow. The value is defined in the spec, but to read it I needed to define a constant in Java.
The other place that reads it (incorrectly for MongoDB) is RecordWaitTimeUtil.getFirstRecordWaitSeconds(), but no constant is defined there:

final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt();

Contributor
Ack - that is unfortunate

public static final Integer DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC = 300;

private MongoConstants() {}

@@ -11,7 +11,9 @@
import static io.airbyte.integrations.source.mongodb.MongoConstants.DATABASE_CONFIG_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_AUTH_SOURCE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_DISCOVER_SAMPLE_SIZE;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.DISCOVER_SAMPLE_SIZE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.INITIAL_RECORD_WAITING_TIME_SEC;
import static io.airbyte.integrations.source.mongodb.MongoConstants.PASSWORD_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.SCHEMA_ENFORCED_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.MongoConstants.USERNAME_CONFIGURATION_KEY;
@@ -87,4 +89,12 @@ public boolean getEnforceSchema() {
: true;
}

public Integer getInitialWaitingTimeSeconds() {
if (rawConfig.has(INITIAL_RECORD_WAITING_TIME_SEC)) {
return rawConfig.get(INITIAL_RECORD_WAITING_TIME_SEC).asInt(DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC);
} else {
return DEFAULT_INITIAL_RECORD_WAITING_TIME_SEC;
}
}

}
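
A minimal sketch of the lookup difference discussed in the thread, assuming (as the getter above suggests) that the MongoDB config keeps `initial_waiting_seconds` at the top level, while the shared utility looks under `replication_method`. The class name `InitialWaitSketch`, its method names, and the plain `Map` standing in for the connector's JSON config are illustrative assumptions, not the connector's actual code.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the connector config; a Map replaces the real JSON node.
public class InitialWaitSketch {

  static final String KEY = "initial_waiting_seconds";
  static final int DEFAULT_SECONDS = 300;

  // Flat lookup: the key is read from the top level, falling back to the default
  // when it is missing or not an integer.
  public static int fromTopLevel(final Map<String, Object> config) {
    final Object value = config.get(KEY);
    return value instanceof Integer ? (Integer) value : DEFAULT_SECONDS;
  }

  // Nested lookup: the key is read from under "replication_method".
  // Returns empty when that object or the key is absent.
  public static Optional<Integer> fromReplicationMethod(final Map<String, Object> config) {
    final Object nested = config.get("replication_method");
    if (nested instanceof Map) {
      final Object value = ((Map<?, ?>) nested).get(KEY);
      if (value instanceof Integer) {
        return Optional.of((Integer) value);
      }
    }
    return Optional.empty();
  }

  public static void main(final String[] args) {
    // A flat config, shaped the way this sketch assumes the MongoDB source config is.
    final Map<String, Object> flatConfig = Map.of(KEY, 600);
    System.out.println("top-level lookup: " + fromTopLevel(flatConfig));
    System.out.println("nested lookup found a value: " + fromReplicationMethod(flatConfig).isPresent());
  }
}
```

Under these assumptions, the nested lookup finds nothing for a flat config, which is why a connector-specific getter with its own key constant is needed.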
@@ -9,7 +9,6 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoDatabase;
import io.airbyte.cdk.integrations.debezium.AirbyteDebeziumHandler;
import io.airbyte.cdk.integrations.debezium.internals.RecordWaitTimeUtil;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -80,8 +79,14 @@ public List<AutoCloseableIterator<AirbyteMessage>> createCdcIterators(
final Instant emittedAt,
final MongoDbSourceConfig config) {

final Duration firstRecordWaitTime = RecordWaitTimeUtil.getFirstRecordWaitTime(config.rawConfig());
final Duration subsequentRecordWaitTime = RecordWaitTimeUtil.getSubsequentRecordWaitTime(config.rawConfig());
// LOGGER.info("*** config {}", config.rawConfig());
Contributor
nit: remove this commented-out log line

final Duration firstRecordWaitTime = Duration.ofSeconds(config.getInitialWaitingTimeSeconds());
// #35059: debezium heartbeats are not sent on the expected interval. This is a workaround to allow
// making the subsequent wait time configurable.
// final Duration subsequentRecordWaitTime = firstRecordWaitTime.dividedBy(2);
Contributor
Is the goal to make the subsequent record wait time half of the first record wait time or the same?

Contributor Author
Correct.
Changed back to 1/2

final Duration subsequentRecordWaitTime = firstRecordWaitTime;
Contributor Author
A max of 10 minutes may not be enough.
The customer is still seeing 101 records and a timeout with 10 minutes.

LOGGER.info("*** subs {} init {}", subsequentRecordWaitTime, firstRecordWaitTime);
Contributor
Can we have a log line here like "Subsequent record wait time is: {}" to make it clearer?

final int queueSize = MongoUtil.getDebeziumEventQueueSize(config);
final String databaseName = config.getDatabaseName();
final boolean isEnforceSchema = config.getEnforceSchema();
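
The two options debated in this hunk (subsequent wait time equal to the first wait time, or half of it) can be sketched with `java.time.Duration` alone. The class `WaitTimeSketch`, its method names, and the `halve` flag are hypothetical and exist only for illustration; the log text follows the wording suggested in the review.

```java
import java.time.Duration;

// Hypothetical helper illustrating the two wait-time options from the review thread.
public class WaitTimeSketch {

  // Converts the configured number of seconds into the first-record wait time.
  public static Duration firstRecordWaitTime(final int initialWaitingSeconds) {
    return Duration.ofSeconds(initialWaitingSeconds);
  }

  // Derives the subsequent-record wait time from the first one:
  // either the same value (the workaround) or half of it.
  public static Duration subsequentRecordWaitTime(final Duration first, final boolean halve) {
    return halve ? first.dividedBy(2) : first;
  }

  public static void main(final String[] args) {
    final Duration first = firstRecordWaitTime(300);
    final Duration subsequent = subsequentRecordWaitTime(first, false);
    // Descriptive messages in place of the "*** subs {} init {}" debug line.
    System.out.println("First record wait time is: " + first);
    System.out.println("Subsequent record wait time is: " + subsequent);
  }
}
```

Keeping the derivation in one small method makes the chosen ratio explicit, so a later change between "same" and "half" touches a single place.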