From 5bd04da5d4c663503a00b8ef9a9f0523baed61dd Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 2 Aug 2023 13:46:25 +0900 Subject: [PATCH] Remove session-based directory when the isolated session cache is evicted --- .../org/apache/spark/executor/Executor.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 1b7bb8af79a9..9290b5b36a8f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -35,7 +35,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, WrappedArray} import scala.concurrent.duration._ import scala.util.control.NonFatal -import com.google.common.cache.CacheBuilder +import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification} import com.google.common.util.concurrent.ThreadFactoryBuilder import org.slf4j.MDC @@ -196,11 +196,24 @@ private[spark] class Executor( // Classloader isolation // The default isolation group - val defaultSessionState = newSessionState(JobArtifactState("default", None)) + val defaultSessionState: IsolatedSessionState = newSessionState(JobArtifactState("default", None)) - val isolatedSessionCache = CacheBuilder.newBuilder() + val isolatedSessionCache: Cache[String, IsolatedSessionState] = CacheBuilder.newBuilder() .maximumSize(100) .expireAfterAccess(30, TimeUnit.MINUTES) + .removalListener(new RemovalListener[String, IsolatedSessionState]() { + override def onRemoval( + notification: RemovalNotification[String, IsolatedSessionState]): Unit = { + val state = notification.getValue + // Cache is always used for isolated sessions. + assert(!isDefaultState(state.sessionUUID)) + val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), state.sessionUUID) + if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) { + Utils.deleteRecursively(sessionBasedRoot) + } + logInfo(s"Session evicted: ${state.sessionUUID}") + } + }) .build[String, IsolatedSessionState] // Set the classloader for serializer