Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][test] Fix thread leaks in Managed Ledger tests and remove duplicate shutdown code #21426

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -535,13 +536,12 @@ public CompletableFuture<Void> shutdownAsync() throws ManagedLedgerException {
int numLedgers = ledgerNames.size();
log.info("Closing {} ledgers", numLedgers);
for (String ledgerName : ledgerNames) {
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
CompletableFuture<ManagedLedgerImpl> ledgerFuture = ledgers.remove(ledgerName);
if (ledgerFuture == null) {
future.complete(null);
continue;
}
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
ledgerFuture.whenCompleteAsync((managedLedger, throwable) -> {
if (throwable != null || managedLedger == null) {
future.complete(null);
Expand Down Expand Up @@ -606,68 +606,20 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
}));
}
}));
entryCacheManager.clear();
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
return FutureUtil.waitForAll(futures).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
scheduledExecutor.shutdown();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
}

@Override
public void shutdown() throws InterruptedException, ManagedLedgerException {
if (closed) {
throw new ManagedLedgerException.ManagedLedgerFactoryClosedException();
try {
shutdownAsync().get();
} catch (ExecutionException e) {
throw getManagedLedgerException(e.getCause());
}
closed = true;

statsTask.cancel(true);
flushCursorsTask.cancel(true);
cacheEvictionExecutor.shutdownNow();

// take a snapshot of ledgers currently in the map to prevent race conditions
List<CompletableFuture<ManagedLedgerImpl>> ledgers = new ArrayList<>(this.ledgers.values());
int numLedgers = ledgers.size();
final CountDownLatch latch = new CountDownLatch(numLedgers);
log.info("Closing {} ledgers", numLedgers);

for (CompletableFuture<ManagedLedgerImpl> ledgerFuture : ledgers) {
ManagedLedgerImpl ledger = ledgerFuture.getNow(null);
if (ledger == null) {
latch.countDown();
continue;
}

ledger.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object ctx) {
latch.countDown();
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Got exception when closing managed ledger: {}", ledger.getName(), exception);
latch.countDown();
}
}, null);
}

latch.await();
log.info("{} ledgers closed", numLedgers);

if (isBookkeeperManaged) {
try {
BookKeeper bookkeeper = bookkeeperFactory.get();
if (bookkeeper != null) {
bookkeeper.close();
}
} catch (BKException e) {
throw new ManagedLedgerException(e);
}
}

scheduledExecutor.shutdownNow();

entryCacheManager.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At
bkc.asyncDeleteLedger(ledgerId, originalCb, ctx);
} else {
deleteLedgerInfo.hasCalled = true;
new Thread(() -> {
cachedExecutor.submit(() -> {
Awaitility.await().atMost(Duration.ofSeconds(60)).until(signal::get);
bkc.asyncDeleteLedger(ledgerId, cb, ctx);
}).start();
});
}
return null;
}).when(spyBookKeeper).asyncDeleteLedger(any(long.class), any(AsyncCallback.DeleteCallback.class), any());
Expand All @@ -208,6 +208,7 @@ private DeleteLedgerInfo makeDelayIfDoLedgerDelete(LedgerHandle ledger, final At
public void testLedgerInfoMetaCorrectIfAddEntryTimeOut() throws Exception {
String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
BookKeeper spyBookKeeper = spy(bkc);
@Cleanup("shutdown")
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);

Expand Down Expand Up @@ -3854,6 +3855,7 @@ public void testCancellationOfScheduledTasks() throws Exception {
public void testInactiveLedgerRollOver() throws Exception {
int inactiveLedgerRollOverTimeMs = 5;
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setInactiveLedgerRollOverTime(inactiveLedgerRollOverTimeMs, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -3885,11 +3887,11 @@ public void testInactiveLedgerRollOver() throws Exception {
List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
assertEquals(ledgers.size(), totalAddEntries);
ledger.close();
factory.shutdown();
}

@Test
public void testOffloadTaskCancelled() throws Exception {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ protected void startZKCluster() throws Exception {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder()
.metadataStoreName("metastore-" + getClass().getSimpleName())
.build()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import lombok.SneakyThrows;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -70,7 +71,8 @@ public MockedBookKeeperTestCase(int numBookies) {
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create("memory:local", MetadataStoreConfig.builder().build()));
MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().metadataStoreName("metastore-" + method.getName()).build()));

try {
// start bookkeeper service
Expand Down Expand Up @@ -102,7 +104,11 @@ public final void tearDown(Method method) {
}
try {
LOG.info("@@@@@@@@@ stopping " + method);
factory.shutdownAsync().get(10, TimeUnit.SECONDS);
try {
factory.shutdownAsync().get(10, TimeUnit.SECONDS);
} catch (ManagedLedgerException.ManagedLedgerFactoryClosedException e) {
// ignore
}
factory = null;
stopBookKeeper();
metadataStore.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ private void initializeCommonPulsarServices(SpyConfig spyConfig) {
} else {
try {
MetadataStoreExtended store = MetadataStoreFactoryImpl.createExtended("memory:local",
MetadataStoreConfig.builder().build());
MetadataStoreConfig.builder()
.metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
registerCloseable(() -> {
store.close();
resetSpyOrMock(store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
protected abstract CompletableFuture<Boolean> existsFromStore(String path);

protected AbstractMetadataStore(String metadataStoreName) {
this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(metadataStoreName));
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
registerListener(this);

this.childrenCache = Caffeine.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ protected void startZKCluster() throws Exception {
zkc = zkUtil.getZooKeeperClient();
metadataStore = new FaultInjectionMetadataStore(
MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder().metadataStoreName("metastore-" + getClass().getSimpleName()).build()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public MockedBookKeeperTestCase(int numBookies) {
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory:local",
MetadataStoreConfig.builder().build()));
MetadataStoreConfig.builder()
.metadataStoreName("metastore-" + method.getName())
.build()));
try {
// start bookkeeper service
startBookKeeper();
Expand Down