Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[yugabyte/yugabyte-db#18075] Send snapshot_done_key before starting streaming #239

Merged
merged 3 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ protected void getChanges2(ChangeEventSourceContext context, YBPartition ybParti
if (snapshotter.shouldSnapshot()) {
LOGGER.info("Skipping bootstrap because snapshot has been taken so streaming will resume there onwards");
} else {
bootstrapTabletWithRetry(tabletPairList);
bootstrapTabletWithRetry(tabletPairList, tableIdToTable);
}

// This log while indicate that the connector has either bootstrapped the tablets or skipped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public void updateRecordPosition(YBPartition partition, OpId lsn,

public void initSourceInfo(YBPartition partition, YugabyteDBConnectorConfig connectorConfig, OpId opId) {
this.tabletSourceInfo.put(partition.getId(), new SourceInfo(connectorConfig, opId));
this.fromLsn.put(partition.getId(), opId);
}

public Map<String, SourceInfo> getTabletSourceInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ public void markSnapshotDoneOnServer(YBPartition partition, YugabyteDBOffsetCont
short retryCount = 0;
while (retryCount <= connectorConfig.maxConnectorRetries()) {
try {
LOGGER.debug("Marking snapshot completed on service for table {} tablet {}", partition.getTableId(), partition.getTabletId());
LOGGER.info("Marking snapshot completed on service for table {} tablet {}", partition.getTableId(), partition.getTabletId());
GetChangesResponse response =
this.syncClient.getChangesCDCSDK(tableIdToTable.get(partition.getTableId()), connectorConfig.streamId(),
partition.getTabletId(), snapshotDoneMarker.getTerm(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,28 +175,35 @@ public void execute(ChangeEventSourceContext context, YBPartition partition, Yug
}

private void bootstrapTablet(YBTable table, String tabletId) throws Exception {
GetCheckpointResponse getCheckpointResponse = this.syncClient.getCheckpoint(table, connectorConfig.streamId(), tabletId);

long term = getCheckpointResponse.getTerm();
long index = getCheckpointResponse.getIndex();
LOGGER.info("Checkpoint for tablet {} before going to bootstrap: {}.{}", tabletId, term, index);
if (term == -1 && index == -1) {
LOGGER.info("Bootstrapping the tablet {}", tabletId);
this.syncClient.bootstrapTablet(table, connectorConfig.streamId(), tabletId, 0, 0, true, true);
}
else {
LOGGER.info("Skipping bootstrap for table {} tablet {} as it has a checkpoint {}.{}", table.getTableId(), tabletId, term, index);
}
LOGGER.info("Bootstrapping the tablet {}", tabletId);
this.syncClient.bootstrapTablet(table, connectorConfig.streamId(), tabletId, 0, 0, true, true);
markNoSnapshotNeeded(table, tabletId);
}

protected void bootstrapTabletWithRetry(List<Pair<String,String>> tabletPairList) throws Exception {
protected void bootstrapTabletWithRetry(List<Pair<String,String>> tabletPairList,
Map<String, YBTable> tableIdToTable) throws Exception {
Set<String> tabletsWithoutBootstrap = new HashSet<>();
for (Pair<String, String> entry : tabletPairList) {
GetCheckpointResponse resp = this.syncClient.getCheckpoint(tableIdToTable.get(entry.getKey()), connectorConfig.streamId(), entry.getValue());
if (resp.getTerm() == -1 && resp.getIndex() == -1) {
LOGGER.debug("Bootstrap required for table {} tablet {} as it has checkpoint -1.-1", entry.getKey(), entry.getValue());
} else {
LOGGER.info("No bootstrap needed for tablet {} with checkpoint {}.{}", entry.getValue(), resp.getTerm(), resp.getIndex());
tabletsWithoutBootstrap.add(entry.getValue() /* tabletId */ );
}
}

short retryCountForBootstrapping = 0;
for (Pair<String, String> entry : tabletPairList) {
// entry is a Pair<tableId, tabletId>
boolean shouldRetry = true;
while (retryCountForBootstrapping <= connectorConfig.maxConnectorRetries() && shouldRetry) {
try {
bootstrapTablet(this.syncClient.openTableByUUID(entry.getKey()), entry.getValue());
if (!tabletsWithoutBootstrap.contains(entry.getValue())) {
bootstrapTablet(this.syncClient.openTableByUUID(entry.getKey()), entry.getValue());
} else {
LOGGER.info("Skipping bootstrap for table {} tablet {} as it has a checkpoint", entry.getKey(), entry.getValue());
}

// Reset the retry flag if the bootstrap was successful
shouldRetry = false;
Expand Down Expand Up @@ -227,6 +234,41 @@ protected void bootstrapTabletWithRetry(List<Pair<String,String>> tabletPairList
}
}

protected void markNoSnapshotNeeded(YBTable ybTable, String tabletId) throws Exception {
short retryCount = 0;
while (retryCount <= connectorConfig.maxConnectorRetries()) {
try {
LOGGER.info("Marking no snapshot on service for table {} tablet {}", ybTable.getTableId(), tabletId);
GetChangesResponse response =
this.syncClient.getChangesCDCSDK(ybTable, connectorConfig.streamId(),
tabletId, -1, -1, YugabyteDBOffsetContext.SNAPSHOT_DONE_KEY.getBytes(),
0, 0, false /* schema is not needed since this is a dummy call */);

// Break upon successful request.
break;
} catch (Exception e) {
++retryCount;

if (retryCount > connectorConfig.maxConnectorRetries()) {
LOGGER.error("Too many errors while trying to mark no snapshot on service for table {} tablet {} error: ",
ybTable.getTableId(), tabletId, e);
throw e;
}

LOGGER.warn("Error while marking no snapshot on service for table {} tablet {}, will attempt retry {} of {} for error {}",
ybTable.getTableId(), tabletId, retryCount, connectorConfig.maxConnectorRetries(), e);

try {
final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM);
retryMetronome.pause();
} catch (InterruptedException ie) {
LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie);
Thread.currentThread().interrupt();
}
}
}
}


protected void getChanges2(ChangeEventSourceContext context,
YBPartition partitionn,
Expand Down Expand Up @@ -311,7 +353,7 @@ protected void getChanges2(ChangeEventSourceContext context,
if (snapshotter.shouldSnapshot()) {
LOGGER.info("Skipping bootstrap because snapshot has been taken so streaming will resume there onwards");
} else {
bootstrapTabletWithRetry(tabletPairList);
bootstrapTabletWithRetry(tabletPairList, tableIdToTable);
}

// This log while indicate that the connector has either bootstrapped the tablets or skipped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ public void shouldWorkAfterAddingTableAfterRestart() throws Exception {
// Stop the connector and modify the configuration
stopConnector();

TestHelper.executeBulkWithRange(INSERT_TEST_2, 11, 21, DEFAULT_COLOCATED_DB_NAME);

configBuilder.with(YugabyteDBConnectorConfig.TABLE_INCLUDE_LIST,
"public.test_1,public.test_2,public.test_3");

Expand All @@ -197,20 +199,20 @@ public void shouldWorkAfterAddingTableAfterRestart() throws Exception {
// The below statements will insert records of the respective types with keys in the
// range [11,21)
TestHelper.executeBulkWithRange(INSERT_TEST_1, 11, 21, DEFAULT_COLOCATED_DB_NAME);
TestHelper.executeBulkWithRange(INSERT_TEST_2, 11, 21, DEFAULT_COLOCATED_DB_NAME);
TestHelper.executeBulkWithRange(INSERT_TEST_2, 21, 101, DEFAULT_COLOCATED_DB_NAME);
TestHelper.executeBulkWithRange(INSERT_TEST_3, 11, 21, DEFAULT_COLOCATED_DB_NAME);

// Dummy wait for 10 more seconds
TestHelper.waitFor(Duration.ofSeconds(10));

SourceRecords recordsAfterRestart = consumeByTopic(30);
SourceRecords recordsAfterRestart = consumeByTopic(110);

assertNotNull(recordsAfterRestart);

assertEquals(
10, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_1").size());
assertEquals(
10, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_2").size());
90, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_2").size());
assertEquals(
10, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_3").size());

Expand Down