diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java index 05783348415..acefcf08567 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java @@ -81,6 +81,7 @@ public class IncrementalSourceEnumerator private Boundedness boundedness; @Nullable protected Integer streamSplitTaskId = null; + private boolean isStreamSplitUpdateRequestAlreadySent = false; public IncrementalSourceEnumerator( SplitEnumeratorContext context, @@ -272,10 +273,12 @@ protected void syncWithReaders(int[] subtaskIds, Throwable t) { } private void requestStreamSplitUpdateIfNeed() { - if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) { + if (!isStreamSplitUpdateRequestAlreadySent + && isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) { // If enumerator knows which reader is assigned stream split, just send to this reader, // nor sends to all registered readers. if (streamSplitTaskId != null) { + isStreamSplitUpdateRequestAlreadySent = true; LOG.info( "The enumerator requests subtask {} to update the stream split after newly added table.", streamSplitTaskId); @@ -283,6 +286,7 @@ private void requestStreamSplitUpdateIfNeed() { streamSplitTaskId, new StreamSplitUpdateRequestEvent()); } else { for (int reader : getRegisteredReader()) { + isStreamSplitUpdateRequestAlreadySent = true; LOG.info( "The enumerator requests subtask {} to update the stream split after newly added table.", reader); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java index 3b603d20259..bd2c8886405 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java @@ -277,8 +277,8 @@ private void syncWithReaders(int[] subtaskIds, Throwable t) { private void requestBinlogSplitUpdateIfNeed() { if (!isBinlogSplitUpdateRequestAlreadySent && isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) { - isBinlogSplitUpdateRequestAlreadySent = true; for (int subtaskId : getRegisteredReader()) { + isBinlogSplitUpdateRequestAlreadySent = true; LOG.info( "The enumerator requests subtask {} to update the binlog split after newly added table.", subtaskId); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java index a8c24963aa4..59ebfd614c4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/org/apache/flink/cdc/connectors/oracle/source/NewlyAddedTableITCase.java @@ -495,6 +495,8 @@ private void testRemoveTablesOneByOne( waitForSinkSize("sink", fetchedDataList.size()); assertEqualsInAnyOrder( fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink")); + // wait task to stream phase + sleepMs(10000); finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory); jobClient.cancel().get(); }