Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIVY-977][SERVER][CONF] Livy can not be started if HDFS is still in safe mode #440

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,8 @@
# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true

# value specifies interval to check safe mode in hdfs filesystem
# livy.server.hdfs.safe-mode.interval = 5
# value specifies max attempts to retry when safe mode is ON in hdfs filesystem
# livy.server.hdfs.safe-mode.max.retry.attempts = 10
6 changes: 6 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ object LivyConf {
// how often to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_INTERVAL = Entry("livy.server.yarn.app-leakage.check-interval", "60s")

// value specifies interval to check safe mode in hdfs filesystem
val HDFS_SAFE_MODE_INTERVAL_IN_SECONDS = Entry("livy.server.hdfs.safe-mode.interval", 5)

// value specifies max attempts to retry when safe mode is ON in hdfs filesystem
val HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS = Entry("livy.server.hdfs.safe-mode.max.retry.attempts", 12)

// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.livy.server.recovery
import java.io.{FileNotFoundException, IOException}
import java.net.URI
import java.util
import java.util.concurrent.TimeUnit

import scala.reflect.ClassTag
import scala.util.control.NonFatal
Expand All @@ -28,6 +29,8 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.Utils.usingResource
Expand All @@ -42,6 +45,8 @@ class FileSystemStateStore(
this(livyConf, None)
}

private val fs = FileSystem.newInstance(livyConf.hadoopConf)

private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(fsPath != null && !fsPath.isEmpty,
Expand All @@ -57,6 +62,8 @@ class FileSystemStateStore(
// Only Livy user should have access to state files.
fileContext.setUMask(new FsPermission("077"))

startSafeModeCheck()

// Create state store dir if it doesn't exist.
val stateStorePath = absPath(".")
try {
Expand Down Expand Up @@ -134,4 +141,42 @@ class FileSystemStateStore(
}

private def absPath(key: String): Path = new Path(fsUri.getPath(), key)

/**
* Checks whether HDFS is in safe mode.
*
* Note that DistributedFileSystem is a `@LimitedPrivate` class, which for all practical reasons
* makes it more public than not.
*/
def isFsInSafeMode(): Boolean = fs match {
case dfs: DistributedFileSystem =>
isFsInSafeMode(dfs)
case _ =>
false
}

def isFsInSafeMode(dfs: DistributedFileSystem): Boolean = {
/* true to check only for Active NNs status */
dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true)
}

def startSafeModeCheck(): Unit = {
// Cannot probe anything while the FS is in safe mode,
// so wait for seconds which is configurable
val safeModeInterval = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS)
val safeModeMaxRetryAttempts = livyConf.getInt(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS)
for (retryAttempts <- 0 to safeModeMaxRetryAttempts if isFsInSafeMode()) {
info("HDFS is still in safe mode. Waiting...")
Thread.sleep(TimeUnit.SECONDS.toMillis(safeModeInterval))
}

// if hdfs is still in safe mode
// even after max retry attempts
// then throw IllegalStateException
if (isFsInSafeMode()) {
throw new IllegalStateException("Reached max retry attempts for safe mode check " +
"in hdfs file system")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import java.util
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.hamcrest.Description
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, anyInt, argThat, eq => equal}
import org.mockito.Mockito.{atLeastOnce, verify, when}
import org.mockito.Mockito.{atLeastOnce, spy, verify, when}
import org.mockito.internal.matchers.Equals
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand All @@ -53,6 +54,14 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
conf
}

def makeConfWithTwoSeconds(): LivyConf = {
val conf = new LivyConf()
conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/")
conf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, new Integer(2))
conf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, new Integer(2))
conf
}

def mockFileContext(rootDirPermission: String): FileContext = {
val fileContext = mock[FileContext]
val rootDirStatus = mock[FileStatus]
Expand Down Expand Up @@ -188,5 +197,29 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {

verify(fileContext).delete(pathEq("/key"), equal(false))
}

it("set safe mode ON and wait") {
val fileContext = mockFileContext("700")
val provider = spy(new FileSystemStateStore(makeConf(), Some(fileContext)))
val dfs = mock[DistributedFileSystem]
provider.isFsInSafeMode()
assert(!provider.isFsInSafeMode(dfs))
}

it("provider throws IllegalStateException when reaches 'N' " +
"max attempts to access HDFS file system") {
val provider = new SafeModeTestProvider(makeConfWithTwoSeconds(),
Some(mockFileContext("700")))
provider.inSafeMode = true
intercept[IllegalStateException](provider.startSafeModeCheck())
}
}

private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext])
extends FileSystemStateStore(conf, context) {
var inSafeMode = true

override def isFsInSafeMode(): Boolean = inSafeMode
}

}