Skip to content

Commit

Permalink
HBASE-22880 Move the DirScanPool out and do not use static field
Browse files Browse the repository at this point in the history
Signed-off-by Reid Chan <reidchan@apache.org>
  • Loading branch information
ZhaoBQ authored and Reidddddd committed Aug 24, 2019
1 parent d832786 commit 8961315
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
Expand Down Expand Up @@ -333,6 +333,7 @@ public void run() {
private SnapshotCleanerChore snapshotCleanerChore = null;

CatalogJanitor catalogJanitorChore;
private DirScanPool cleanerPool;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
private LogCleaner logCleaner;
Expand Down Expand Up @@ -898,6 +899,7 @@ private void finishActiveMasterInitialization(MonitoredTask status)
(System.currentTimeMillis() - masterActiveTime) / 1000.0f));
this.masterFinishedInitializationTime = System.currentTimeMillis();
configurationManager.registerObserver(this.balancer);
configurationManager.registerObserver(this.cleanerPool);
configurationManager.registerObserver(this.hfileCleaner);
configurationManager.registerObserver(this.logCleaner);

Expand Down Expand Up @@ -1237,22 +1239,19 @@ private void startServiceThreads() throws IOException {
this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
startProcedureExecutor();

// Initial cleaner chore
CleanerChore.initChorePool(conf);
// Start log cleaner thread
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
this.logCleaner =
new LogCleaner(cleanerInterval,
this, conf, getMasterFileSystem().getOldLogDir().getFileSystem(conf),
getMasterFileSystem().getOldLogDir());
getChoreService().scheduleChore(logCleaner);

// Create cleaner thread pool
cleanerPool = new DirScanPool(conf);
// Start log cleaner thread
int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getOldLogDir().getFileSystem(conf),
getMasterFileSystem().getOldLogDir(), cleanerPool);
//start the hfile archive cleaner thread
Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
Map<String, Object> params = new HashMap<String, Object>();
params.put(MASTER, this);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
.getFileSystem(), archiveDir, params);
this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
getChoreService().scheduleChore(hfileCleaner);

final boolean isSnapshotChoreDisabled = conf.getBoolean(HConstants.SNAPSHOT_CLEANER_DISABLE,
Expand Down Expand Up @@ -1306,8 +1305,10 @@ protected void stopServiceThreads() {
}
stopChores();
super.stopServiceThreads();
CleanerChore.shutDownChorePool();

if (cleanerPool != null) {
cleanerPool.shutdownNow();
cleanerPool = null;
}
// Wait for all the remaining region servers to report in IFF we were
// running a cluster shutdown AND we were NOT aborting.
if (!isAborted() && this.serverManager != null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -38,7 +36,6 @@
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FileStatusFilter;
import org.apache.hadoop.ipc.RemoteException;
Expand All @@ -53,11 +50,8 @@
* Abstract Cleaner that uses a chain of delegates to clean a directory of files
* @param <T> Cleaner delegate class that is dynamically loaded from configuration
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
justification="Static pool will be only updated once.")
@InterfaceAudience.Private
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
implements ConfigurationObserver {
public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {

private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
private static final int AVAIL_PROCESSORS = Runtime.getRuntime().availableProcessors();
Expand All @@ -69,85 +63,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
* while latter will use only 1 thread for chore to scan dir.
*/
public static final String CHORE_POOL_SIZE = "hbase.cleaner.scan.dir.concurrent.size";
private static final String DEFAULT_CHORE_POOL_SIZE = "0.25";

/**
 * Wraps the {@link ForkJoinPool} that cleaner chores use to scan directories, together with the
 * bookkeeping needed to resize that pool after a configuration change.
 * <p>
 * A resize happens in two steps: {@link #markUpdate(Configuration)} records the new target size
 * and raises {@link #reconfigNotification}; {@link #updatePool(long)} later replaces the pool
 * once the running cleaners have counted down the latch or the timeout has elapsed.
 */
private static class DirScanPool {
  /** Target parallelism. May differ from the live pool until {@link #updatePool(long)} runs. */
  int size;
  /** The pool tasks are currently submitted to. */
  ForkJoinPool pool;
  /** Count of cleaners between {@link #latchCountUp()} and {@link #latchCountDown()}. */
  int cleanerLatch;
  /** True while a new size has been recorded by markUpdate but not yet applied. */
  AtomicBoolean reconfigNotification;

  /**
   * Builds the pool with the size read from {@link #CHORE_POOL_SIZE}.
   * @param conf configuration to read the pool size from
   */
  DirScanPool(Configuration conf) {
    size = resolvePoolSize(conf);
    pool = new ForkJoinPool(size);
    LOG.info("Cleaner pool size is " + size);
    reconfigNotification = new AtomicBoolean(false);
    cleanerLatch = 0;
  }

  /**
   * Reads the configured pool size and falls back to the default when it computes to 0.
   * poolSize may be 0 or 0.0 from a careless configuration, and ForkJoinPool rejects a
   * parallelism of 0, so both construction and reconfiguration go through this check.
   * @param conf configuration to read the pool size from
   * @return the size to use for the pool
   */
  private static int resolvePoolSize(Configuration conf) {
    int resolved = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
    return resolved == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : resolved;
  }

  /**
   * Checks if pool can be updated. If so, mark for update later.
   * @param conf configuration
   */
  synchronized void markUpdate(Configuration conf) {
    // Apply the same zero-size fallback as the constructor; otherwise a bad value picked up
    // here would only blow up later, when updatePool() builds the replacement pool.
    int newSize = resolvePoolSize(conf);
    if (newSize == size) {
      LOG.trace("Size from configuration is same as previous=" + newSize +
        " no need to update.");
      return;
    }
    size = newSize;
    // Chore is working, update it later.
    reconfigNotification.set(true);
  }

  /**
   * Replaces the pool with a new one of the current target size. Waits until the cleaner latch
   * reaches zero, the timeout runs out, or the waiting thread is interrupted, and then shuts
   * the old pool down regardless of which of those happened.
   * @param timeout maximum time to wait for running cleaners, in milliseconds
   */
  synchronized void updatePool(long timeout) {
    long stopWaitTime = System.currentTimeMillis() + timeout;
    while (cleanerLatch != 0 && timeout > 0) {
      try {
        wait(timeout);
        // Recompute the remaining budget; wait() may return before the full timeout.
        timeout = stopWaitTime - System.currentTimeMillis();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    shutDownNow();
    LOG.info("Update chore's pool size from " + pool.getParallelism() + " to " + size);
    pool = new ForkJoinPool(size);
  }

  /** Records that one more cleaner has started working. */
  synchronized void latchCountUp() {
    cleanerLatch++;
  }

  /** Records that a cleaner has finished and wakes any thread waiting in updatePool. */
  synchronized void latchCountDown() {
    cleanerLatch--;
    notifyAll();
  }

  /**
   * Submits the task to the current pool. The returned future is intentionally dropped.
   * @param task the task to run
   */
  @SuppressWarnings("FutureReturnValueIgnored")
  synchronized void submit(ForkJoinTask<?> task) {
    pool.submit(task);
  }

  /** Shuts the current pool down immediately; does nothing if it is null or already shut down. */
  synchronized void shutDownNow() {
    if (pool == null || pool.isShutdown()) {
      return;
    }
    pool.shutdownNow();
  }
}
// Giving each cleaner chore its own pool may waste resources,
// so share a single pool among all cleaner chores.
private static volatile DirScanPool POOL;
static final String DEFAULT_CHORE_POOL_SIZE = "0.25";
private final DirScanPool pool;

protected final FileSystem fs;
private final Path oldFileDir;
Expand All @@ -156,22 +73,9 @@ synchronized void shutDownNow() {
protected Map<String, Object> params;
private AtomicBoolean enabled = new AtomicBoolean(true);

/**
 * Creates the shared directory-scan pool from the given configuration if none is currently
 * set; otherwise leaves the existing pool untouched.
 * @param conf configuration passed to the DirScanPool constructor
 */
public static void initChorePool(Configuration conf) {
  if (POOL != null) {
    return;
  }
  POOL = new DirScanPool(conf);
}

/**
 * Shuts down the shared directory-scan pool and clears the reference to it. Does nothing when
 * no pool is set.
 */
public static void shutDownChorePool() {
  if (POOL == null) {
    return;
  }
  POOL.shutDownNow();
  POOL = null;
}

public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
FileSystem fs, Path oldFileDir, String confKey, DirScanPool pool) {
this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, pool, null);
}

/**
Expand All @@ -182,14 +86,16 @@ public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Confi
* @param fs handle to the FS
* @param oldFileDir the path to the archived files
* @param confKey configuration key for the classes to instantiate
* @param pool the thread pool used to scan directories
* @param params members could be used in cleaner
*/
public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
public CleanerChore(String name, final int sleepPeriod, final Stoppable s,
Configuration conf, FileSystem fs, Path oldFileDir, String confKey,
DirScanPool pool, Map<String, Object> params) {
super(name, s, sleepPeriod);

Preconditions.checkNotNull(POOL, "Chore's pool isn't initialized, please call"
+ "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
Preconditions.checkNotNull(pool, "Chore's pool can not be null");
this.pool = pool;
this.fs = fs;
this.oldFileDir = oldFileDir;
this.conf = conf;
Expand Down Expand Up @@ -253,11 +159,6 @@ private void initCleanerChain(String confKey) {
}
}

/**
 * Passes the changed configuration to the shared pool through {@code POOL.markUpdate(conf)}.
 * The pool is not resized here; markUpdate only records the new size for a later update.
 * @param conf the updated configuration
 */
@Override
public void onConfigurationChange(Configuration conf) {
POOL.markUpdate(conf);
}

/**
* A utility method to create new instances of LogCleanerDelegate based on the class name of the
* LogCleanerDelegate.
Expand Down Expand Up @@ -285,7 +186,7 @@ private T newFileCleaner(String className, Configuration conf) {
protected void chore() {
if (getEnabled()) {
try {
POOL.latchCountUp();
pool.latchCountUp();
if (runCleaner()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Cleaned all WALs under " + oldFileDir);
Expand All @@ -296,23 +197,19 @@ protected void chore() {
}
}
} finally {
POOL.latchCountDown();
}
// After each cleaner chore, checks if received reconfigure notification while cleaning.
// First in cleaner turns off notification, to avoid another cleaner updating pool again.
if (POOL.reconfigNotification.compareAndSet(true, false)) {
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
POOL.updatePool((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
pool.latchCountDown();
}
// This cleaner is waiting for other cleaners finishing their jobs.
// To avoid missing next chore, only wait 0.8 * period, then shutdown.
pool.tryUpdatePoolSize((long) (0.8 * getTimeUnit().toMillis(getPeriod())));
} else {
LOG.trace("Cleaner chore disabled! Not cleaning.");
}
}

public Boolean runCleaner() {
CleanerTask task = new CleanerTask(this.oldFileDir, true);
POOL.submit(task);
pool.execute(task);
return task.join();
}

Expand Down Expand Up @@ -407,7 +304,7 @@ public synchronized void cleanup() {

@VisibleForTesting
int getChorePoolSize() {
return POOL.size;
return pool.getSize();
}

/**
Expand All @@ -426,10 +323,11 @@ private interface Action<T> {
}

/**
* Attempts to clean up a directory, its subdirectories, and files.
* Return value is true if everything was deleted. false on partial / total failures.
* Attempts to clean up a directory, its subdirectories, and files. Return value is true if
* everything was deleted. false on partial / total failures.
*/
private class CleanerTask extends RecursiveTask<Boolean> {
private final class CleanerTask extends RecursiveTask<Boolean> {
private static final long serialVersionUID = -1584635903138015418L;
private final Path dir;
private final boolean root;

Expand Down
Loading

0 comments on commit 8961315

Please sign in to comment.