diff --git a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
index 0fd3627bbac1..e90683a20575 100644
--- a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
+++ b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
@@ -328,6 +328,7 @@ public enum LogKeys implements LogKey {
   LAST_ACCESS_TIME,
   LAST_COMMITTED_CHECKPOINT_ID,
   LAST_COMMIT_BASED_CHECKPOINT_ID,
+  LAST_SCAN_TIME,
   LAST_VALID_TIME,
   LATEST_BATCH_ID,
   LATEST_COMMITTED_BATCH_ID,
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 0c6d6acf66c8..4863291b529b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -102,7 +102,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
 
   // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
+  private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS)
+  // Number of threads used to compact rolling event logs.
+  private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS)
 
   private val logDir = conf.get(History.HISTORY_LOG_DIR)
 
@@ -209,7 +211,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private val replayExecutor: ExecutorService = {
     if (!Utils.isTesting) {
-      ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
+      ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+        numReplayThreads, 1024, "log-replay-executor")
+    } else {
+      ThreadUtils.sameThreadExecutorService()
+    }
+  }
+
+  /**
+   * Fixed size thread pool to compact log files.
+   */
+  private val compactExecutor: ExecutorService = {
+    if (!Utils.isTesting) {
+      ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+        numCompactThreads, 1024, "log-compact-executor")
     } else {
       ThreadUtils.sameThreadExecutorService()
     }
@@ -431,7 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       initThread.interrupt()
       initThread.join()
     }
-    Seq(pool, replayExecutor).foreach { executor =>
+    Seq(pool, replayExecutor, compactExecutor).foreach { executor =>
       executor.shutdown()
       if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
         executor.shutdownNow()
@@ -487,7 +502,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     var count: Int = 0
     try {
       val newLastScanTime = clock.getTimeMillis()
-      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
+      logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
+        log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
 
       // Mark entries that are processing as not stale. Such entries do not have a chance to be
       // updated with the new 'lastProcessed' time and thus any entity that completes processing
@@ -495,7 +511,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // and will be deleted from the UI until the next 'checkForLogs' run.
       val notStale = mutable.HashSet[String]()
       val updated = Option(fs.listStatus(new Path(logDir)))
-        .map(_.toImmutableArraySeq).getOrElse(Nil)
+        .map(_.toImmutableArraySeq).getOrElse(Seq.empty)
         .filter { entry => isAccessible(entry.getPath) }
         .filter { entry =>
           if (isProcessing(entry.getPath)) {
@@ -612,11 +628,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
 
       if (updated.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
+        logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}")
       }
 
       updated.foreach { entry =>
-        submitLogProcessTask(entry.rootPath) { () =>
+        submitLogProcessTask(entry.rootPath, replayExecutor) { () =>
           mergeApplicationListing(entry, newLastScanTime, true)
         }
       }
@@ -788,7 +804,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
         // triggering another task for compaction task only if it succeeds
         if (succeeded) {
-          submitLogProcessTask(rootPath) { () => compact(reader) }
+          submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) }
         }
       }
     }
@@ -1456,13 +1472,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
-  private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
+  private def submitLogProcessTask(
+      rootPath: Path, pool: ExecutorService)(task: Runnable): Unit = {
     try {
       processing(rootPath)
-      replayExecutor.submit(task)
+      pool.submit(task)
     } catch {
       // let the iteration over the updated entries break, since an exception on
-      // replayExecutor.submit (..) indicates the ExecutorService is unable
+      // pool.submit (..) indicates the ExecutorService is unable
       // to take any more submissions at this time
       case e: Exception =>
         logError(s"Exception while submitting task", e)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 8eaa37cceee9..d44d3c32dfef 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -228,6 +228,12 @@
     .intConf
     .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
 
+  val NUM_COMPACT_THREADS = ConfigBuilder("spark.history.fs.numCompactThreads")
+    .version("4.1.0")
+    .doc("Number of threads that will be used by the history server to compact event logs.")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
   val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
     .version("1.0.0")
     .doc("The number of applications to retain UI data for in the cache. If this cap is " +
diff --git a/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
new file mode 100644
index 000000000000..506a83f5590e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/BlockingThreadPoolExecutorService.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util
+import java.util.concurrent._
+
+import com.google.common.util.concurrent.Futures
+
+// scalastyle:off
+/**
+ * This thread pool executor throttles the submission of new tasks by using a semaphore.
+ * Task submissions require permits, task completions release permits.
+ * <p/>
+ * NOTE: [[invoke*]] methods are not supported, you should either use the [[submit]] methods
+ * or the [[execute]] method.
+ * <p/>
+ * This is inspired by the Apache S4 BlockingThreadPoolExecutorService.
+ */
+// scalastyle:on
+private[spark] class BlockingThreadPoolExecutorService(
+    nThreads: Int, workQueueSize: Int, threadFactory: ThreadFactory)
+  extends ExecutorService {
+
+  private val permits = new Semaphore(nThreads + workQueueSize)
+
+  private val workQueue = new LinkedBlockingQueue[Runnable](nThreads + workQueueSize)
+
+  private val delegate = new ThreadPoolExecutor(
+    nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory)
+
+  override def shutdown(): Unit = delegate.shutdown()
+
+  override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
+
+  override def isShutdown: Boolean = delegate.isShutdown
+
+  override def isTerminated: Boolean = delegate.isTerminated
+
+  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
+    delegate.awaitTermination(timeout, unit)
+
+  override def submit[T](task: Callable[T]): Future[T] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new CallableWithPermitRelease(task))
+  }
+
+  override def submit[T](task: Runnable, result: T): Future[T] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new RunnableWithPermitRelease(task), result)
+  }
+
+  override def submit(task: Runnable): Future[_] = {
+    try permits.acquire() catch {
+      case e: InterruptedException =>
+        Thread.currentThread.interrupt()
+        return Futures.immediateFailedFuture(e)
+    }
+    delegate.submit(new RunnableWithPermitRelease(task))
+  }
+
+  override def execute(command: Runnable): Unit = {
+    try permits.acquire() catch {
+      case _: InterruptedException =>
+        Thread.currentThread.interrupt()
+    }
+    delegate.execute(new RunnableWithPermitRelease(command))
+  }
+
+  override def invokeAll[T](
+      tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
+    throw new UnsupportedOperationException("Not implemented")
+
+  override def invokeAll[T](
+      tasks: util.Collection[_ <: Callable[T]],
+      timeout: Long, unit: TimeUnit): util.List[Future[T]] =
+    throw new UnsupportedOperationException("Not implemented")
+
+  override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
+    throw new UnsupportedOperationException("Not implemented")
+
+  override def invokeAny[T](
+      tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
+    throw new UnsupportedOperationException("Not implemented")
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  private class RunnableWithPermitRelease(delegate: Runnable) extends Runnable {
+    override def run(): Unit = try delegate.run() finally permits.release()
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  private class CallableWithPermitRelease[T](delegate: Callable[T]) extends Callable[T] {
+    override def call(): T = try delegate.call() finally permits.release()
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index e9d14f904db4..d22e14d99265 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -188,6 +188,23 @@ private[spark] object ThreadUtils {
       rejectedExecutionHandler)
   }
 
+  /**
+   * Similar to newDaemonFixedThreadPool, but with a bounded work queue: task submission
+   * blocks when the queue is full.
+   *
+   * @param nThreads the number of threads in the pool
+   * @param workQueueSize the capacity of the queue to use for holding tasks before they are
+   *                      executed. Task submission will be blocked when the queue is full.
+   * @param prefix thread names are formatted as prefix-ID, where ID is a unique, sequentially
+   *               assigned integer.
+   * @return BlockingThreadPoolExecutorService
+   */
+  def newDaemonBlockingThreadPoolExecutorService(
+      nThreads: Int, workQueueSize: Int, prefix: String): ExecutorService = {
+    val threadFactory = namedThreadFactory(prefix)
+    new BlockingThreadPoolExecutorService(nThreads, workQueueSize, threadFactory)
+  }
+
   /**
    * Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
    */
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 04f661db691e..d74bc2699944 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -98,6 +98,40 @@ class ThreadUtilsSuite extends SparkFunSuite {
     }
   }
 
+  test("newDaemonBlockingThreadPoolExecutorService") {
+    val nThread = 3
+    val workQueueSize = 5
+    val submitThreadsLatch = new CountDownLatch(nThread + workQueueSize + 1)
+    val latch = new CountDownLatch(1)
+    val blockingPool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
+      nThread, workQueueSize, "ThreadUtilsSuite-newDaemonBlockingThreadPoolExecutorService")
+
+    try {
+      val submitThread = new Thread(() => {
+        (0 until nThread + workQueueSize + 1).foreach { i =>
+          blockingPool.execute(() => {
+            latch.await(10, TimeUnit.SECONDS)
+          })
+          submitThreadsLatch.countDown()
+        }
+      })
+      submitThread.setDaemon(true)
+      submitThread.start()
+
+      // the last task submission is blocked until a previously submitted task completes
+      eventually(timeout(10.seconds)) {
+        assert(submitThreadsLatch.getCount === 1L)
+      }
+      latch.countDown()
+      eventually(timeout(10.seconds)) {
+        assert(submitThreadsLatch.getCount === 0L)
+        assert(!submitThread.isAlive)
+      }
+    } finally {
+      blockingPool.shutdownNow()
+    }
+  }
+
   test("sameThread") {
     val callerThreadName = Thread.currentThread().getName()
     val f = Future {
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 957ee555191a..49d04b328f29 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -329,6 +329,14 @@ Security options for the Spark History Server are covered more detail in the
     <td>2.0.0</td>
   </tr>
+  <tr>
+    <td><code>spark.history.fs.numCompactThreads</code></td>
+    <td>25% of available cores</td>
+    <td>
+      Number of threads that will be used by the history server to compact event logs.
+    </td>
+    <td>4.1.0</td>
+  </tr>
   <tr>
     <td><code>spark.history.store.maxDiskUsage</code></td>
     <td>10g</td>
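
Reviewer note: a minimal, self-contained sketch of the back-pressure behavior the new pool introduces. The `BlockingPoolDemo` object below is illustrative only and not part of the patch; it assumes nothing beyond the `ThreadUtils.newDaemonBlockingThreadPoolExecutorService` helper added above. With 2 threads and 4 queue slots there are 6 permits in total, so a 7th `execute` call blocks in `Semaphore.acquire()` until a running task completes and releases a permit.

```scala
// Illustrative sketch only; not part of this patch.
// ThreadUtils is private[spark], so this demo must live under an org.apache.spark package.
package org.apache.spark.util

import java.util.concurrent.{CountDownLatch, TimeUnit}

object BlockingPoolDemo {
  def main(args: Array[String]): Unit = {
    // 2 worker threads + 4 queue slots = 6 permits in total.
    val pool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(2, 4, "demo")
    val release = new CountDownLatch(1)

    // Consume all 6 permits; none of these submissions block.
    (1 to 6).foreach { _ =>
      pool.execute(() => release.await(10, TimeUnit.SECONDS))
    }

    // The 7th submission parks in Semaphore.acquire() ...
    val submitter = new Thread(() => pool.execute(() => ()))
    submitter.setDaemon(true)
    submitter.start()
    Thread.sleep(500)
    assert(submitter.isAlive) // ... so the submitting thread is still blocked here.

    // Letting the running tasks finish releases permits, which unblocks the pending submission.
    release.countDown()
    submitter.join(10000)
    assert(!submitter.isAlive)
    pool.shutdownNow()
  }
}
```

The same arithmetic explains the `1024` queue size chosen for `log-replay-executor` and `log-compact-executor` above: each pool accepts at most `numThreads + 1024` outstanding tasks before `checkForLogs` starts blocking on submission, instead of growing an unbounded backlog as the previous `newDaemonFixedThreadPool` allowed.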