Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ else if (!isRemainingTablesCheckpointed && !isSnapshotAssigningFinished(assigner

private void captureNewlyAddedTables() {
if (sourceConfig.isScanNewlyAddedTableEnabled()
&& !sourceConfig.getStartupOptions().isSnapshotOnly()) {
&& !sourceConfig.getStartupOptions().isSnapshotOnly()
&& AssignerStatus.isAssigningFinished(assignerStatus)) {
try {
// check whether we got newly added tables
final List<TableId> currentCapturedTables =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ else if (!isRemainingTablesCheckpointed
private void captureNewlyAddedTables() {
// Don't scan newly added table in snapshot mode.
if (sourceConfig.isScanNewlyAddedTableEnabled()
&& !sourceConfig.getStartupOptions().isSnapshotOnly()) {
&& !sourceConfig.getStartupOptions().isSnapshotOnly()
&& AssignerStatus.isAssigningFinished(assignerStatus)) {
// check whether we got newly added tables
try (JdbcConnection jdbc = DebeziumUtils.openJdbcConnection(sourceConfig)) {
final List<TableId> currentCapturedTables =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,15 +484,32 @@ public void testEnumerateTablesLazily() {
}

@Test
public void testScanNewlyAddedTableStartFromCheckpoint() {
public void testScanNewlyAddedTableStartFromInitialAssigningFinishedCheckpoint() {
List<String> expected =
Arrays.asList(
"customers_sparse_dist [109] null",
"customers_even_dist null [10]",
"customers_even_dist [10] [18]",
"customers_even_dist [18] null",
"customer_card_single_line null null");
assertEquals(expected, getTestAssignSnapshotSplitsFromCheckpoint());
assertEquals(
expected,
getTestAssignSnapshotSplitsFromCheckpoint(
AssignerStatus.INITIAL_ASSIGNING_FINISHED));
}

@Test
public void testScanNewlyAddedTableStartFromNewlyAddedAssigningSnapshotFinishedCheckpoint() {
List<String> expected =
Arrays.asList(
"customers_sparse_dist [109] null",
"customers_even_dist null [10]",
"customers_even_dist [10] [18]",
"customers_even_dist [18] null");
assertEquals(
expected,
getTestAssignSnapshotSplitsFromCheckpoint(
AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED));
}

private List<String> getTestAssignSnapshotSplits(
Expand Down Expand Up @@ -536,7 +553,7 @@ private List<String> getTestAssignSnapshotSplits(
return getSplitsFromAssigner(assigner);
}

private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus assignerStatus) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: The method name should use camel case

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

TableId newTable =
TableId.parse(customerDatabase.getDatabaseName() + ".customer_card_single_line");
TableId processedTable =
Expand Down Expand Up @@ -619,7 +636,7 @@ private List<String> getTestAssignSnapshotSplitsFromCheckpoint() {
assignedSplits,
new HashMap<>(),
splitFinishedOffsets,
AssignerStatus.INITIAL_ASSIGNING,
assignerStatus,
remainingTables,
false,
true,
Expand Down