From 7ba1bd01cac999c728a8851e1d72af198234e4ed Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 18 Sep 2025 14:38:15 +0800 Subject: [PATCH 1/5] [SPARK-53631][CORE][SHS] Optimize memory and perf on SHS bootstrap --- .../org/apache/spark/internal/LogKeys.java | 1 + .../deploy/history/FsHistoryProvider.scala | 39 +++++++++++++------ .../spark/internal/config/History.scala | 6 +++ docs/monitoring.md | 8 ++++ 4 files changed, 43 insertions(+), 11 deletions(-) diff --git a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java index 0fd3627bbac1..e90683a20575 100644 --- a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java +++ b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java @@ -328,6 +328,7 @@ public enum LogKeys implements LogKey { LAST_ACCESS_TIME, LAST_COMMITTED_CHECKPOINT_ID, LAST_COMMIT_BASED_CHECKPOINT_ID, + LAST_SCAN_TIME, LAST_VALID_TIME, LATEST_BATCH_ID, LATEST_COMMITTED_BATCH_ID, diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 0c6d6acf66c8..dfd6a721c78b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -34,6 +34,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, SafeModeAction} import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.security.AccessControlException +import org.apache.hadoop.util.BlockingThreadPoolExecutorService import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -102,7 +103,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S) // Number of threads used to replay event logs. - private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS) + private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS) + private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS) private val logDir = conf.get(History.HISTORY_LOG_DIR) @@ -209,7 +211,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private val replayExecutor: ExecutorService = { if (!Utils.isTesting) { - ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor") + BlockingThreadPoolExecutorService.newInstance( + numReplayThreads, 1024, 60L, TimeUnit.SECONDS, "log-replay-executor") + } else { + ThreadUtils.sameThreadExecutorService() + } + } + + /** + * Fixed size thread pool to compact log files. 
+ */ + private val compactExecutor: ExecutorService = { + if (!Utils.isTesting) { + BlockingThreadPoolExecutorService.newInstance( + numCompactThreads, 1024, 60L, TimeUnit.SECONDS, "log-compact-executor") } else { ThreadUtils.sameThreadExecutorService() } @@ -431,7 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) initThread.interrupt() initThread.join() } - Seq(pool, replayExecutor).foreach { executor => + Seq(pool, replayExecutor, compactExecutor).foreach { executor => executor.shutdown() if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow() @@ -487,7 +502,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) var count: Int = 0 try { val newLastScanTime = clock.getTimeMillis() - logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") + logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " + + log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}") // Mark entries that are processing as not stale. Such entries do not have a chance to be // updated with the new 'lastProcessed' time and thus any entity that completes processing @@ -495,7 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // and will be deleted from the UI until the next 'checkForLogs' run. val notStale = mutable.HashSet[String]() val updated = Option(fs.listStatus(new Path(logDir))) - .map(_.toImmutableArraySeq).getOrElse(Nil) + .map(_.toImmutableArraySeq).getOrElse(Seq.empty) .filter { entry => isAccessible(entry.getPath) } .filter { entry => if (isProcessing(entry.getPath)) { @@ -612,11 +628,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } if (updated.nonEmpty) { - logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}") + logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}") } updated.foreach { entry => - submitLogProcessTask(entry.rootPath) { () => + submitLogProcessTask(entry.rootPath, replayExecutor) { () => mergeApplicationListing(entry, newLastScanTime, true) } } @@ -788,7 +804,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // triggering another task for compaction task only if it succeeds if (succeeded) { - submitLogProcessTask(rootPath) { () => compact(reader) } + submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) } } } } @@ -1456,13 +1472,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */ - private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = { + private def submitLogProcessTask( + rootPath: Path, pool: ExecutorService)(task: Runnable): Unit = { try { processing(rootPath) - replayExecutor.submit(task) + pool.submit(task) } catch { // let the iteration over the updated entries break, since an exception on - // replayExecutor.submit (..) indicates the ExecutorService is unable + // pool.submit (..) 
indicates the ExecutorService is unable
       // to take any more submissions at this time
       case e: Exception =>
         logError(s"Exception while submitting task", e)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 8eaa37cceee9..d44d3c32dfef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -228,6 +228,12 @@ private[spark] object History {
     .intConf
     .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
 
+  val NUM_COMPACT_THREADS = ConfigBuilder("spark.history.fs.numCompactThreads")
+    .version("4.1.0")
+    .doc("Number of threads that will be used by history server to compact event logs.")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
   val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
     .version("1.0.0")
     .doc("The number of applications to retain UI data for in the cache. If this cap is " +
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 957ee555191a..49d04b328f29 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -329,6 +329,14 @@ Security options for the Spark History Server are covered more detail in the
     </td>
     <td>2.0.0</td>
   </tr>
+  <tr>
+    <td><code>spark.history.fs.numCompactThreads</code></td>
+    <td>25% of available cores</td>
+    <td>
+      Number of threads that will be used by history server to compact event logs.
+    </td>
+    <td>4.1.0</td>
+  </tr>
   <tr>
     <td><code>spark.history.store.maxDiskUsage</code></td>
     <td>10g</td>

From 47208525bb458a60ab8e46a49301b50b1eaaa1fb Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Thu, 18 Sep 2025 15:48:26 +0800
Subject: [PATCH 2/5] nit

---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index dfd6a721c78b..b4405e6a31be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -104,6 +104,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // Number of threads used to replay event logs.
   private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS)
+  // Number of threads used to compact rolling event logs.
private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS) private val logDir = conf.get(History.HISTORY_LOG_DIR) From dbd982ad714f9fa680c66966ec2e2346fb78a423 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Sat, 20 Sep 2025 22:01:27 +0800 Subject: [PATCH 3/5] BlockingThreadPoolExecutorService --- .../deploy/history/FsHistoryProvider.scala | 9 +- .../BlockingThreadPoolExecutorService.scala | 124 ++++++++++++++++++ .../org/apache/spark/util/ThreadUtils.scala | 17 +++ .../apache/spark/util/ThreadUtilsSuite.scala | 34 +++++ 4 files changed, 179 insertions(+), 5 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b4405e6a31be..4863291b529b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -34,7 +34,6 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, SafeModeAction} import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.security.AccessControlException -import org.apache.hadoop.util.BlockingThreadPoolExecutorService import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil @@ -212,8 +211,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private val replayExecutor: ExecutorService = { if (!Utils.isTesting) { - BlockingThreadPoolExecutorService.newInstance( - numReplayThreads, 1024, 60L, TimeUnit.SECONDS, "log-replay-executor") + ThreadUtils.newDaemonBlockingThreadPoolExecutorService( + numReplayThreads, 1024, "log-replay-executor") } else { ThreadUtils.sameThreadExecutorService() } @@ -224,8 +223,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private val compactExecutor: ExecutorService = { if (!Utils.isTesting) { - BlockingThreadPoolExecutorService.newInstance( - numCompactThreads, 1024, 60L, TimeUnit.SECONDS, "log-compact-executor") + ThreadUtils.newDaemonBlockingThreadPoolExecutorService( + numCompactThreads, 1024, "log-compact-executor") } else { ThreadUtils.sameThreadExecutorService() } diff --git a/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala new file mode 100644 index 000000000000..ae0a531accfc --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util +import java.util.concurrent._ + +import com.google.common.util.concurrent.Futures + +// scalastyle:off +/** + * This thread pool executor throttles the submission of new tasks by using a semaphore. + * Task submissions require permits, task completions release permits. + *
+ * NOTE: you should either use the [[BlockingThreadPoolExecutorService.submit(Callable)]] + * methods or the [[BlockingThreadPoolExecutorService.execute(Runnable)]] method. + *
+ * This is inspired by
+ *
+ * Apache S4 BlockingThreadPoolExecutorService
+ */
+// scalastyle:on
+private[spark] class BlockingThreadPoolExecutorService(
+    nThreads: Int, workQueueSize: Int, threadFactory: ThreadFactory)
+  extends ExecutorService {
+
+  private val permits = new Semaphore(nThreads + workQueueSize)
+
+  private val workQueue = new LinkedBlockingQueue[Runnable](nThreads + workQueueSize)
+
+  private val delegate = new ThreadPoolExecutor(
+    nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory)
+
+  override def shutdown(): Unit = delegate.shutdown()
+
+  override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+
+  override def isShutdown: Boolean = delegate.isShutdown
+
+  override def isTerminated: Boolean = delegate.isTerminated
+
+  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
+    delegate.awaitTermination(timeout, unit)
+
+  override def submit[T](task: Callable[T]): Future[T] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new CallableWithPermitRelease(task))
+  }
+
+  override def submit[T](task: Runnable, result: T): Future[T] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new RunnableWithPermitRelease(task), result)
+  }
+
+  override def submit(task: Runnable): Future[_] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new RunnableWithPermitRelease(task))
+  }
+
+  override def execute(command: Runnable): Unit = {
+    try permits.acquire() catch {
+      case _: InterruptedException =>
+        Thread.currentThread.interrupt()
+    }
+    delegate.execute(new RunnableWithPermitRelease(command))
+  }
+
+  override def invokeAll[T](
+      tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
+    throw new RuntimeException("Not implemented")
+
+  override def invokeAll[T](
+      tasks: util.Collection[_ <: Callable[T]],
+      timeout: Long, unit: TimeUnit): util.List[Future[T]] =
+    throw new RuntimeException("Not implemented")
+
+  override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
+    throw new RuntimeException("Not implemented")
+
+  override def invokeAny[T](
+      tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
+    throw new RuntimeException("Not implemented")
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  private class RunnableWithPermitRelease(delegate: Runnable) extends Runnable {
+    override def run(): Unit = try delegate.run() finally permits.release()
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  private class CallableWithPermitRelease[T](delegate: Callable[T]) extends Callable[T] {
+    override def call(): T = try delegate.call() finally permits.release()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index e9d14f904db4..12b654ba79fe 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -188,6 +188,23 @@ private[spark] object ThreadUtils {
       rejectedExecutionHandler)
   }
 
+  /**
+   * Similar to newFixedThreadPool, but with a bounded workQueue; task submission will be blocked
+   * when the queue is full.
+   *
+   * @param nThreads the number of threads in the pool
+   * @param workQueueSize the capacity of the queue to use for holding tasks before they are
+   *                      executed. Task submission will be blocked when the queue is full.
+   * @param prefix thread names are formatted as prefix-ID, where ID is a unique, sequentially
+   *               assigned integer.
+   * @return BlockingThreadPoolExecutorService
+   */
+  def newDaemonBlockingThreadPoolExecutorService(
+      nThreads: Int, workQueueSize: Int, prefix: String): ExecutorService = {
+    val threadFactory = namedThreadFactory(prefix)
+    new BlockingThreadPoolExecutorService(nThreads, workQueueSize, threadFactory)
+  }
+
   /**
    * Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
    */
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 04f661db691e..d74bc2699944 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -98,6 +98,40 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonBlockingThreadPoolExecutorService") {
+    val nThread = 3
+    val workQueueSize = 5
+    val submittedTasksLatch = new CountDownLatch(nThread + workQueueSize + 1)
+    val latch = new CountDownLatch(1)
+    val blockingPool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+      nThread, workQueueSize, "ThreadUtilsSuite-newDaemonBlockingThreadPoolExecutorService")
+
+    try {
+      val submitThread = new Thread(() => {
+        (0 until nThread + workQueueSize + 1).foreach { _ =>
+          blockingPool.execute(() => {
+            latch.await(10, TimeUnit.SECONDS)
+          })
+          submittedTasksLatch.countDown()
+        }
+      })
+      submitThread.setDaemon(true)
+      submitThread.start()
+
+      // the last task submission is blocked until the previous tasks complete
+      eventually(timeout(10.seconds)) {
+        assert(submittedTasksLatch.getCount === 1L)
+      }
+      latch.countDown()
+      eventually(timeout(10.seconds)) {
+        assert(submittedTasksLatch.getCount === 0L)
+        assert(!submitThread.isAlive)
+      }
+    } finally {
+      blockingPool.shutdownNow()
+    }
+  }
+
   test("sameThread") {
     val callerThreadName = Thread.currentThread().getName()
     val f = Future {

From 7cd22f9512e4730f3dfe4689df56b0462d285fa0 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Sat, 20 Sep 2025 23:20:01 +0800
Subject: [PATCH 4/5] nit

---
 .../util/BlockingThreadPoolExecutorService.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
index ae0a531accfc..506a83f5590e 100644
--- a/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
+++ b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
@@ -27,8 +27,8 @@ import com.google.common.util.concurrent.Futures
 * This thread pool executor throttles the submission of new tasks by using a semaphore.
 * Task submissions require permits, task completions release permits.
 *
- * NOTE: you should either use the [[BlockingThreadPoolExecutorService.submit(Callable)]]
- * methods or the [[BlockingThreadPoolExecutorService.execute(Runnable)]] method.
+ * NOTE: [[invoke*]] methods are not supported; you should either use the [[submit]] methods
+ * or the [[execute]] method.
 *
 * This is inspired by
 *
 * Apache S4 BlockingThreadPoolExecutorService
 */
// scalastyle:on
@@ -94,19 +94,19 @@ private[spark] class BlockingThreadPoolExecutorService(
 
   override def invokeAll[T](
       tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
-    throw new RuntimeException("Not implemented")
+    throw new UnsupportedOperationException("Not implemented")
 
   override def invokeAll[T](
       tasks: util.Collection[_ <: Callable[T]],
       timeout: Long, unit: TimeUnit): util.List[Future[T]] =
-    throw new RuntimeException("Not implemented")
+    throw new UnsupportedOperationException("Not implemented")
 
   override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
-    throw new RuntimeException("Not implemented")
+    throw new UnsupportedOperationException("Not implemented")
 
   override def invokeAny[T](
       tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
-    throw new RuntimeException("Not implemented")
+    throw new UnsupportedOperationException("Not implemented")
 
   /**
    * Releases a permit after the task is executed.

From 6d005790cf24580265e42a7f1c9a3d0b7008da72 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Sat, 20 Sep 2025 23:21:41 +0800
Subject: [PATCH 5/5] nit

---
 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 12b654ba79fe..d22e14d99265 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -189,8 +189,8 @@ private[spark] object ThreadUtils {
   }
 
   /**
-   * Similar to newFixedThreadPool, but with a bounded workQueue; task submission will be blocked
-   * when the queue is full.
+   * Similar to newDaemonFixedThreadPool, but with a bounded workQueue; task submission will
+   * be blocked when the queue is full.
    *
    * @param nThreads the number of threads in the pool
    * @param workQueueSize the capacity of the queue to use for holding tasks before they are
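
To see the back-pressure behavior this series introduces, here is a minimal sketch. It is not part of the patches: the package, object name, pool sizes, and task body are illustrative only, and since ThreadUtils is private[spark], the caller must live under the org.apache.spark package.

    package org.apache.spark.examples

    import java.util.concurrent.TimeUnit

    import org.apache.spark.util.ThreadUtils

    object BlockingPoolDemo {
      def main(args: Array[String]): Unit = {
        // 2 worker threads plus a queue of 4 means at most 6 unfinished tasks are
        // admitted; the 7th execute() call blocks the submitter until a permit frees up.
        val pool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(2, 4, "demo")
        try {
          (1 to 7).foreach { i =>
            pool.execute(() => {
              TimeUnit.MILLISECONDS.sleep(100) // stand-in for event log replay work
              println(s"task $i finished on ${Thread.currentThread().getName}")
            })
            println(s"task $i submitted")
          }
        } finally {
          pool.shutdown()
          pool.awaitTermination(10, TimeUnit.SECONDS)
        }
      }
    }

This is the key difference from ThreadUtils.newDaemonFixedThreadPool, which is backed by an unbounded queue: with the blocking pool, checkForLogs can no longer enqueue an unbounded number of replay tasks in a single scan, which appears to be the memory problem the subject line targets.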
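On the deployment side, the new pool is sized by the config added in PATCH 1. A hypothetical spark-defaults.conf fragment for a history server host (the values are illustrative, not a recommendation from the patches; both keys default to 25% of available cores):

    spark.history.fs.numReplayThreads   4
    spark.history.fs.numCompactThreads  4

Routing compact(reader) through its own executor also means a backlog of compaction work no longer competes with new log replay for the same threads, since each pool now throttles its submissions independently.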