Skip to content

Commit

Permalink
[LIVY-989] Livy core support for interactive session idleTimeout (#426)
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Currently, a Livy interactive session has a single field called ttl, which kills the session once it has been idle for a given amount of time. The intended behavior, however, is to have two separate settings:

 * ttl: kills the session once the given duration has elapsed since the session started, irrespective of idleness.
 * idleTimeout: kills the session if it has been idle for the given duration. (The current TTL behaves in this manner.)

JIRA: https://issues.apache.org/jira/browse/LIVY-989

## How was this patch tested?

Tested manually by creating interactive sessions with idleTimeout and ttl set.
  • Loading branch information
askhatri authored Oct 30, 2023
1 parent 024efa5 commit 86fc823
Show file tree
Hide file tree
Showing 15 changed files with 92 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static class SessionInfo implements ClientMessage {
public final Map<String, String> appInfo;
public final List<String> log;
public final String ttl;
public final String idleTimeout;
public final String driverMemory;
public final int driverCores;
public final String executorMemory;
Expand All @@ -77,7 +78,7 @@ public static class SessionInfo implements ClientMessage {

public SessionInfo(int id, String name, String appId, String owner, String state,
String kind, Map<String, String> appInfo, List<String> log,
String ttl, String driverMemory,
String ttl, String idleTimeout, String driverMemory,
int driverCores, String executorMemory, int executorCores, Map<String, String> conf,
List<String> archives, List<String> files, int heartbeatTimeoutInSecond, List<String> jars,
int numExecutors, String proxyUser, List<String> pyFiles, String queue) {
Expand All @@ -91,6 +92,7 @@ public SessionInfo(int id, String name, String appId, String owner, String state
this.appInfo = appInfo;
this.log = log;
this.ttl = ttl;
this.idleTimeout = idleTimeout;
this.driverMemory = driverMemory;
this.driverCores = driverCores;
this.executorMemory = executorMemory;
Expand All @@ -106,7 +108,7 @@ public SessionInfo(int id, String name, String appId, String owner, String state
}

private SessionInfo() {
this(-1, null, null, null, null, null, null, null, null, null, 0, null, 0, null, null,
this(-1, null, null, null, null, null, null, null, null, null, null, 0, null, 0, null, null,
null, 0, null, 0, null, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
when(session.pyFiles).thenReturn(List())
when(session.stop()).thenReturn(Future.successful(()))
when(session.ttl).thenReturn(None)
when(session.idleTimeout).thenReturn(None)
require(HttpClientSpec.session == null, "Session already created?")
HttpClientSpec.session = session
session
Expand Down
15 changes: 15 additions & 0 deletions docs/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ Creates a new interactive Scala, Python, or R shell in the cluster.
</tr>
<tr>
<td>ttl</td>
<td>The maximum lifetime of this session, measured from when it started and regardless of activity, example: 10m (10 minutes)</td>
<td>string</td>
</tr>
<tr>
<td>idleTimeout</td>
<td>The maximum time this session may remain idle before it is killed, example: 10m (10 minutes)</td>
<td>string</td>
</tr>
Expand Down Expand Up @@ -636,6 +641,16 @@ A session represents an interactive shell.
<td>The detailed application info</td>
<td>Map of key=val</td>
</tr>
<tr>
<td>ttl</td>
<td>The maximum lifetime of this session, measured from when it started and regardless of activity, example: 10m (10 minutes)</td>
<td>string</td>
</tr>
<tr>
<td>idleTimeout</td>
<td>The maximum time this session may remain idle before it is killed, example: 10m (10 minutes)</td>
<td>string</td>
</tr>
<tr>
<td>jars</td>
<td>jars to be used in this session</td>
Expand Down
2 changes: 1 addition & 1 deletion scalastyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@

<check level="error" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
<parameter name="maxParameters"><![CDATA[11]]></parameter>
<parameter name="maxParameters"><![CDATA[12]]></parameter>
</parameters>
</check>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CreateInteractiveRequest {
var conf: Map[String, String] = Map()
var heartbeatTimeoutInSecond: Int = 0
var ttl: Option[String] = None
var idleTimeout: Option[String] = None

override def toString: String = {
s"[kind: $kind, proxyUser: $proxyUser, " +
Expand All @@ -52,6 +53,7 @@ class CreateInteractiveRequest {
(if (name.isDefined) s"name: ${name.get}, " else "") +
(if (conf.nonEmpty) s"conf: ${conf.mkString(",")}, " else "") +
s"heartbeatTimeoutInSecond: $heartbeatTimeoutInSecond, " +
(if (ttl.isDefined) s"ttl: ${ttl.get}]" else "]")
(if (ttl.isDefined) s"ttl: ${ttl.get}, " else "") +
(if (idleTimeout.isDefined) s"idleTimeout: ${idleTimeout.get}]" else "]")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ case class InteractiveRecoveryMetadata(
heartbeatTimeoutS: Int,
owner: String,
ttl: Option[String],
idleTimeout: Option[String],
driverMemory: Option[String],
driverCores: Option[Int],
executorMemory: Option[String],
Expand Down Expand Up @@ -87,6 +88,7 @@ object InteractiveSession extends Logging {
request: CreateInteractiveRequest,
sessionStore: SessionStore,
ttl: Option[String],
idleTimeout: Option[String],
mockApp: Option[SparkApp] = None,
mockClient: Option[RSCClient] = None): InteractiveSession = {
val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}".toLowerCase()
Expand Down Expand Up @@ -138,6 +140,7 @@ object InteractiveSession extends Logging {
owner,
impersonatedUser,
ttl,
idleTimeout,
sessionStore,
request.driverMemory,
request.driverCores,
Expand Down Expand Up @@ -177,6 +180,7 @@ object InteractiveSession extends Logging {
metadata.owner,
metadata.proxyUser,
metadata.ttl,
metadata.idleTimeout,
sessionStore,
metadata.driverMemory,
metadata.driverCores,
Expand Down Expand Up @@ -416,6 +420,7 @@ class InteractiveSession(
owner: String,
override val proxyUser: Option[String],
ttl: Option[String],
idleTimeout: Option[String],
sessionStore: SessionStore,
val driverMemory: Option[String],
val driverCores: Option[Int],
Expand All @@ -429,7 +434,7 @@ class InteractiveSession(
val pyFiles: List[String],
val queue: Option[String],
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, name, owner, ttl, livyConf)
extends Session(id, name, owner, ttl, idleTimeout, livyConf)
with SessionHeartbeat
with SparkAppListener {

Expand Down Expand Up @@ -514,13 +519,15 @@ class InteractiveSession(
}
})
}
startedOn = Some(System.nanoTime())
info(s"Started $this")
}

override def logLines(): IndexedSeq[String] = app.map(_.log()).getOrElse(sessionLog)

override def recoveryMetadata: RecoveryMetadata =
InteractiveRecoveryMetadata(id, name, appId, appTag, kind,
heartbeatTimeout.toSeconds.toInt, owner, None,
heartbeatTimeout.toSeconds.toInt, owner, ttl, idleTimeout,
driverMemory, driverCores, executorMemory, executorCores, conf,
archives, files, jars, numExecutors, pyFiles, queue,
proxyUser, rscDriverUri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ class InteractiveSessionServlet(
val createRequest = bodyAs[CreateInteractiveRequest](req)
val sessionId = sessionManager.nextId();

// Calling getTimeAsMs just to validate the ttl value
// Calling getTimeAsMs just to validate the ttl and idleTimeout values
if (createRequest.ttl.isDefined) {
ClientConf.getTimeAsMs(createRequest.ttl.get);
}
if (createRequest.idleTimeout.isDefined) {
ClientConf.getTimeAsMs(createRequest.idleTimeout.get);
}

InteractiveSession.create(
sessionId,
Expand All @@ -69,7 +72,8 @@ class InteractiveSessionServlet(
accessManager,
createRequest,
sessionStore,
createRequest.ttl)
createRequest.ttl,
createRequest.idleTimeout)
}

override protected[interactive] def clientSessionView(
Expand Down Expand Up @@ -114,8 +118,8 @@ class InteractiveSessionServlet(

new SessionInfo(session.id, session.name.orNull, session.appId.orNull,
session.owner, session.state.toString, session.kind.toString,
session.appInfo.asJavaMap, logs.asJava,
session.proxyUser.orNull, session.driverMemory.orNull,
session.appInfo.asJavaMap, logs.asJava, session.ttl.orNull,
session.idleTimeout.orNull, session.driverMemory.orNull,
session.driverCores.getOrElse(0), session.executorMemory.orNull,
session.executorCores.getOrElse(0), conf, archives,
files, session.heartbeatTimeoutS, jars,
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/scala/org/apache/livy/sessions/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,15 @@ abstract class Session(
val name: Option[String],
val owner: String,
val ttl: Option[String],
val idleTimeout: Option[String],
val livyConf: LivyConf)
extends Logging {

def this(id: Int,
name: Option[String],
owner: String,
livyConf: LivyConf) {
this(id, name, owner, None, livyConf)
this(id, name, owner, None, None, livyConf)
}

import Session._
Expand All @@ -171,6 +172,8 @@ abstract class Session(

private var _lastActivity = System.nanoTime()

var startedOn : Option[Long] = None

// Directory where the session's staging files are created. The directory is only accessible
// to the session's effective user.
private var stagingDir: Path = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,32 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
} else {
val currentTime = System.nanoTime()
var calculatedTimeout = sessionTimeout;
if (session.ttl.isDefined) {
calculatedTimeout = ClientConf.getTimeAsMs(session.ttl.get)
if (session.idleTimeout.isDefined) {
calculatedTimeout = ClientConf.getTimeAsMs(session.idleTimeout.get)
}
calculatedTimeout = TimeUnit.MILLISECONDS.toNanos(calculatedTimeout)
currentTime - session.lastActivity > calculatedTimeout
if (currentTime - session.lastActivity > calculatedTimeout) {
return true
}
if (session.ttl.isDefined && session.startedOn.isDefined) {
calculatedTimeout = TimeUnit.MILLISECONDS.toNanos(
ClientConf.getTimeAsMs(session.ttl.get))
if (currentTime - session.startedOn.get > calculatedTimeout) {
return true
}
}
false
}
}
}

Future.sequence(all().filter(expired).map { s =>
info(s"Deleting $s because it was inactive for more than ${sessionTimeout / 1e6} ms.")
s.state match {
case st: FinishedSessionState =>
info(s"Deleting $s because it finished before ${sessionStateRetainedInSec / 1e9} secs.")
case _ =>
info(s"Deleting $s because it was inactive or the time to leave the period is over.")
}
delete(s)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ object SessionServletSpec {

val PROXY_USER = "proxyUser"

class MockSession(id: Int, owner: String, val proxyUser: Option[String], livyConf: LivyConf)
extends Session(id, None, owner, livyConf) {
class MockSession(id: Int, owner: String, ttl: Option[String], idleTimeout: Option[String],
val proxyUser: Option[String], livyConf: LivyConf)
extends Session(id, None, owner, ttl, idleTimeout, livyConf) {

case class MockRecoveryMetadata(id: Int) extends RecoveryMetadata()

Expand Down Expand Up @@ -64,7 +65,7 @@ object SessionServletSpec {
val owner = remoteUser(req)
val impersonatedUser = accessManager.checkImpersonation(
proxyUser(req, params.get(PROXY_USER)), owner)
new MockSession(sessionManager.nextId(), owner, impersonatedUser, conf)
new MockSession(sessionManager.nextId(), owner, None, None, impersonatedUser, conf)
}

override protected def clientSessionView(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
when(session.proxyUser).thenReturn(None)
when(session.heartbeatExpired).thenReturn(false)
when(session.ttl).thenReturn(None)
when(session.idleTimeout).thenReturn(None)
when(session.driverMemory).thenReturn(None)
when(session.driverCores).thenReturn(None)
when(session.executorMemory).thenReturn(None)
Expand Down Expand Up @@ -195,6 +196,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
when(session.logLines()).thenReturn(log)
when(session.heartbeatExpired).thenReturn(false)
when(session.ttl).thenReturn(None)
when(session.idleTimeout).thenReturn(None)
when(session.driverMemory).thenReturn(None)
when(session.driverCores).thenReturn(None)
when(session.executorMemory).thenReturn(None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class InteractiveSessionSpec extends FunSpec
RSCConf.Entry.LIVY_JARS.key() -> ""
)
InteractiveSession.create(0, None, null, None, livyConf, accessManager, req,
sessionStore, None, mockApp)
sessionStore, None, None, mockApp)
}

private def executeStatement(code: String, codeType: Option[String] = None): JValue = {
Expand Down Expand Up @@ -277,7 +277,7 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
78, Some("Test session"), None, "appTag", Spark, 0, null, None, None,
78, Some("Test session"), None, "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String], List.empty[String],
List.empty[String], None, List.empty[String], None, None, Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
Expand All @@ -296,7 +296,7 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m = InteractiveRecoveryMetadata(
78, None, None, "appTag", Spark, 0, null, None, None,
78, None, None, "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String], List.empty[String],
List.empty[String], None, List.empty[String], None, None, Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
Expand All @@ -313,7 +313,7 @@ class InteractiveSessionSpec extends FunSpec
val conf = new LivyConf()
val sessionStore = mock[SessionStore]
val m = InteractiveRecoveryMetadata(
78, None, Some("appId"), "appTag", Spark, 0, null, None, None,
78, None, Some("appId"), "appTag", Spark, 0, null, None, None, None,
None, None, None, Map.empty[String, String], List.empty[String], List.empty[String],
List.empty[String], None, List.empty[String], None, None, None)
val s = InteractiveSession.recover(m, conf, sessionStore, None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package org.apache.livy.sessions
import org.apache.livy.LivyConf

class MockSession(id: Int, owner: String, conf: LivyConf, name: Option[String] = None,
ttl: Option[String] = None)
extends Session(id, name, owner, ttl, conf) {
ttl: Option[String] = None, idleTimeout: Option[String] = None)
extends Session(id, name, owner, ttl, idleTimeout, conf) {
case class RecoveryMetadata(id: Int) extends Session.RecoveryMetadata()

override val proxyUser = None

override def start(): Unit = ()
override def start(): Unit = {
startedOn = Some(System.nanoTime())
}

var stopped = false
override protected def stopSession(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,18 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
it("should garbage collect old sessions with ttl") {
val (livyConf, manager) = createSessionManager()
val session = manager.register(new MockSession(manager.nextId(), null, livyConf,
None, Some("4s")))
None, Some("4s"), None))
manager.get(session.id).isDefined should be(true)
eventually(timeout(5 seconds), interval(100 millis)) {
Await.result(manager.collectGarbage(), Duration.Inf)
manager.get(session.id) should be(None)
}
}

it("should garbage collect old sessions with idleTimeout") {
val (livyConf, manager) = createSessionManager()
val session = manager.register(new MockSession(manager.nextId(), null, livyConf,
None, None, Some("4s")))
manager.get(session.id).isDefined should be(true)
eventually(timeout(5 seconds), interval(100 millis)) {
Await.result(manager.collectGarbage(), Duration.Inf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf: LivyC
server.accessManager,
createInteractiveRequest,
server.sessionStore,
None,
None)
onLivySessionOpened(newSession)
newSession
Expand Down

0 comments on commit 86fc823

Please sign in to comment.