Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class IncrementalSourceEnumerator
private Boundedness boundedness;

@Nullable protected Integer streamSplitTaskId = null;
private boolean isStreamSplitUpdateRequestAlreadySent = false;

public IncrementalSourceEnumerator(
SplitEnumeratorContext<SourceSplitBase> context,
Expand Down Expand Up @@ -272,17 +273,20 @@ protected void syncWithReaders(int[] subtaskIds, Throwable t) {
}

private void requestStreamSplitUpdateIfNeed() {
if (isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
if (!isStreamSplitUpdateRequestAlreadySent
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
// If enumerator knows which reader is assigned stream split, just send to this reader,
// nor sends to all registered readers.
if (streamSplitTaskId != null) {
isStreamSplitUpdateRequestAlreadySent = true;
LOG.info(
"The enumerator requests subtask {} to update the stream split after newly added table.",
streamSplitTaskId);
context.sendEventToSourceReader(
streamSplitTaskId, new StreamSplitUpdateRequestEvent());
} else {
for (int reader : getRegisteredReader()) {
isStreamSplitUpdateRequestAlreadySent = true;
LOG.info(
"The enumerator requests subtask {} to update the stream split after newly added table.",
reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ private void syncWithReaders(int[] subtaskIds, Throwable t) {
private void requestBinlogSplitUpdateIfNeed() {
if (!isBinlogSplitUpdateRequestAlreadySent
&& isNewlyAddedAssigningSnapshotFinished(splitAssigner.getAssignerStatus())) {
isBinlogSplitUpdateRequestAlreadySent = true;
for (int subtaskId : getRegisteredReader()) {
isBinlogSplitUpdateRequestAlreadySent = true;
LOG.info(
"The enumerator requests subtask {} to update the binlog split after newly added table.",
subtaskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ private void testRemoveTablesOneByOne(
waitForSinkSize("sink", fetchedDataList.size());
assertEqualsInAnyOrder(
fetchedDataList, TestValuesTableFactory.getRawResultsAsStrings("sink"));
// wait task to stream phase
sleepMs(10000);
finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
jobClient.cancel().get();
}
Expand Down