Skip to content

Commit

Permalink
Revert "Remove obsolete solution added in apache#12890"
Browse files Browse the repository at this point in the history
This reverts commit 54c5529.
  • Loading branch information
lhotari committed Oct 3, 2024
1 parent 78eccb8 commit 204ffec
Showing 1 changed file with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -239,6 +240,39 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return false;
}

if (!allowOutOfOrderDelivery) {
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
Optional<Position> firstReplayPosition = getFirstPositionInReplay();
if (firstReplayPosition.isPresent()) {
Position replayPosition = firstReplayPosition.get();
if (this.minReplayedPosition != null) {
// If relayPosition is a new entry wither smaller position is inserted for redelivery during this
// async read, it is possible that this relayPosition should dispatch to consumer first. So in
// order to preserver order delivery, we need to discard this read result, and try to trigger a
// replay read, that containing "relayPosition", by calling readMoreEntries.
if (replayPosition.compareTo(minReplayedPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, "
+ "discard this read and retry with readMoreEntries.",
name, replayPosition, minReplayedPosition, readType);
}
if (readType == ReadType.Normal) {
entries.forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
skipNextBackoff = true;
return true;
}
}
}
}

// returns a boolean indicating whether look-ahead could be useful, when there's a consumer
// with available permits, and it's not able to make progress because of blocked hashes.
MutableBoolean triggerLookAhead = new MutableBoolean();
Expand Down

0 comments on commit 204ffec

Please sign in to comment.