diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index 143ba8ebc8..03825384d2 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -34,7 +34,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.serializer.Serializer;
@@ -49,6 +48,7 @@
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class SortWriteBufferManager {
 
@@ -139,10 +139,7 @@ public SortWriteBufferManager(
     this.maxBufferSize = maxBufferSize;
     this.sendExecutorService = Executors.newFixedThreadPool(
         sendThreadNum,
-        new ThreadFactoryBuilder()
-            .setDaemon(true)
-            .setNameFormat("send-thread-%d")
-            .build());
+        ThreadUtils.getThreadFactory("send-thread-%d"));
   }
 
   // todo: Single Buffer should also have its size limit
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 738589fe02..c5da1ac2b6 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -29,7 +29,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -62,6 +61,7 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class RssShuffleManager implements ShuffleManager {
 
@@ -194,11 +194,11 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
           RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE_DEFAULT_VALUE);
       threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize * 2, keepAliveTime, TimeUnit.SECONDS,
          Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SendData").build());
+          ThreadUtils.getThreadFactory("SendData"));
       if (isDriver) {
         heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
+            ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
       }
     }
   }
 
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5c7e2d90cf..032767a943 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -32,7 +32,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -66,6 +65,7 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class RssShuffleManager implements ShuffleManager {
 
@@ -200,7 +200,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
           Queues.newLinkedBlockingQueue(Integer.MAX_VALUE));
     if (isDriver) {
       heartBeatScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("rss-heartbeat-%d").build());
+          ThreadUtils.getThreadFactory("rss-heartbeat-%d"));
     }
   }
 
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 168b7c49cf..669431cbb2 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -33,7 +33,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +71,7 @@
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class ShuffleWriteClientImpl implements ShuffleWriteClient {
 
@@ -98,7 +98,7 @@ public ShuffleWriteClientImpl(String clientType, int retryMax, long retryInterva
     this.retryIntervalMax = retryIntervalMax;
     this.coordinatorClientFactory = new CoordinatorClientFactory(clientType);
     this.heartBeatExecutorService = Executors.newFixedThreadPool(heartBeatThreadNum,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("client-heartbeat-%d").build());
+        ThreadUtils.getThreadFactory("client-heartbeat-%d"));
     this.replica = replica;
     this.replicaWrite = replicaWrite;
     this.replicaRead = replicaRead;
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index f3e6edce8e..1b22196ba9 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -23,7 +23,6 @@
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.grpc.BindableService;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
@@ -34,6 +33,7 @@
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.util.ExitUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class GrpcServer implements ServerInterface {
 
@@ -52,7 +52,7 @@ public GrpcServer(RssBaseConf conf, BindableService service, GRPCMetrics grpcMet
         10,
         TimeUnit.MINUTES,
         Queues.newLinkedBlockingQueue(Integer.MAX_VALUE),
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Grpc-%d").build()
+        ThreadUtils.getThreadFactory("Grpc-%d")
     );
 
     boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
diff --git a/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
new file mode 100644
index 0000000000..f8000b6e3e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/util/ThreadUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.util;
+
+import java.util.concurrent.ThreadFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Provide a general method to create a thread factory to make the code more standardized
+ */
+public class ThreadUtils {
+
+  public static ThreadFactory getThreadFactory(String factoryName) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName).build();
+  }
+}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
index 02d8399962..51d6fe8c5f 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessCandidatesChecker.java
@@ -26,7 +26,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -39,6 +38,7 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 /**
  * AccessCandidatesChecker maintain a list of candidate access id and update it periodically,
@@ -75,7 +75,7 @@ public AccessCandidatesChecker(AccessManager accessManager) throws Exception {
     int updateIntervalS = conf.getInteger(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC);
     updateAccessCandidatesSES = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateAccessCandidates-%d").build());
+        ThreadUtils.getThreadFactory("UpdateAccessCandidates-%d"));
     updateAccessCandidatesSES.scheduleAtFixedRate(
         this::updateAccessCandidates, 0, updateIntervalS, TimeUnit.SECONDS);
   }
 
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 4292de7941..9d6f0fcfe7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -31,13 +31,13 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.ThreadUtils;
 
 public class ApplicationManager {
 
@@ -59,7 +59,7 @@ public ApplicationManager(CoordinatorConf conf) {
     expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
     // the thread for checking application status
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ApplicationManager-%d").build());
+        ThreadUtils.getThreadFactory("ApplicationManager-%d"));
     scheduledExecutorService.scheduleAtFixedRate(
         () -> statusCheck(), expired / 2, expired / 2, TimeUnit.MILLISECONDS);
   }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
index 85c46b46ef..c3c1fb5ede 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClientConfManager.java
@@ -26,7 +26,6 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +37,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.ThreadUtils;
+
 public class ClientConfManager implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(ClientConfManager.class);
@@ -73,7 +74,7 @@ private void init(CoordinatorConf conf, Configuration hadoopConf) throws Excepti
     int updateIntervalS = conf.getInteger(CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC);
     updateClientConfSES = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ClientConfManager-%d").build());
+        ThreadUtils.getThreadFactory("ClientConfManager-%d"));
     updateClientConfSES.scheduleAtFixedRate(
         this::updateClientConf, 0, updateIntervalS, TimeUnit.SECONDS);
   }
 
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index b96bcd9dc5..98c69a6238 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -34,7 +34,6 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,6 +42,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.ThreadUtils;
+
 public class SimpleClusterManager implements ClusterManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(SimpleClusterManager.class);
@@ -63,7 +64,7 @@ public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) thro
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SimpleClusterManager-%d").build());
+        ThreadUtils.getThreadFactory("SimpleClusterManager-%d"));
     scheduledExecutorService.scheduleAtFixedRate(
         () -> nodesCheck(), heartbeatTimeout / 3,
         heartbeatTimeout / 3, TimeUnit.MILLISECONDS);
@@ -73,7 +74,7 @@ public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) thro
     this.hadoopFileSystem = CoordinatorUtils.getFileSystemForPath(new Path(excludeNodesPath), hadoopConf);
     long updateNodesInterval = conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL);
     checkNodesExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateExcludeNodes-%d").build());
+        ThreadUtils.getThreadFactory("UpdateExcludeNodes-%d"));
     checkNodesExecutorService.scheduleAtFixedRate(
         () -> updateExcludeNodes(excludeNodesPath), updateNodesInterval, updateNodesInterval, TimeUnit.MILLISECONDS);
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 367032bc3d..edf2b5ca08 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -28,7 +28,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.RangeMap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -37,6 +36,7 @@
 
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.Storage;
@@ -84,7 +84,7 @@ public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, String shuffleSe
     int poolSize = shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_SIZE);
     long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
     threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("FlushEventThreadPool").build());
+        ThreadUtils.getThreadFactory("FlushEventThreadPool"));
     storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
     pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
     // the thread for flush data
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 2ea88e7424..0876c5140c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -33,7 +33,6 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.roaringbitmap.longlong.LongIterator;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
@@ -48,6 +47,7 @@
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -100,12 +100,12 @@ public ShuffleTaskManager(
     this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
     // the thread for checking application status
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("checkResource-%d").build());
+        ThreadUtils.getThreadFactory("checkResource-%d"));
     scheduledExecutorService.scheduleAtFixedRate(
         () -> preAllocatedBufferCheck(), preAllocationExpired / 2,
         preAllocationExpired / 2, TimeUnit.MILLISECONDS);
     this.expiredAppCleanupExecutorService = Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("expiredAppCleaner").build());
+        ThreadUtils.getThreadFactory("expiredAppCleaner"));
     expiredAppCleanupExecutorService.scheduleAtFixedRate(
         () -> checkResourceStatus(), appExpiredWithoutHB / 2,
         appExpiredWithoutHB / 2, TimeUnit.MILLISECONDS);
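
Reviewer note (not part of the patch): the sketch below shows how the new ThreadUtils.getThreadFactory helper is expected to be used by callers after this change. The class name ThreadUtilsExample, the pool size, and the "demo-pool-%d" name format are made-up values for illustration only.

// Minimal usage sketch of the new helper introduced by this patch.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.uniffle.common.util.ThreadUtils;

public class ThreadUtilsExample {
  public static void main(String[] args) throws InterruptedException {
    // Every thread created by this pool is a daemon thread named demo-pool-0, demo-pool-1, ...
    // exactly as ThreadFactoryBuilder.setDaemon(true).setNameFormat(...) produced before.
    ExecutorService pool = Executors.newFixedThreadPool(
        2, ThreadUtils.getThreadFactory("demo-pool-%d"));
    pool.execute(() -> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();
  }
}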