From 618d6e6107aca239593961318af7fb1e5c0c2f3e Mon Sep 17 00:00:00 2001 From: Fei Wang Date: Sat, 26 Mar 2022 23:36:05 +0800 Subject: [PATCH] save --- .../HadoopFsDelegationTokenProvider.scala | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala index da9f4cb95e3..20f22a0cbe7 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/credentials/HadoopFsDelegationTokenProvider.scala @@ -22,6 +22,7 @@ import java.net.URI import java.security.PrivilegedExceptionAction import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem @@ -29,7 +30,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration import org.apache.hadoop.security.{Credentials, SecurityUtil, UserGroupInformation} import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod -import org.apache.kyuubi.Logging +import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.credentials.HadoopFsDelegationTokenProvider.{doAsProxyUser, validatedFsUris} import org.apache.kyuubi.util.KyuubiHadoopUtils @@ -63,19 +64,31 @@ class HadoopFsDelegationTokenProvider extends HadoopDelegationTokenProvider with override def obtainDelegationTokens(owner: String, creds: Credentials): Unit = { doAsProxyUser(owner) { - val fileSystems = fsUris.map(FileSystem.get(_, hadoopConf)).toSet + val fileSystems = fsUris.map { uri => + uri -> FileSystem.get(uri, hadoopConf) + } + val exceptionMsgList = ListBuffer[String]() try { // Renewer is not needed. But setting a renewer can avoid potential NPE. val renewer = UserGroupInformation.getCurrentUser.getUserName - fileSystems.foreach { fs => - info(s"getting token owned by $owner for: $fs") - fs.addDelegationTokens(renewer, creds) + fileSystems.foreach { case (uri, fs) => + info(s"getting token owned by $owner for: $uri") + try { + fs.addDelegationTokens(renewer, creds) + } catch { + case e: Exception => + exceptionMsgList += + s"Failed to get token owned by $owner for $uri: ${Utils.stringifyException(e)} " + } } } finally { // Token renewal interval is longer than FileSystems' underlying connections' max idle time. // Close FileSystems won't lose efficiency. - fileSystems.foreach(_.close()) + fileSystems.foreach(_._2.close()) + if (exceptionMsgList.nonEmpty) { + throw new KyuubiException(exceptionMsgList.mkString("\n")) + } } } }