Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIVY-231: Multi node HA for batch sessions #222

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ class LivyServer extends Logging {

StateStore.init(livyConf)
val sessionStore = new SessionStore(livyConf)
val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)
val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)

server = new WebServer(livyConf, host, port)
server.context.setResourceBase("src/main/com/cloudera/livy/server")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ object BatchSession {
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
})
}

/**
 * Build a [[BatchSession]] in the `Recovering` state from previously stored recovery metadata.
 *
 * @param m recovery metadata describing the session (id, appTag, appId, owner, proxyUser).
 * @param livyConf Livy configuration handed to the session and to the SparkApp factory.
 * @param sessionStore session store handed to the new session.
 * @param mockApp when defined, this SparkApp is used instead of creating one through
 *                `SparkApp.create` (presumably a test hook - confirm against callers).
 * @return the newly constructed session.
 */
def update(
    m: BatchRecoveryMetadata,
    livyConf: LivyConf,
    sessionStore: SessionStore,
    mockApp: Option[SparkApp] = None): BatchSession = {
  // The fallback must stay in place: calling `.get` here threw NoSuchElementException for every
  // caller that relied on the default `mockApp = None`.
  val app = mockApp.map { mocked => (_: BatchSession) => mocked }.getOrElse {
    (s: BatchSession) => SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
  }
  new BatchSession(
    m.id, m.appTag, SessionState.Recovering(), livyConf, m.owner, m.proxyUser, sessionStore, app)
}
}

class BatchSession(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class BatchSessionServlet(

/**
 * Create a batch session from the JSON body of the request.
 *
 * The session id comes from the session store (`getNextBatchSessionId`) rather than from the
 * local session manager, so the stale `sessionManager.nextId()` argument line that was left
 * next to its replacement is dropped; with both lines present the call did not parse.
 *
 * @param req the incoming HTTP request carrying a `CreateBatchRequest` body.
 * @return the session returned by `BatchSession.create`.
 */
override protected def createSession(req: HttpServletRequest): BatchSession = {
  val createRequest = bodyAs[CreateBatchRequest](req)
  val sessionId: Int = sessionStore.getNextBatchSessionId
  val proxyUser = checkImpersonation(createRequest.proxyUser, req)
  BatchSession.create(
    sessionId, createRequest, livyConf, remoteUser(req), proxyUser, sessionStore)
}

override protected[batch] def clientSessionView(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) {
/** Returns an empty sequence for every key. */
def getChildren(key: String): Seq[String] = Nil

/** Does nothing, whatever the key. */
def remove(key: String): Unit = ()

// Backs nextBatchSessionId. The value lives only in this object, so it starts from 0 again
// whenever a new store instance is created.
private val batchSessionIdCounter = new java.util.concurrent.atomic.AtomicInteger(0)

/**
 * Hand out batch session ids 0, 1, 2, ... from an in-memory counter.
 *
 * Returning a constant 0 here gave every batch session the same id, because the servlet now
 * takes the id of each new batch session from the state store.
 *
 * @return the next unused id for this store instance.
 */
def nextBatchSessionId: Int = batchSessionIdCounter.getAndIncrement()

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,10 @@ class FileSystemStateStore(
}

/** Resolve `key` against the path component of the store's file system URI. */
private def absPath(key: String): Path = {
  val root = fsUri.getPath()
  new Path(root, key)
}

/**
 * Allocate the next batch session id from the counter stored under the "batchSessionId" key.
 *
 * The stored value is the id to hand out next; it starts at 0 when the key is absent and is
 * advanced by one on every call.
 *
 * The read-modify-write is serialized on this store so that concurrent callers inside one
 * server cannot be handed the same id.
 * NOTE(review): this does not coordinate several servers sharing the same directory - confirm
 * whether that deployment has to be supported.
 *
 * @return the id that was stored before the increment.
 */
def nextBatchSessionId: Int = synchronized {
  val nextId = get[Int]("batchSessionId").getOrElse(0)
  // Int.box makes the Int -> Object conversion explicit instead of casting a primitive.
  set("batchSessionId", Int.box(nextId + 1))
  nextId
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.livy.server.recovery

import com.cloudera.livy.server.batch.BatchRecoveryMetadata

/**
 * Callbacks invoked when a Livy server detects a change in the ZooKeeper server.
 *
 * A state store forwards batch-session changes it observes to the listener that was handed to
 * it through `setListener`.
 */
trait SessionManagerListener {
// Called with the recovery metadata of a batch session that newly appeared in the state store.
def register(recoveryData: BatchRecoveryMetadata): Unit
// Called with the recovery metadata of a batch session that was removed from the state store.
def remove(batchMetadata: BatchRecoveryMetadata): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ class SessionStore(
.map(_.nextSessionId).getOrElse(0)
}

/**
 * Ask the underlying state store for the next batch session id and log the value handed out.
 *
 * @return the id returned by the state store.
 */
def getNextBatchSessionId: Int = {
  val nextId = store.nextBatchSessionId
  logger.info(s"The next batch session id=$nextId")
  nextId
}

/**
* Remove a session from the state store.
*/
Expand All @@ -94,4 +100,8 @@ class SessionStore(

/** Key of one session in the state store: store version, session type and id joined by "/". */
private def sessionPath(sessionType: String, id: Int): String =
  Seq(STORE_VERSION, sessionType, id).mkString("/")

/**
 * Pass the listener on to the underlying state store.
 *
 * @param sessionManagerListener the listener to install on the store.
 */
def setListener(sessionManagerListener: SessionManagerListener): Unit =
  store.setListener(sessionManagerListener)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.server.batch.BatchRecoveryMetadata
import com.cloudera.livy.sessions.SessionKindModule
import com.cloudera.livy.sessions.SessionManager._

Expand All @@ -45,6 +46,9 @@ protected trait JsonMapper {
* Hardcoded to use JSON serialization for now for easier ops. Will add better serialization later.
*/
abstract class StateStore(livyConf: LivyConf) extends JsonMapper {

protected var sessionManagerListener: SessionManagerListener = _

/**
* Set a key-value pair to this state store. It overwrites existing value.
* @throws Exception Throw when persisting the state store fails.
Expand All @@ -71,6 +75,28 @@ abstract class StateStore(livyConf: LivyConf) extends JsonMapper {
* @throws Exception Throw when persisting the state store fails.
*/
def remove(key: String): Unit

/**
 * Allocate the id for the next batch session.
 *
 * @return the id to assign to the next batch session.
 */
def nextBatchSessionId: Int

/**
 * Hook for a batch session that was created on another Livy server in HA mode.
 * The default implementation ignores the notification.
 *
 * @param batchMetadata recovery metadata of the session that was created.
 */
def register(batchMetadata: BatchRecoveryMetadata): Unit = ()

/**
 * Hook for a batch session that was deleted on another Livy server in HA mode.
 * The default implementation ignores the notification.
 *
 * @param batchMetadata recovery metadata of the session that was deleted.
 */
def remove(batchMetadata: BatchRecoveryMetadata): Unit = ()

/**
 * Remember the listener in this store, replacing any listener set earlier.
 *
 * @param sessionManagerListener the listener to store.
 */
def setListener(sessionManagerListener: SessionManagerListener): Unit = {
  // The parameter shadows the field of the same name, hence the explicit `this`.
  this.sessionManagerListener = sessionManagerListener
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@
*/
package com.cloudera.livy.server.recovery

import java.io.IOException

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.curator.RetryPolicy
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.api.UnhandledErrorListener
import org.apache.curator.framework.recipes.atomic.{DistributedAtomicLong => DistributedLong}
import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener}
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode
import org.apache.curator.retry.RetryNTimes
import org.apache.zookeeper.KeeperException.NoNodeException

import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.LivyConf.Entry
import com.cloudera.livy.server.batch.BatchRecoveryMetadata

object ZooKeeperStateStore {
val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
Expand All @@ -36,7 +44,7 @@ object ZooKeeperStateStore {
class ZooKeeperStateStore(
livyConf: LivyConf,
mockCuratorClient: Option[CuratorFramework] = None) // For testing
extends StateStore(livyConf) with Logging {
extends StateStore(livyConf) with PathChildrenCacheListener with Logging {

import ZooKeeperStateStore._

Expand Down Expand Up @@ -74,6 +82,18 @@ class ZooKeeperStateStore(
}
})
curatorClient.start()

// Cache over the children of the batch-session node; this store is registered as its listener
// and receives the change notifications through childEvent.
// NOTE(review): `this` is handed out while the constructor is still running - confirm that no
// event can be delivered before construction has finished.
val batchSessionsCache = new PathChildrenCache(curatorClient, prefixKey("v1/batch"), true)

batchSessionsCache.getListenable.addListener(this)

try {
  batchSessionsCache.start(StartMode.BUILD_INITIAL_CACHE)
} catch {
  case e: NullPointerException =>
    // Keep the original exception as the cause so the failing call stays visible in the trace.
    throw new IllegalArgumentException("Invalid Zookeeper settings.", e)
}

// TODO Make sure ZK path has proper secure permissions so that other users cannot read its
// contents.

Expand Down Expand Up @@ -114,4 +134,99 @@ class ZooKeeperStateStore(
}

/** Absolute znode path of `key`: "/" + the configured key prefix + "/" + key. */
private def prefixKey(key: String) = "/" + zkKeyPrefix + "/" + key


/**
 * Increment the distributed value and return the value before increment.
 * If the increment operation fails, retry until we succeed or run out of retries.
 *
 * Success is checked before the retry budget. Checking the budget first threw away an
 * increment that succeeded on the last attempt: the counter had already advanced, yet the
 * caller was told that the allocation failed.
 *
 * @param distributedLong
 *   The distributed long value.
 * @param retryCount
 *   the remaining retry counts, i.e. how many more attempts may follow a failed one
 * @return
 *   `Some(Long)` if we succeed otherwise `None`
 */
@tailrec
private def recursiveTry(distributedLong: DistributedLong, retryCount: Int): Option[Long] = {
  val attempt = distributedLong.increment()
  if (attempt.succeeded) {
    Option(attempt.preValue())
  } else if (retryCount > 0) {
    recursiveTry(distributedLong, retryCount - 1)
  } else {
    None
  }
}

/**
 * Allocate the next batch session id from a distributed counter kept in ZooKeeper under
 * "/<key prefix>/batchSessionId".
 *
 * @return the counter value from before the increment, converted to `Int`.
 * @throws IOException if the counter could not be incremented within the retry budget.
 */
def nextBatchSessionId: Int = {
  val counterRetryPolicy: RetryPolicy = new RetryNTimes(2, 3)
  val counter =
    new DistributedLong(curatorClient, prefixKey("batchSessionId"), counterRetryPolicy)

  recursiveTry(counter, 10).map(_.toInt).getOrElse {
    val msg: String = "Failed to get the next session id from Zookeeper"
    logger.warn(msg)
    throw new IOException(msg)
  }
}

/**
 * Forward a newly observed batch session to the session manager listener.
 *
 * The listener field starts out unset (null), so a notification that arrives before
 * `setListener` has been called is logged and dropped instead of failing with a
 * NullPointerException.
 *
 * @param batchRecoveryMetadata recovery metadata of the session that appeared.
 */
override def register(batchRecoveryMetadata: BatchRecoveryMetadata): Unit = {
  Option(sessionManagerListener) match {
    case Some(listener) =>
      listener.register(batchRecoveryMetadata)
    case None =>
      logger.warn(s"No session manager listener set; ignoring new $batchRecoveryMetadata")
  }
}

/**
 * Forward the removal of a batch session to the session manager listener.
 *
 * The listener field starts out unset (null), so a notification that arrives before
 * `setListener` has been called is logged and dropped instead of failing with a
 * NullPointerException.
 *
 * @param batchMetadata recovery metadata of the session that disappeared.
 */
override def remove(batchMetadata: BatchRecoveryMetadata): Unit = {
  Option(sessionManagerListener) match {
    case Some(listener) =>
      listener.remove(batchMetadata)
    case None =>
      logger.warn(s"No session manager listener set; ignoring removed $batchMetadata")
  }
}

/**
 * Handle a change notification from the path-children cache.
 *
 *  - CHILD_ADDED on a session path: deserialize the node data and call `register`.
 *  - CHILD_REMOVED on a session path: deserialize the node data and call `remove`.
 *  - every other event, and child events on non-session paths: logged only.
 *
 * @param curator the client that delivered the event (unused).
 * @param event the event to handle.
 */
override def childEvent(curator: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
  // Connection-state and INITIALIZED events carry no child data, so event.getData must never
  // be dereferenced without a null check.
  val data = Option(event.getData)
  val path = data.map(_.getPath).getOrElse("<no path>")
  // Defined only when the event carries data AND that data belongs to a session node.
  val sessionData = data.filter(_ => isNewSessionPath(event))

  event.getType match {
    case PathChildrenCacheEvent.Type.CHILD_ADDED =>
      logger.info(s"Type.CHILD_ADDED => $path")
      // Children that are not session nodes are ignored.
      sessionData.foreach { d =>
        register(deserialize[BatchRecoveryMetadata](d.getData))
      }

    case PathChildrenCacheEvent.Type.CHILD_REMOVED =>
      // Non-session paths fall through silently; a guarded case here raised a MatchError.
      sessionData.foreach { d =>
        val batchMetadata = deserialize[BatchRecoveryMetadata](d.getData)
        val msg = s"${d.getPath}: ${d.getData.map(_.toChar).mkString}"
        logger.info(s"Type.CHILD_REMOVED => ${msg}")
        remove(batchMetadata)
      }

    case PathChildrenCacheEvent.Type.CHILD_UPDATED =>
      // Updates are only logged; the node data is not used, so it is not deserialized.
      logger.info(s"Type.CHILD_UPDATED => $path")

    case eventType @ (PathChildrenCacheEvent.Type.CONNECTION_LOST |
        PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED |
        PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED |
        PathChildrenCacheEvent.Type.INITIALIZED) =>
      logger.info(s"Type.$eventType")
  }
}

/**
 * Tell whether the event refers to a session node, i.e. a path whose last character is a digit.
 *
 * Events without child data (connection-state and INITIALIZED events) and empty paths yield
 * `false` instead of throwing.
 *
 * @param event the cache event to inspect.
 * @return true when the event carries a path ending in a digit.
 */
private def isNewSessionPath(event: PathChildrenCacheEvent): Boolean = {
  Option(event.getData)
    .flatMap(d => Option(d.getPath))
    .flatMap(_.lastOption)
    .exists(_.isDigit)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.util.control.NonFatal
import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.server.recovery.{SessionManagerListener, SessionStore}
import com.cloudera.livy.sessions.Session.RecoveryMetadata

object SessionManager {
Expand Down Expand Up @@ -63,7 +63,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
sessionStore: SessionStore,
sessionType: String,
mockSessions: Option[Seq[S]] = None)
extends Logging {
extends SessionManagerListener with Logging {

import SessionManager._

Expand Down Expand Up @@ -168,4 +168,39 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](

}

/**
 * Register a batch session that was announced through the state store.
 *
 * Nothing happens when a session with the same id is already known, or when the metadata has
 * no appTag yet. Otherwise a session is built from the metadata and registered.
 *
 * @param recoveryData recovery metadata of the announced session.
 */
override def register(recoveryData: BatchRecoveryMetadata): Unit = {
  synchronized {
    if (sessions.contains(recoveryData.id)) {
      // This session manager added the session. No need to register it
      logger.info(s"The session manager already has information for $recoveryData")
    } else {
      logger.info(s"The session saw new session $recoveryData from ZooKeeper")
      Option(recoveryData.appTag) match {
        case None =>
          // the app is not registered in yarn yet. wait until it is
          logger.info(s"Session manager cannot find an appTag for $recoveryData from ZooKeeper")
        case Some(_) =>
          logger.info(s"Session Manager found new Batch session $recoveryData from ZooKeeper")
          // Build the session exactly once; a second BatchSession.update call used to create
          // an extra session whose result was thrown away.
          val session = BatchSession.update(recoveryData, livyConf, sessionStore)
          // NOTE(review): unchecked cast - only sound when S is BatchSession; confirm that
          // this listener is never installed on a manager of another session type.
          register(session.asInstanceOf[S])
      }
    }
  }
}


/**
 * Drop a batch session from this manager's session map after the state store reported its
 * removal. Unknown ids are only logged.
 *
 * @param recoveryData recovery metadata of the removed session.
 */
override def remove(recoveryData: BatchRecoveryMetadata): Unit = {
  synchronized {
    val sessionId = recoveryData.id
    if (sessions.contains(sessionId)) {
      // The session is tracked here, so forget it.
      logger.info(s"Removing session $recoveryData")
      sessions.remove(sessionId)
    } else {
      logger.info(s"The session is not present")
    }
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import scala.collection.JavaConverters._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.api._
import org.apache.curator.framework.listen.Listenable
import org.apache.curator.framework.state.ConnectionStateListener
import org.apache.curator.utils.EnsurePath
import org.apache.zookeeper.data.Stat
import org.mockito.Matchers.anyString
import org.mockito.Mockito._
import org.scalatest.FunSpec
import org.scalatest.Matchers._
Expand All @@ -43,6 +46,16 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
val curatorClient = mock[CuratorFramework]
when(curatorClient.getUnhandledErrorListenable())
.thenReturn(mock[Listenable[UnhandledErrorListener]])
when(curatorClient.getConnectionStateListenable())
.thenReturn(mock[Listenable[ConnectionStateListener]])
when(curatorClient.newNamespaceAwareEnsurePath(anyString()))
.thenReturn(mock[EnsurePath])

val builder: GetChildrenBuilder = mock[GetChildrenBuilder]
when(curatorClient.getChildren())
.thenReturn(builder)
when(builder.forPath(anyString()))
.thenReturn(new java.util.LinkedList[String]())
val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient))
testBody(TestFixture(stateStore, curatorClient))
}
Expand Down