diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java index b1d4c709..9021432e 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBConsistentStreamingSource.java @@ -131,7 +131,7 @@ protected void getChanges2(ChangeEventSourceContext context, YBPartition ybParti if (snapshotter.shouldSnapshot()) { LOGGER.info("Skipping bootstrap because snapshot has been taken so streaming will resume there onwards"); } else { - bootstrapTabletWithRetry(tabletPairList); + bootstrapTabletWithRetry(tabletPairList, tableIdToTable); } // This log while indicate that the connector has either bootstrapped the tablets or skipped diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java index ce4662eb..523092e1 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBOffsetContext.java @@ -259,6 +259,7 @@ public void updateRecordPosition(YBPartition partition, OpId lsn, public void initSourceInfo(YBPartition partition, YugabyteDBConnectorConfig connectorConfig, OpId opId) { this.tabletSourceInfo.put(partition.getId(), new SourceInfo(connectorConfig, opId)); + this.fromLsn.put(partition.getId(), opId); } public Map getTabletSourceInfo() { diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java index f4a7b19b..bb33f309 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBSnapshotChangeEventSource.java @@ -638,7 +638,7 @@ public void markSnapshotDoneOnServer(YBPartition partition, YugabyteDBOffsetCont short retryCount = 0; while (retryCount <= connectorConfig.maxConnectorRetries()) { try { - LOGGER.debug("Marking snapshot completed on service for table {} tablet {}", partition.getTableId(), partition.getTabletId()); + LOGGER.info("Marking snapshot completed on service for table {} tablet {}", partition.getTableId(), partition.getTabletId()); GetChangesResponse response = this.syncClient.getChangesCDCSDK(tableIdToTable.get(partition.getTableId()), connectorConfig.streamId(), partition.getTabletId(), snapshotDoneMarker.getTerm(), diff --git a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java index 2978780e..e2886d5c 100644 --- a/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/yugabytedb/YugabyteDBStreamingChangeEventSource.java @@ -175,28 +175,35 @@ public void execute(ChangeEventSourceContext context, YBPartition partition, Yug } private void bootstrapTablet(YBTable table, String tabletId) throws Exception { - GetCheckpointResponse getCheckpointResponse = this.syncClient.getCheckpoint(table, connectorConfig.streamId(), tabletId); - - long term = getCheckpointResponse.getTerm(); - long index = getCheckpointResponse.getIndex(); - LOGGER.info("Checkpoint for tablet {} before going to bootstrap: {}.{}", tabletId, term, index); - if (term == -1 && index == -1) { - LOGGER.info("Bootstrapping the tablet {}", tabletId); - this.syncClient.bootstrapTablet(table, connectorConfig.streamId(), tabletId, 0, 0, true, true); - } - else { - LOGGER.info("Skipping bootstrap for table {} tablet {} as it has a checkpoint {}.{}", table.getTableId(), tabletId, term, index); - } + LOGGER.info("Bootstrapping the tablet {}", tabletId); + this.syncClient.bootstrapTablet(table, connectorConfig.streamId(), tabletId, 0, 0, true, true); + markNoSnapshotNeeded(table, tabletId); } - protected void bootstrapTabletWithRetry(List> tabletPairList) throws Exception { + protected void bootstrapTabletWithRetry(List> tabletPairList, + Map tableIdToTable) throws Exception { + Set tabletsWithoutBootstrap = new HashSet<>(); + for (Pair entry : tabletPairList) { + GetCheckpointResponse resp = this.syncClient.getCheckpoint(tableIdToTable.get(entry.getKey()), connectorConfig.streamId(), entry.getValue()); + if (resp.getTerm() == -1 && resp.getIndex() == -1) { + LOGGER.debug("Bootstrap required for table {} tablet {} as it has checkpoint -1.-1", entry.getKey(), entry.getValue()); + } else { + LOGGER.info("No bootstrap needed for tablet {} with checkpoint {}.{}", entry.getValue(), resp.getTerm(), resp.getIndex()); + tabletsWithoutBootstrap.add(entry.getValue() /* tabletId */ ); + } + } + short retryCountForBootstrapping = 0; for (Pair entry : tabletPairList) { // entry is a Pair boolean shouldRetry = true; while (retryCountForBootstrapping <= connectorConfig.maxConnectorRetries() && shouldRetry) { try { - bootstrapTablet(this.syncClient.openTableByUUID(entry.getKey()), entry.getValue()); + if (!tabletsWithoutBootstrap.contains(entry.getValue())) { + bootstrapTablet(this.syncClient.openTableByUUID(entry.getKey()), entry.getValue()); + } else { + LOGGER.info("Skipping bootstrap for table {} tablet {} as it has a checkpoint", entry.getKey(), entry.getValue()); + } // Reset the retry flag if the bootstrap was successful shouldRetry = false; @@ -227,6 +234,41 @@ protected void bootstrapTabletWithRetry(List> tabletPairList } } + protected void markNoSnapshotNeeded(YBTable ybTable, String tabletId) throws Exception { + short retryCount = 0; + while (retryCount <= connectorConfig.maxConnectorRetries()) { + try { + LOGGER.info("Marking no snapshot on service for table {} tablet {}", ybTable.getTableId(), tabletId); + GetChangesResponse response = + this.syncClient.getChangesCDCSDK(ybTable, connectorConfig.streamId(), + tabletId, -1, -1, YugabyteDBOffsetContext.SNAPSHOT_DONE_KEY.getBytes(), + 0, 0, false /* schema is not needed since this is a dummy call */); + + // Break upon successful request. + break; + } catch (Exception e) { + ++retryCount; + + if (retryCount > connectorConfig.maxConnectorRetries()) { + LOGGER.error("Too many errors while trying to mark no snapshot on service for table {} tablet {} error: ", + ybTable.getTableId(), tabletId, e); + throw e; + } + + LOGGER.warn("Error while marking no snapshot on service for table {} tablet {}, will attempt retry {} of {} for error {}", + ybTable.getTableId(), tabletId, retryCount, connectorConfig.maxConnectorRetries(), e); + + try { + final Metronome retryMetronome = Metronome.parker(Duration.ofMillis(connectorConfig.connectorRetryDelayMs()), Clock.SYSTEM); + retryMetronome.pause(); + } catch (InterruptedException ie) { + LOGGER.warn("Connector retry sleep interrupted by exception: {}", ie); + Thread.currentThread().interrupt(); + } + } + } + } + protected void getChanges2(ChangeEventSourceContext context, YBPartition partitionn, @@ -311,7 +353,7 @@ protected void getChanges2(ChangeEventSourceContext context, if (snapshotter.shouldSnapshot()) { LOGGER.info("Skipping bootstrap because snapshot has been taken so streaming will resume there onwards"); } else { - bootstrapTabletWithRetry(tabletPairList); + bootstrapTabletWithRetry(tabletPairList, tableIdToTable); } // This log while indicate that the connector has either bootstrapped the tablets or skipped diff --git a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBColocatedTablesTest.java b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBColocatedTablesTest.java index f752eebc..e29408ea 100644 --- a/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBColocatedTablesTest.java +++ b/src/test/java/io/debezium/connector/yugabytedb/YugabyteDBColocatedTablesTest.java @@ -185,6 +185,8 @@ public void shouldWorkAfterAddingTableAfterRestart() throws Exception { // Stop the connector and modify the configuration stopConnector(); + TestHelper.executeBulkWithRange(INSERT_TEST_2, 11, 21, DEFAULT_COLOCATED_DB_NAME); + configBuilder.with(YugabyteDBConnectorConfig.TABLE_INCLUDE_LIST, "public.test_1,public.test_2,public.test_3"); @@ -197,20 +199,20 @@ public void shouldWorkAfterAddingTableAfterRestart() throws Exception { // The below statements will insert records of the respective types with keys in the // range [11,21) TestHelper.executeBulkWithRange(INSERT_TEST_1, 11, 21, DEFAULT_COLOCATED_DB_NAME); - TestHelper.executeBulkWithRange(INSERT_TEST_2, 11, 21, DEFAULT_COLOCATED_DB_NAME); + TestHelper.executeBulkWithRange(INSERT_TEST_2, 21, 101, DEFAULT_COLOCATED_DB_NAME); TestHelper.executeBulkWithRange(INSERT_TEST_3, 11, 21, DEFAULT_COLOCATED_DB_NAME); // Dummy wait for 10 more seconds TestHelper.waitFor(Duration.ofSeconds(10)); - SourceRecords recordsAfterRestart = consumeByTopic(30); + SourceRecords recordsAfterRestart = consumeByTopic(110); assertNotNull(recordsAfterRestart); assertEquals( 10, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_1").size()); assertEquals( - 10, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_2").size()); + 90, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_2").size()); assertEquals( 10, recordsAfterRestart.recordsForTopic(TestHelper.TEST_SERVER + ".public.test_3").size());