Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27152 Under compaction mark may leak #4568

Merged
merged 1 commit into from
Jul 20, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -145,15 +144,15 @@ private void createCompactionExecutors() {
final String n = Thread.currentThread().getName();

StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
// Since the StealJobQueue is unbounded, we need not to set the RejectedExecutionHandler for
// the long and short compaction thread pool executors.
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS,
stealJobQueue,
new ThreadFactoryBuilder().setNameFormat(n + "-longCompactions-%d").setDaemon(true).build());
this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS,
stealJobQueue.getStealFromQueue(),
new ThreadFactoryBuilder().setNameFormat(n + "-shortCompactions-%d").setDaemon(true).build());
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
}
sunhelly marked this conversation as resolved.
Show resolved Hide resolved

@Override
Expand Down Expand Up @@ -382,15 +381,20 @@ protected void requestCompactionInternal(HRegion region, HStore store, String wh
// pool; we will do selection there, and move to large pool if necessary.
pool = shortCompactions;
}
pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));

// A simple implementation for under compaction marks.
// Since this method is always called in the synchronized methods, we do not need to use the
// boolean result to make sure that exactly the one that added here will be removed
// in the next steps.
underCompactionStores.add(getStoreNameForUnderCompaction(store));
if (LOG.isDebugEnabled()) {
LOG.debug(
"Add compact mark for store {}, priority={}, current under compaction "
+ "store size is {}",
getStoreNameForUnderCompaction(store), priority, underCompactionStores.size());
}
underCompactionStores.add(getStoreNameForUnderCompaction(store));
pool.submit(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
Expand Down Expand Up @@ -721,22 +725,6 @@ private String formatStackTrace(Exception ex) {
}
}

/**
* Cleanup class to use when rejecting a compaction request from the queue.
*/
private static class Rejection implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
if (runnable instanceof CompactionRunner) {
CompactionRunner runner = (CompactionRunner) runnable;
LOG.debug("Compaction Rejected: " + runner);
if (runner.compaction != null) {
runner.store.cancelRequestedCompaction(runner.compaction);
}
}
}
}

/**
* {@inheritDoc}
*/
Expand Down