Skip to content

Commit

Permalink
[fix][ml] Persist correct markDeletePosition to prevent message loss (#18237)
Browse files Browse the repository at this point in the history

(cherry picked from commit d612858)
  • Loading branch information
michaeljmarshall authored and congbobo184 committed Nov 17, 2022
1 parent 3ab0b2d commit ce8e891
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1066,29 +1066,33 @@ public Position getFirstPosition() {
return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0);
}

protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
if (position.equals(PositionImpl.earliest)) {
position = ledger.getFirstPosition();
} else if (position.equals(PositionImpl.latest)) {
position = ledger.getLastPosition().getNext();
protected void internalResetCursor(PositionImpl proposedReadPosition,
AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
final PositionImpl newReadPosition;
if (proposedReadPosition.equals(PositionImpl.earliest)) {
newReadPosition = ledger.getFirstPosition();
} else if (proposedReadPosition.equals(PositionImpl.latest)) {
newReadPosition = ledger.getLastPosition().getNext();
} else {
newReadPosition = proposedReadPosition;
}

log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);
log.info("[{}] Initiate reset readPosition to {} on cursor {}", ledger.getName(), newReadPosition, name);

synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
ledger.getName(), position, name);
log.error("[{}] reset requested - readPosition [{}], previous reset in progress - cursor {}",
ledger.getName(), newReadPosition, name);
resetCursorCallback.resetFailed(
new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
position);
newReadPosition);
return;
}
}

final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;

final PositionImpl newPosition = position;
final PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newReadPosition);

VoidCallback finalCallback = new VoidCallback() {
@Override
Expand All @@ -1097,8 +1101,6 @@ public void operationComplete() {
// modify mark delete and read position since we are able to persist new position for cursor
lock.writeLock().lock();
try {
PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);

if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
MSG_CONSUMED_COUNTER_UPDATER.addAndGet(cursorImpl(), -getNumberOfEntries(
Range.closedOpen(newMarkDeletePosition, markDeletePosition)));
Expand All @@ -1114,55 +1116,55 @@ public void operationComplete() {
if (config.isDeletionAtBatchIndexLevelEnabled() && batchDeletedIndexes != null) {
batchDeletedIndexes.values().forEach(BitSetRecyclable::recycle);
batchDeletedIndexes.clear();
long[] resetWords = newPosition.ackSet;
long[] resetWords = newReadPosition.ackSet;
if (resetWords != null) {
BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(resetWords);
batchDeletedIndexes.put(newPosition, ackSet);
batchDeletedIndexes.put(newReadPosition, ackSet);
}
}

PositionImpl oldReadPosition = readPosition;
if (oldReadPosition.compareTo(newPosition) >= 0) {
log.info("[{}] reset position to {} before current read position {} on cursor {}",
ledger.getName(), newPosition, oldReadPosition, name);
if (oldReadPosition.compareTo(newReadPosition) >= 0) {
log.info("[{}] reset readPosition to {} before current read readPosition {} on cursor {}",
ledger.getName(), newReadPosition, oldReadPosition, name);
} else {
log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
ledger.getName(), newPosition, oldReadPosition, name);
log.info("[{}] reset readPosition to {} skipping from current read readPosition {} on "
+ "cursor {}", ledger.getName(), newReadPosition, oldReadPosition, name);
}
readPosition = newPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newPosition);
readPosition = newReadPosition;
ledger.onCursorReadPositionUpdated(ManagedCursorImpl.this, newReadPosition);
} finally {
lock.writeLock().unlock();
}
synchronized (pendingMarkDeleteOps) {
pendingMarkDeleteOps.clear();
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
ledger.getName(), newPosition, name);
log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
ledger.getName(), newReadPosition, name);
}
}
callback.resetComplete(newPosition);
callback.resetComplete(newReadPosition);
updateLastActive();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
synchronized (pendingMarkDeleteOps) {
if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, TRUE, FALSE)) {
log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}",
ledger.getName(), newPosition, name);
log.error("[{}] expected reset readPosition [{}], but another reset in progress on cursor {}",
ledger.getName(), newReadPosition, name);
}
}
callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(
"unable to persist position for cursor reset " + newPosition.toString()), newPosition);
"unable to persist readPosition for cursor reset " + newReadPosition), newReadPosition);
}

};

persistentMarkDeletePosition = null;
inProgressMarkDeletePersistPosition = null;
lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, getProperties(), null, null);
internalAsyncMarkDelete(newPosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, getProperties(), null, null);
internalAsyncMarkDelete(newMarkDeletePosition, isCompactionCursor() ? getProperties() : Collections.emptyMap(),
new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,13 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.common.api.proto.IntRange;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -682,7 +683,7 @@ void testResetCursor1() throws Exception {
}
assertTrue(moveStatus.get());
PositionImpl earliestPos = new PositionImpl(actualEarliest.getLedgerId(), -1);
assertEquals(earliestPos, cursor.getReadPosition());
assertEquals(cursor.getReadPosition(), earliestPos);
moveStatus.set(false);

// reset to one after last entry in a ledger should point to the first entry in the next ledger
Expand Down Expand Up @@ -3173,6 +3174,126 @@ public void operationFailed(ManagedLedgerException exception) {
});
}

/**
 * Resets a freshly opened cursor to {@code PositionImpl.latest} on an empty ledger, appends a single
 * entry, and then recovers the cursor from a {@link ManagedCursorInfo} built out of the mark-delete
 * position observed before the append. The recovered cursor must keep the same mark-delete and read
 * positions and must report exactly one entry available.
 */
@Test(timeOut = 20000)
public void testRecoverCursorAfterResetToLatestForNewEntry() throws Exception {
    final ManagedLedger managedLedger = factory.open("testRecoverCursorAfterResetToLatestForNewEntry");
    final ManagedCursorImpl cursor =
            (ManagedCursorImpl) managedLedger.openCursor("sub", CommandSubscribe.InitialPosition.Latest);

    // Baseline of a brand-new cursor; everything below relies on these values.
    assertEquals(cursor.getMarkDeletedPosition().getEntryId(), -1);
    assertEquals(cursor.getReadPosition().getEntryId(), 0);
    assertEquals(managedLedger.getLastConfirmedEntry().getEntryId(), -1);

    cursor.resetCursor(PositionImpl.latest);

    // The reset on an empty ledger is expected to leave the very same values in place.
    assertEquals(cursor.getMarkDeletedPosition().getEntryId(), -1);
    assertEquals(cursor.getReadPosition().getEntryId(), 0);
    assertEquals(managedLedger.getLastConfirmedEntry().getEntryId(), -1);

    // Snapshot both positions before anything is written to the ledger.
    final Position expectedMarkDelete = cursor.getMarkDeletedPosition();
    final Position expectedReadPosition = cursor.getReadPosition();

    // Writing one entry advances the ledger's lastConfirmedEntry.
    managedLedger.addEntry(new byte[1]);

    final ManagedCursorInfo.Builder infoBuilder = ManagedCursorInfo.newBuilder();
    infoBuilder.setCursorsLedgerId(cursor.getCursorLedger());
    infoBuilder.setMarkDeleteLedgerId(expectedMarkDelete.getLedgerId());
    infoBuilder.setMarkDeleteEntryId(expectedMarkDelete.getEntryId());
    infoBuilder.setLastActive(0L);
    final ManagedCursorInfo recoveryInfo = infoBuilder.build();

    final CountDownLatch recoveryDone = new CountDownLatch(1);
    final AtomicBoolean recoveryFailed = new AtomicBoolean(false);
    final VoidCallback recoveryCallback = new VoidCallback() {
        @Override
        public void operationComplete() {
            recoveryDone.countDown();
        }

        @Override
        public void operationFailed(ManagedLedgerException exception) {
            // Record the failure first so it is visible once the latch is released.
            recoveryFailed.set(true);
            recoveryDone.countDown();
        }
    };
    cursor.recoverFromLedger(recoveryInfo, recoveryCallback);

    recoveryDone.await();
    if (recoveryFailed.get()) {
        fail("Cursor recovery should not fail");
    }
    assertEquals(cursor.getMarkDeletedPosition(), expectedMarkDelete);
    assertEquals(cursor.getReadPosition(), expectedReadPosition);
    assertEquals(cursor.getNumberOfEntries(), 1L);
}

/**
 * Resets a cursor to {@code PositionImpl.latest} twice (once on an empty ledger and once after four
 * entries were added), appends two more entries, and then recovers the cursor from a
 * {@link ManagedCursorInfo} built out of the current mark-delete position. The recovered cursor must
 * keep the same mark-delete and read positions and must report exactly two entries available.
 */
@Test(timeOut = 20000)
public void testRecoverCursorAfterResetToLatestForMultipleEntries() throws Exception {
    final ManagedLedger managedLedger = factory.open("testRecoverCursorAfterResetToLatestForMultipleEntries");
    final ManagedCursorImpl cursor =
            (ManagedCursorImpl) managedLedger.openCursor("sub", CommandSubscribe.InitialPosition.Latest);

    // Baseline of a brand-new cursor; everything below relies on these values.
    assertEquals(cursor.getMarkDeletedPosition().getEntryId(), -1);
    assertEquals(cursor.getReadPosition().getEntryId(), 0);
    assertEquals(managedLedger.getLastConfirmedEntry().getEntryId(), -1);

    cursor.resetCursor(PositionImpl.latest);

    // The reset on an empty ledger is expected to leave the very same values in place.
    assertEquals(cursor.getMarkDeletedPosition().getEntryId(), -1);
    assertEquals(cursor.getReadPosition().getEntryId(), 0);
    assertEquals(managedLedger.getLastConfirmedEntry().getEntryId(), -1);

    // Four writes advance the ledger's lastConfirmedEntry.
    for (int written = 0; written < 4; written++) {
        managedLedger.addEntry(new byte[1]);
    }

    cursor.resetCursor(PositionImpl.latest);

    assertEquals(cursor.getMarkDeletedPosition().getEntryId(), 3);
    assertEquals(cursor.getReadPosition().getEntryId(), 4);
    assertEquals(managedLedger.getLastConfirmedEntry().getEntryId(), 3);

    // Two more writes move lastConfirmedEntry past the position the cursor was reset to.
    for (int written = 0; written < 2; written++) {
        managedLedger.addEntry(new byte[1]);
    }

    final Position expectedMarkDelete = cursor.getMarkDeletedPosition();
    final Position expectedReadPosition = cursor.getReadPosition();

    final ManagedCursorInfo.Builder infoBuilder = ManagedCursorInfo.newBuilder();
    infoBuilder.setCursorsLedgerId(cursor.getCursorLedger());
    infoBuilder.setMarkDeleteLedgerId(expectedMarkDelete.getLedgerId());
    infoBuilder.setMarkDeleteEntryId(expectedMarkDelete.getEntryId());
    infoBuilder.setLastActive(0L);
    final ManagedCursorInfo recoveryInfo = infoBuilder.build();

    final CountDownLatch recoveryDone = new CountDownLatch(1);
    final AtomicBoolean recoveryFailed = new AtomicBoolean(false);
    final VoidCallback recoveryCallback = new VoidCallback() {
        @Override
        public void operationComplete() {
            recoveryDone.countDown();
        }

        @Override
        public void operationFailed(ManagedLedgerException exception) {
            // Record the failure first so it is visible once the latch is released.
            recoveryFailed.set(true);
            recoveryDone.countDown();
        }
    };
    cursor.recoverFromLedger(recoveryInfo, recoveryCallback);

    recoveryDone.await();
    if (recoveryFailed.get()) {
        fail("Cursor recovery should not fail");
    }
    assertEquals(cursor.getMarkDeletedPosition(), expectedMarkDelete);
    assertEquals(cursor.getReadPosition(), expectedReadPosition);
    assertEquals(cursor.getNumberOfEntries(), 2L);
}
@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.nonpersistent;

import static com.google.common.base.Preconditions.checkArgument;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl.create;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.collect.Lists;
Expand Down

0 comments on commit ce8e891

Please sign in to comment.