Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] ManagedLedger: move to FENCED state in case of BadVersionException #17736

Merged
merged 2 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public ManagedLedgerFencedException() {
super(new Exception("Attempted to use a fenced managed ledger"));
}

/**
 * Creates the exception with a caller-supplied detail message.
 *
 * @param message text forwarded unchanged to the superclass constructor
 */
public ManagedLedgerFencedException(String message) {
super(message);
}

/**
 * Creates the exception from another exception.
 *
 * @param e exception forwarded unchanged to the superclass constructor
 */
public ManagedLedgerFencedException(Exception e) {
super(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
handleBadVersion(e);
if (e instanceof MetadataNotFoundException) {
callback.initializeFailed(new ManagedLedgerNotFoundException(e));
} else {
Expand Down Expand Up @@ -481,6 +482,7 @@ public void operationComplete(Void v, Stat stat) {

/**
 * Handles a metadata-store failure during initialization.
 *
 * @param e the failure reported by the metadata store
 */
@Override
public void operationFailed(MetaStoreException e) {
// Inspect the failure first: handleBadVersion fences the managed ledger when e is a
// BadVersionException, so the state is already updated when the callback runs.
handleBadVersion(e);
// Report the failure to the initialization callback, wrapped in a ManagedLedgerException.
callback.initializeFailed(new ManagedLedgerException(e));
}
};
Expand Down Expand Up @@ -1022,6 +1024,7 @@ public void operationComplete(Void result, Stat stat) {

/**
 * Handles a metadata-store failure while deleting a cursor.
 *
 * @param e the failure reported by the metadata store
 */
@Override
public void operationFailed(MetaStoreException e) {
// Inspect the failure first: handleBadVersion fences the managed ledger when e is a
// BadVersionException, so the state is already updated when the callback runs.
handleBadVersion(e);
// The original exception is passed through to the callback without wrapping.
callback.deleteCursorFailed(e, ctx);
}

Expand Down Expand Up @@ -1312,6 +1315,7 @@ public void operationComplete(Void result, Stat stat) {
/**
 * Handles a metadata-store failure while terminating the managed ledger.
 *
 * @param e the failure reported by the metadata store
 */
@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Failed to terminate managed ledger: {}", name, e.getMessage());
// handleBadVersion fences the managed ledger when e is a BadVersionException; this is done
// before the callback is notified.
handleBadVersion(e);
// Report the failure to the terminate callback, wrapped in a ManagedLedgerException.
callback.terminateFailed(new ManagedLedgerException(e), ctx);
}
});
Expand Down Expand Up @@ -1396,6 +1400,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
public synchronized void asyncClose(final CloseCallback callback, final Object ctx) {
State state = STATE_UPDATER.get(this);
if (state == State.Fenced) {
cancelScheduledTasks();
factory.close(this);
callback.closeFailed(new ManagedLedgerFencedException(), ctx);
return;
Expand Down Expand Up @@ -1519,6 +1524,7 @@ public void operationComplete(Void v, Stat stat) {
@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
handleBadVersion(e);
mbean.startDataLedgerDeleteOp();
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
mbean.endDataLedgerDeleteOp();
Expand All @@ -1527,14 +1533,12 @@ public void operationFailed(MetaStoreException e) {
BKException.getMessage(rc1));
}
}, null);

if (e instanceof BadVersionException) {
synchronized (ManagedLedgerImpl.this) {
log.error(
"[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger",
name);
lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
// Return ManagedLedgerFencedException to addFailed callback
// to indicate that the ledger is now fenced and topic needs to be closed
clearPendingAddEntries(new ManagedLedgerFencedException(e));
Expand All @@ -1557,6 +1561,12 @@ public void operationFailed(MetaStoreException e) {
updateLedgersListAfterRollover(cb, newLedger);
}
}

/**
 * Moves this managed ledger to the Fenced state when the given failure is a
 * {@code BadVersionException}; every other kind of failure is ignored.
 *
 * @param e the failure to inspect
 */
private void handleBadVersion(Throwable e) {
    final boolean versionConflict = e instanceof BadVersionException;
    if (!versionConflict) {
        // Not a version mismatch: leave the current state untouched.
        return;
    }
    setFenced();
}
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
if (!metadataMutex.tryLock()) {
// Defer update for later
Expand Down Expand Up @@ -2463,12 +2473,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
TOTAL_SIZE_UPDATER.get(this));
}
if (STATE_UPDATER.get(this) == State.Closed) {
State currentState = STATE_UPDATER.get(this);
if (currentState == State.Closed) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
trimmerMutex.unlock();
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
return;
}
if (currentState == State.Fenced) {
log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
trimmerMutex.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we don't need to unlock the trimmerMutex.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure.
I prefer to keep the ML in a clean state.
I did the same thing that is done for a "closed" ML.
At this point it is very similar to being "Closed".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, after checking the code, I think we should release the lock; otherwise the scheduled task will keep trying to acquire the lock every 100 milliseconds.

promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
return;
}

long slowestReaderLedgerId = -1;
if (!cursors.hasDurableCursors()) {
Expand Down Expand Up @@ -2557,7 +2574,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
return;
}

if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
scheduleDeferredTrimming(isTruncate, promise);
trimmerMutex.unlock();
Expand Down Expand Up @@ -2624,6 +2641,7 @@ public void operationFailed(MetaStoreException e) {
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
metadataMutex.unlock();
trimmerMutex.unlock();
handleBadVersion(e);

promise.completeExceptionally(e);
}
Expand Down Expand Up @@ -2708,7 +2726,7 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) {
public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
STATE_UPDATER.set(this, State.Fenced);
setFenced();
cancelScheduledTasks();

List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
Expand Down Expand Up @@ -2957,7 +2975,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
promise.whenComplete((result, exception) -> {
offloadMutex.unlock();
if (exception != null) {
callback.offloadFailed(new ManagedLedgerException(exception), ctx);
callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(exception), ctx);
} else {
callback.offloadComplete(result, ctx);
}
Expand All @@ -2971,11 +2989,17 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct

private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
if (getState() == State.Closed) {
State currentState = getState();
if (currentState == State.Closed) {
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
String.format("managed ledger [%s] has already closed", name)));
return;
}
if (currentState == State.Fenced) {
promise.completeExceptionally(new ManagedLedgerFencedException(
String.format("managed ledger [%s] is fenced", name)));
return;
}
LedgerInfo info = ledgersToOffload.poll();
if (info == null) {
if (firstError.isPresent()) {
Expand Down Expand Up @@ -3117,6 +3141,7 @@ public void operationComplete(Void result, Stat stat) {

/**
 * Handles a metadata-store failure for the update guarded by {@code unlockingPromise}.
 *
 * @param e the failure reported by the metadata store
 */
@Override
public void operationFailed(MetaStoreException e) {
// handleBadVersion fences the managed ledger when e is a BadVersionException; this is done
// before the promise is completed.
handleBadVersion(e);
// Fail the promise with the original exception.
unlockingPromise.completeExceptionally(e);
}
});
Expand Down Expand Up @@ -3639,6 +3664,7 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
}

synchronized void setFenced() {
log.info("{} Moving to Fenced state", name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change the log level to warn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a "problem".
It may happen, and we are handling it safely.
There is nothing that the sysadmin should be afraid of.

We should log "WARN" or "ERROR" when something bad happens and you have to take extra care.

STATE_UPDATER.set(this, State.Fenced);
}

Expand Down Expand Up @@ -3842,12 +3868,21 @@ private void scheduleTimeoutTask() {
? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
: timeoutSec;
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest change

safeRun(() -> {
                checkTimeouts();
            })

to safeRun(this::checkTimeouts)

checkAddTimeout();
checkReadTimeout();
checkTimeouts();
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
}
}

private void checkTimeouts() {
final State state = STATE_UPDATER.get(this);
if (state == State.Closed
|| state == State.Fenced) {
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add a warn log?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are already logs that say that we fenced or closed the topic.
I don't think it is worth adding anything more.

}
checkAddTimeout();
checkReadTimeout();
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
Expand Down Expand Up @@ -4004,6 +4039,7 @@ public void operationComplete(Void result, Stat version) {
/**
 * Handles a metadata-store failure while updating the managed ledger's properties.
 *
 * @param e the failure reported by the metadata store
 */
@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Update managedLedger's properties failed", name, e);
// handleBadVersion fences the managed ledger when e is a BadVersionException; this is done
// before the callback is notified.
handleBadVersion(e);
callback.updatePropertiesFailed(e, ctx);
// NOTE(review): the mutex is released only after the callback returns, so an exception
// thrown by the callback would leave metadataMutex locked — confirm this ordering is intended.
metadataMutex.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -387,6 +391,72 @@ public void recoverAfterZnodeVersionError() throws Exception {
}
}

/**
 * Verifies the behaviour of a managed ledger when the metadata store rejects a PUT with a
 * BadVersionException while ledgers are being trimmed:
 * the trimming future fails with BadVersionException, the ledger moves to the Fenced state,
 * later trimming requests and writes fail with ManagedLedgerFencedException, and closing the
 * ledger still unregisters it from the factory.
 */
@Test
public void recoverAfterZnodeVersionErrorWhileTrimming() throws Exception {
    ManagedLedger ledger = factory.open("my_test_ledger_trim",
            new ManagedLedgerConfig()
                    .setMaxEntriesPerLedger(2));
    ledger.addEntry("test".getBytes());
    ledger.addEntry("test".getBytes());
    ledger.addEntry("test".getBytes());

    // Make the next PUT on this managed ledger's metadata path fail with a version conflict.
    metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
            path.equals("/managed-ledgers/my_test_ledger_trim")
                    && op == FaultInjectionMetadataStore.OperationType.PUT
    );

    CompletableFuture<?> handle = new CompletableFuture<>();
    ledger.trimConsumedLedgersInBackground(handle);
    assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
            instanceOf(ManagedLedgerException.BadVersionException.class));

    // TestNG's assertEquals takes (actual, expected); keep that order so failure messages are correct.
    assertEquals(((ManagedLedgerImpl) ledger).getState(), ManagedLedgerImpl.State.Fenced);

    // if the task started after the ML moved to Fenced state, it must fail
    CompletableFuture<?> handleAlreadyFenced = new CompletableFuture<>();
    ledger.trimConsumedLedgersInBackground(handleAlreadyFenced);
    assertThat(expectThrows(ExecutionException.class, () -> handleAlreadyFenced.get()).getCause(),
            instanceOf(ManagedLedgerException.ManagedLedgerFencedException.class));

    try {
        ledger.addEntry("entry".getBytes());
        fail("should fail");
    } catch (ManagedLedgerFencedException e) {
        assertEquals(e.getCause().getMessage(), "Attempted to use a fenced managed ledger");
    }

    assertFalse(factory.ledgers.isEmpty());
    try {
        ledger.close();
    } catch (ManagedLedgerFencedException e) {
        assertEquals(e.getCause().getMessage(), "Attempted to use a fenced managed ledger");
    }

    // verify that the ManagedLedger has been unregistered even if it was fenced
    assertTrue(factory.ledgers.isEmpty());
}

/**
 * Verifies that a BadVersionException raised by the metadata store during a truncate
 * fails the returned future with BadVersionException and leaves the managed ledger Fenced.
 */
@Test
public void badVersionErrorDuringTruncateLedger() throws Exception {
    ManagedLedger ledger = factory.open("my_test_ledger_trim",
            new ManagedLedgerConfig()
                    .setMaxEntriesPerLedger(2));
    ledger.addEntry("test".getBytes());
    ledger.addEntry("test".getBytes());
    ledger.addEntry("test".getBytes());

    // Make the next PUT on this managed ledger's metadata path fail with a version conflict.
    metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
            path.equals("/managed-ledgers/my_test_ledger_trim")
                    && op == FaultInjectionMetadataStore.OperationType.PUT
    );

    CompletableFuture<?> handle = ledger.asyncTruncate();
    assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
            instanceOf(ManagedLedgerException.BadVersionException.class));

    // TestNG's assertEquals takes (actual, expected); keep that order so failure messages are correct.
    assertEquals(((ManagedLedgerImpl) ledger).getState(), ManagedLedgerImpl.State.Fenced);
}

@Test
public void recoverAfterWriteError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -48,6 +49,8 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -125,6 +128,51 @@ public void testOffload() throws Exception {
.filter(e -> e.getOffloadContext().getComplete())
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
offloader.offloadedLedgers());

// ledgers should be marked as offloaded
ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext());
}

/**
 * Verifies that a BadVersionException raised by the metadata store during an offload makes
 * {@code offloadPrefix} fail with ManagedLedgerFencedException, leaves the in-memory ledger
 * list without offload contexts, and moves the managed ledger to the Fenced state.
 */
@Test
public void testOffloadFenced() throws Exception {
    MockLedgerOffloader offloader = new MockLedgerOffloader();
    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setMaxEntriesPerLedger(10);
    config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
    config.setRetentionTime(10, TimeUnit.MINUTES);
    config.setRetentionSizeInMB(10);
    config.setLedgerOffloader(offloader);
    ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);

    // 25 entries with at most 10 entries per ledger; the assertion below expects 3 ledgers.
    for (int i = 0; i < 25; i++) {
        String content = "entry-" + i;
        ledger.addEntry(content.getBytes());
    }
    assertEquals(ledger.getLedgersInfoAsList().size(), 3);

    // Make the next PUT on this managed ledger's metadata path fail with a version conflict.
    metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
            path.equals("/managed-ledgers/my_test_ledger")
                    && op == FaultInjectionMetadataStore.OperationType.PUT
    );

    assertThrows(ManagedLedgerException.ManagedLedgerFencedException.class, () ->
            ledger.offloadPrefix(ledger.getLastConfirmedEntry()));

    assertEquals(ledger.getLedgersInfoAsList().size(), 3);

    // the set of ledgers flagged as completely offloaded must match what the offloader reports
    assertEquals(ledger.getLedgersInfoAsList().stream()
                    .filter(e -> e.getOffloadContext().getComplete())
                    .map(e -> e.getLedgerId()).collect(Collectors.toSet()),
            offloader.offloadedLedgers());

    // but the ledgers should not be marked as offloaded in local memory, as the write to metadata failed.
    // The stream result was previously discarded, so this condition was never actually checked.
    assertTrue(ledger.getLedgersInfoAsList().stream().allMatch(l -> !l.hasOffloadContext()));

    // check that the ledger is fenced (TestNG order: actual, expected)
    assertEquals(ledger.getState(), ManagedLedgerImpl.State.Fenced);
}

@Test
Expand Down
Loading