From 10ee39f35ecf4006fd9299e2304b0937d1008fe3 Mon Sep 17 00:00:00 2001
From: Rajshekhar Muchandi
Date: Wed, 24 Jan 2024 14:13:08 +0530
Subject: [PATCH] [LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in safe mode

Wait for HDFS to leave safe mode before the file system state store creates
its directory. The wait between checks and the maximum number of retries are
configurable.
---
Review notes (text in this section is not part of the commit):
- The safe mode check now targets the file system of the state store URL and
  uses Hadoop's cached FileSystem instead of a private, never-closed instance.
- startSafeModeCheck() stops polling as soon as safe mode is off and performs
  exactly the configured number of retries.
- The template now shows the same retry default as LivyConf (12).
- Hunk headers and the diffstat were recomputed; the blob ids on the "index"
  lines were not regenerated.

 conf/livy.conf.template                       |  5 ++
 .../main/scala/org/apache/livy/LivyConf.scala |  6 ++
 .../recovery/FileSystemStateStore.scala       | 66 +++++++++++++++++++
 .../recovery/FileSystemStateStoreSpec.scala   | 52 +++++++++++++++
 4 files changed, 129 insertions(+)

diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 7566971c3..e99251d02 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -195,3 +195,8 @@
 # Enable to allow custom classpath by proxy user in cluster mode
 # The below configuration parameter is disabled by default.
 # livy.server.session.allow-custom-classpath = true
+
+# Seconds to wait between two checks while HDFS is in safe mode.
+# livy.server.hdfs.safe-mode.interval = 5
+# Max number of retries while HDFS stays in safe mode before the state store fails to start.
+# livy.server.hdfs.safe-mode.max.retry.attempts = 12
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 31b687259..720aa4e15 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -252,6 +252,12 @@ object LivyConf {
   // how often to check livy session leakage
   val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")
 
+  // Seconds to wait between two checks while HDFS is in safe mode.
+  val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS = Entry("livy.server.hdfs.safe-mode.interval", 5)
+
+  // Max number of retries while HDFS stays in safe mode before the state store fails to start.
+  val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)
+
   // Whether session timeout should be checked, by default it will be checked, which means inactive
   // session will be stopped after "livy.server.session.timeout"
   val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
index 826a2fbd7..6fee7f0e2 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -20,6 +20,7 @@ package org.apache.livy.server.recovery
 import java.io.{FileNotFoundException, IOException}
 import java.net.URI
 import java.util
+import java.util.concurrent.TimeUnit
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -28,6 +29,8 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
 
 import org.apache.livy.{LivyConf, Logging}
 import org.apache.livy.Utils.usingResource
@@ -42,6 +45,11 @@ class FileSystemStateStore(
     this(livyConf, None)
   }
 
+  // File system behind the state store URL, used only to query HDFS safe mode.
+  // Lazy: fsUri is initialized below. FileSystem.get returns Hadoop's shared cached
+  // instance, so there is no private instance to close.
+  private lazy val fs = FileSystem.get(fsUri, livyConf.hadoopConf)
+
   private val fsUri = {
     val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
     require(fsPath != null && !fsPath.isEmpty,
@@ -57,6 +65,9 @@ class FileSystemStateStore(
     // Only Livy user should have access to state files.
     fileContext.setUMask(new FsPermission("077"))
 
+    // HDFS is read-only while in safe mode, so wait for it to leave before creating the dir.
+    startSafeModeCheck()
+
     // Create state store dir if it doesn't exist.
     val stateStorePath = absPath(".")
     try {
@@ -134,4 +145,59 @@ class FileSystemStateStore(
   }
 
   private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
+
+  /**
+   * Checks whether the file system backing the state store is in HDFS safe mode.
+   *
+   * Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
+   * makes it more public than not.
+   *
+   * @return the HDFS safe mode status when the file system is a DistributedFileSystem;
+   *         false for any other file system.
+   */
+  def isFsInSafeMode(): Boolean = fs match {
+    case dfs: DistributedFileSystem => isFsInSafeMode(dfs)
+    case _ => false
+  }
+
+  /**
+   * Queries the given HDFS client for its safe mode status.
+   *
+   * @param dfs HDFS client to query.
+   * @return the result of the SAFEMODE_GET action.
+   */
+  def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
+    /* true to check only for Active NNs status */
+    dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true)
+  }
+
+  /**
+   * Waits for HDFS to leave safe mode.
+   *
+   * Safe mode is checked once up front and then again after each wait of
+   * `livy.server.hdfs.safe-mode.interval` seconds, for at most
+   * `livy.server.hdfs.safe-mode.max.retry.attempts` waits. Polling stops as soon as
+   * safe mode is reported off.
+   *
+   * @throws IllegalStateException if HDFS is still in safe mode after the last retry.
+   */
+  def startSafeModeCheck(): Unit = {
+    val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS)
+    val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS)
+    val sleepMillis = TimeUnit.SECONDS.toMillis(safeModeInterval)
+
+    var retriesLeft = safeModeMaxRetryAttempts
+    var inSafeMode = isFsInSafeMode()
+    while (inSafeMode && retriesLeft > 0) {
+      info("HDFS is still in safe mode. Waiting...")
+      Thread.sleep(sleepMillis)
+      retriesLeft -= 1
+      inSafeMode = isFsInSafeMode()
+    }
+
+    if (inSafeMode) {
+      throw new IllegalStateException("Reached max retry attempts for safe mode check " +
+        "in hdfs file system")
+    }
+  }
 }
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
index 082a80ab9..1ee1a2fe2 100644
--- a/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/recovery/FileSystemStateStoreSpec.scala
@@ -23,6 +23,8 @@ import java.util
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
 import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.hdfs.DistributedFileSystem
+import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.hamcrest.Description
 import org.mockito.ArgumentMatcher
 import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
@@ -53,6 +55,15 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
       conf
     }
 
+    // Two retries with no pause in between, so the retry tests finish without sleeping.
+    def makeSafeModeConf(): LivyConf = {
+      val conf = new LivyConf()
+      conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
+      conf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, Integer.valueOf(0))
+      conf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, Integer.valueOf(2))
+      conf
+    }
+
     def mockFileContext(rootDirPermission: String): FileContext = {
       val fileContext = mock[FileContext]
       val rootDirStatus = mock[FileStatus]
@@ -188,5 +199,46 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
 
       verify(fileContext).delete(pathEq("/key"), equal(false))
     }
+
+    it("isFsInSafeMode should report the status returned by HDFS") {
+      val stateStore = new FileSystemStateStore(makeConf(), Some(mockFileContext("700")))
+      val dfs = mock[DistributedFileSystem]
+      when(dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true)).thenReturn(true)
+
+      assert(stateStore.isFsInSafeMode(dfs))
+    }
+
+    it("startSafeModeCheck should return after one check when safe mode is off") {
+      val stateStore = new SafeModeTestProvider(makeSafeModeConf(), Some(mockFileContext("700")))
+      stateStore.inSafeMode = false
+
+      stateStore.startSafeModeCheck()
+
+      assert(stateStore.checks === 1)
+    }
+
+    it("startSafeModeCheck should throw IllegalStateException once retries are exhausted") {
+      val stateStore = new SafeModeTestProvider(makeSafeModeConf(), Some(mockFileContext("700")))
+      stateStore.inSafeMode = true
+
+      intercept[IllegalStateException](stateStore.startSafeModeCheck())
+      // One initial check plus the two configured retries.
+      assert(stateStore.checks === 3)
+    }
   }
+
+  // State store whose safe mode answer is controlled by the test instead of HDFS.
+  // The superclass constructor calls startSafeModeCheck() before the fields below are
+  // initialized, so it sees inSafeMode == false and construction never waits.
+  private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext])
+    extends FileSystemStateStore(conf, context) {
+    var inSafeMode = true
+    // Number of isFsInSafeMode() calls since construction finished.
+    var checks = 0
+
+    override def isFsInSafeMode(): Boolean = {
+      checks += 1
+      inSafeMode
+    }
+  }
 }