diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 2068c13060..ded912e864 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -827,14 +827,7 @@ public LogAppendInfo appendRecordsToLeader(MemoryLogRecords memoryLogRecords, in throw new LogStorageException( "Error while appending records to " + tableBucket, e); } - - // we may need to increment high watermark if isr could be down to 1 or the - // replica count is 1. - boolean hwIncreased = maybeIncrementLeaderHW(logTablet, clock.milliseconds()); - - if (hwIncreased) { - tryCompleteDelayedOperations(); - } + maybeIncrementLeaderHW(logTablet, clock.milliseconds()); return appendInfo; }); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java index eaf5c44cba..a9ff296863 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/delay/DelayedFetchLogTest.java @@ -92,6 +92,9 @@ void testCompleteDelayedFetchLog() throws Exception { future::complete); assertThat(future.get()).containsOnly(new ProduceLogResultForBucket(tb, 0, 10L)); + // check and complete manually + numComplete = delayedFetchLogManager.checkAndComplete(delayedTableBucketKey); + assertThat(numComplete).isEqualTo(1); assertThat(delayedFetchLogManager.numDelayed()).isEqualTo(0); assertThat(delayedFetchLogManager.watched()).isEqualTo(0);