diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1dd0715918042..208b6de550e8b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter} import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets +import java.security.PrivilegedExceptionAction import java.util.{Locale, Properties, UUID} import java.util.zip.{ZipEntry, ZipOutputStream} @@ -192,16 +193,32 @@ private[spark] class Client( * Cleanup application staging directory. */ private def cleanupStagingDir(appId: ApplicationId): Unit = { - val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) - try { - val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) - val fs = stagingDirPath.getFileSystem(hadoopConf) - if (!preserveFiles && fs.delete(stagingDirPath, true)) { - logInfo(s"Deleted staging directory $stagingDirPath") + if (sparkConf.get(PRESERVE_STAGING_FILES)) { + return + } + + def cleanupStagingDirInternal(): Unit = { + val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId)) + try { + val fs = stagingDirPath.getFileSystem(hadoopConf) + if (fs.delete(stagingDirPath, true)) { + logInfo(s"Deleted staging directory $stagingDirPath") + } + } catch { + case ioe: IOException => + logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe) } - } catch { - case ioe: IOException => - logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe) + } + + if (isClusterMode && principal != null && keytab != null) { + val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + newUgi.doAs(new PrivilegedExceptionAction[Unit] { + override def run(): Unit = { + cleanupStagingDirInternal() + } + }) + } else { + cleanupStagingDirInternal() } }