Skip to content

Commit

Permalink
HBASE-25037 Lots of thread pool are changed to non daemon after HBASE…
Browse files Browse the repository at this point in the history
…-24750 which causes trouble when shutting down (#2407)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
Apache9 authored Sep 16, 2020
1 parent bfdc96e commit 1bb19e0
Show file tree
Hide file tree
Showing 37 changed files with 66 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ public LogRollBackupSubprocedurePool(String name, Configuration conf) {
LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
this.name = name;
executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
executor =
new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ class AsyncConnectionImpl implements AsyncConnection {

@VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
TimeUnit.MILLISECONDS);
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
10, TimeUnit.MILLISECONDS);

private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class MulticastListener implements Listener {
private DatagramChannel channel;
private final EventLoopGroup group = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

public MulticastListener() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
TimeUnit.MILLISECONDS);
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
10, TimeUnit.MILLISECONDS);

private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d")
private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

protected boolean running = true; // if client runs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class NettyRpcConnection extends RpcConnection {

private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

private final NettyRpcClient rpcClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public int run(String[] args) throws Exception {
TableName tableName = TableName.valueOf(args[0]);
int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d")
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
// We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
// need a thread pool and may have a better performance if you use it correctly as it can save
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public TwoConcurrentActionPolicy(long sleepTime, Action[] actionsOne, Action[] a
this.actionsOne = actionsOne;
this.actionsTwo = actionsTwo;
executor = Executors.newFixedThreadPool(2,
new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d")
new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public boolean start() {
// Create the thread pool that will execute RPCs
threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
.setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
.setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void start() {
this.getClass().getSimpleName(), handlerCount, maxQueueLength);
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(maxQueueLength),
new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d")
new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,13 @@ public void start() {
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(maxQueueLength),
new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.call.handler-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(rsRsreportMaxQueueLength),
new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.RSReport.handler-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
this.rsReportExecutor =
new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(rsRsreportMaxQueueLength),
new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.RSReport.handler-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public static class MulticastPublisher implements Publisher {
private DatagramChannel channel;
private final EventLoopGroup group = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

public MulticastPublisher() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ private Pair<Integer, Integer> splitStoreFiles(final MasterProcedureEnv env,
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d")
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public DirScanPool(Configuration conf) {

private static ThreadPoolExecutor initializePool(int size) {
return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d")
new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ private NamedQueueRecorder(Configuration conf) {
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);

// disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
getEventCount(eventCount),
this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI,
new BlockingWaitStrategy());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI, new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());

// initialize ringbuffer event handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ static class FlushTableSubprocedurePool {
this.name = name;
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ void interruptIfNecessary() {
synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
.setUncaughtExceptionHandler(eh).build();
.setDaemon(true).setUncaughtExceptionHandler(eh).build();
for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
flusherThreadFactory.newThread(flushHandlers[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ static class SnapshotSubprocedurePool {
this.name = name;
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,9 @@ public FSHLog(final FileSystem fs, final Path rootDir, final String logDir,
String hostingThreadName = Thread.currentThread().getName();
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
// spinning as other strategies do.
this.disruptor = new Disruptor<>(RingBufferTruck::new,
getPreallocatedEventCount(),
this.disruptor = new Disruptor<>(RingBufferTruck::new, getPreallocatedEventCount(),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI, new BlockingWaitStrategy());
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
// because SyncFuture.NOT_DONE = 0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ZKPermissionWatcher(ZKWatcher watcher,
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
executor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d")
new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ private ThreadPoolExecutor createExecutor(final String name) {
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d")
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private ExecutorService createThreadPool(Configuration conf) {
int availableProcessors = Runtime.getRuntime().availableProcessors();
int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
return Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d")
new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1658,7 +1658,7 @@ public boolean accept(Path path) {

// run in multiple threads
final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d")
new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
try {
// ignore all file status items that are not of interest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public HBaseFsck(Configuration conf) throws IOException, ClassNotFoundException
private static ExecutorService createThreadPool(Configuration conf) {
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
return new ScheduledThreadPoolExecutor(numThreads,
new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d")
new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,12 @@ public Void call() throws IOException {
* "hbase.hregion.open.and.init.threads.max" property.
*/
static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
final String threadNamePrefix, int regionNumber) {
int maxThreads = Math.min(regionNumber, conf.getInt(
"hbase.hregion.open.and.init.threads.max", 16));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.
getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final String threadNamePrefix, int regionNumber) {
int maxThreads =
Math.min(regionNumber, conf.getInt("hbase.hregion.open.and.init.threads.max", 16));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(maxThreads,
30L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
return regionOpenAndInitThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryB
this.controller = controller;
this.entryBuffers = entryBuffers;
this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d")
new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private ExecutorService createThreadPool() {

ThreadPoolExecutor tpe =
new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d")
new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
tpe.allowCoreThreadTimeOut(true);
return tpe;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void test() throws Exception {
int numThreads = 7;
AtomicBoolean stop = new AtomicBoolean(false);
ExecutorService executor = Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d")
new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
List<Future<?>> futures = new ArrayList<>();
IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public static class CustomThreadPoolCoprocessor implements RegionCoprocessor, Re
private ExecutorService getPool() {
int maxThreads = 1;
long keepAliveTime = 60;
ThreadPoolExecutor pool =
new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d")
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
pool.allowCoreThreadTimeOut(true);
return pool;
Expand Down
Loading

0 comments on commit 1bb19e0

Please sign in to comment.