Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] recover zk-badversion while updating cursor metadata #5604

Merged
merged 1 commit into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.bookkeeper.mledger;

import com.google.common.annotations.Beta;

import java.util.function.Supplier;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
Expand Down Expand Up @@ -77,10 +80,13 @@ ManagedLedger open(String name, ManagedLedgerConfig config)
* managed ledger configuration
* @param callback
* callback object
* @param mlOwnershipChecker
* checks ml-ownership in case updating ml-metadata fails due to ownership conflict
* @param ctx
* opaque context
*/
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, Object ctx);
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
Supplier<Boolean> mlOwnershipChecker, Object ctx);

/**
* Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1958,7 +1958,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
* @param properties
* @param callback
*/
private void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
final AsyncCallbacks.CloseCallback callback, final Object ctx) {

if (shouldPersistUnackRangesToLedger()) {
Expand Down Expand Up @@ -2053,6 +2053,30 @@ public void operationComplete(Void result, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
if (e instanceof MetaStoreException.BadVersionException) {
log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
ledger.name, name, e.getMessage());
// it means previous owner of the ml might have updated the version incorrectly. So, check
// the ownership and refresh the version again.
if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are refreshing the cursor metadata, does it make sense we retry the operation after refreshing the cursor metadata?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have thought about it earlier but it will be tricky because of race condition:
if we retry the same function persistPositionMetaStore or any other functions with same metadata then by that time markDelete position could have been changed. also, metadata contains multiple info eg: ledgerId, markDelete-position and its properties, etc.. and cursor only maintains latest markDeletePosition in memory.. so, it's tricky to update metadata with all latest information in a synchronized manner. So, broker can rely on next acked message as this exception occurs rarely.

}

@Override
public void operationFailed(MetaStoreException e) {
if (log.isDebugEnabled()) {
log.debug(
"[{}] Failed to refresh cursor metadata-version for {} due to {}",
ledger.name, name, e.getMessage());
}
}
});
}
}
callback.operationFailed(e);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -280,7 +281,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
r.e = exception;
latch.countDown();
}
}, null);
}, null, null);

latch.await();

Expand All @@ -292,12 +293,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx) {
asyncOpen(name, new ManagedLedgerConfig(), callback, ctx);
asyncOpen(name, new ManagedLedgerConfig(), callback, null, ctx);
}

@Override
public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
final Object ctx) {
Supplier<Boolean> mlOwnershipChecker, final Object ctx) {

// If the ledger state is bad, remove it from the map.
CompletableFuture<ManagedLedgerImpl> existingFuture = ledgers.get(name);
Expand Down Expand Up @@ -325,7 +326,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor,
orderedExecutor, name);
orderedExecutor, name, mlOwnershipChecker);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
Expand Down Expand Up @@ -188,6 +189,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;

Expand Down Expand Up @@ -252,6 +254,11 @@ public enum PositionBound {
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
final String name) {
this(factory, bookKeeper, store, config, scheduledExecutor, orderedExecutor, name, null);
}
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, OrderedExecutor orderedExecutor,
final String name, final Supplier<Boolean> mlOwnershipChecker) {
this.factory = factory;
this.bookKeeper = bookKeeper;
this.config = config;
Expand All @@ -275,6 +282,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
}

synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,13 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
Expand All @@ -116,6 +118,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Expand All @@ -124,6 +127,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {

private static final Charset Encoding = Charsets.UTF_8;

@DataProvider(name = "checkOwnershipFlag")
public Object[][] checkOwnershipFlagProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test
public void managedLedgerApi() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
Expand Down Expand Up @@ -355,7 +363,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, null);
}, null, null);

counter.await();

Expand Down Expand Up @@ -1980,7 +1988,7 @@ public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Excepti
}

// (3) Validate: cache should remove all entries read by both active cursors
log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
log.info("expected, found : {}, {}", (5 * (totalInsertedEntries)), entryCache.getSize());
assertEquals((5 * totalInsertedEntries), entryCache.getSize());

final int remainingEntries = totalInsertedEntries - readEntries;
Expand Down Expand Up @@ -2528,6 +2536,100 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {
}
}

/**
* It verifies that managed-cursor can recover metadata-version if it fails to update due to version conflict. This
* test verifies that version recovery happens if checkOwnership supplier is passed while creating managed-ledger.
*
* @param checkOwnershipFlag
* @throws Exception
*/
@Test(dataProvider = "checkOwnershipFlag")
public void recoverMLWithBadVersion(boolean checkOwnershipFlag) throws Exception {

ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl(bkc, zkc, conf);
ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, zkc, conf);

final MutableObject<ManagedLedger> ledger1 = new MutableObject<>(), ledger2 = new MutableObject<>();
final MutableObject<ManagedCursorImpl> cursor1 = new MutableObject<>(), cursor2 = new MutableObject<>();

createLedger(factory1, ledger1, cursor1, checkOwnershipFlag);
ledger1.getValue().addEntry("test1".getBytes(Encoding));
ledger1.getValue().addEntry("test2".getBytes(Encoding));
Entry entry = cursor1.getValue().readEntries(1).get(0);
cursor1.getValue().delete(entry.getPosition());

createLedger(factory2, ledger2, cursor2, checkOwnershipFlag);
entry = cursor2.getValue().readEntries(1).get(0);

// 1. closing cursor will change the zk-version
cursor1.getValue().close();

// 2. try to creatCursorLedger which should fail first time because of BadVersionException
// However, if checkOwnershipFlag is eanbled the managed-cursor will reover from that exception.
boolean isFailed = updateCusorMetadataByCreatingMetadataLedger(cursor2);
Assert.assertTrue(isFailed);

isFailed = updateCusorMetadataByCreatingMetadataLedger(cursor2);
if (checkOwnershipFlag) {
Assert.assertFalse(isFailed);
} else {
Assert.assertTrue(isFailed);
}

log.info("Test completed");
}

private boolean updateCusorMetadataByCreatingMetadataLedger(MutableObject<ManagedCursorImpl> cursor2)
throws InterruptedException {
MutableObject<Boolean> failed = new MutableObject<>();
failed.setValue(false);
CountDownLatch createLedgerDoneLatch = new CountDownLatch(1);
cursor2.getValue().createNewMetadataLedger(new VoidCallback() {

@Override
public void operationComplete() {
createLedgerDoneLatch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
failed.setValue(true);
createLedgerDoneLatch.countDown();
}

});
createLedgerDoneLatch.await();
return failed.getValue();
}

private void createLedger(ManagedLedgerFactoryImpl factory, MutableObject<ManagedLedger> ledger1,
MutableObject<ManagedCursorImpl> cursor1, boolean checkOwnershipFlag) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
factory.asyncOpen("my_test_ledger", new ManagedLedgerConfig(), new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger1.setValue(ledger);
ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
cursor1.setValue((ManagedCursorImpl) cursor);
latch.countDown();
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
}
}, null);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
}, checkOwnershipFlag ? () -> true : null, null);
latch.await();
}

private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
Expand All @@ -2543,4 +2645,4 @@ public static void retryStrategically(Predicate<Void> predicate, int retryCount,
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, null);
}, () -> isTopicNsOwnedByBroker(topicName), null);

}).exceptionally((exception) -> {
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
Expand Down Expand Up @@ -1217,6 +1217,15 @@ public void monitorBacklogQuota() {
});
}

public boolean isTopicNsOwnedByBroker(TopicName topicName) throws RuntimeException {
try {
return pulsar.getNamespaceService().isServiceUnitOwned(topicName);
} catch (Exception e) {
log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
}
return false;
}

public void checkTopicNsOwnership(final String topic) throws RuntimeException {
TopicName topicName = TopicName.get(topic);
boolean ownedByThisInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.function.Supplier;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -208,7 +209,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any());
any(OpenLedgerCallback.class), any(Supplier.class), any());

// call openLedgerFailed on ML factory asyncOpen
doAnswer(new Answer<Object>() {
Expand All @@ -219,7 +220,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return null;
}
}).when(mlFactoryMock).asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class),
any(OpenLedgerCallback.class), any());
any(OpenLedgerCallback.class), any(Supplier.class), any());

// call addComplete on ledger asyncAddEntry
doAnswer(new Answer<Object>() {
Expand Down
Loading