diff --git a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala index 8c3f467a3..f9287974a 100644 --- a/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala +++ b/server/src/main/scala/com/cloudera/livy/server/LivyServer.scala @@ -95,8 +95,8 @@ class LivyServer extends Logging { StateStore.init(livyConf) val sessionStore = new SessionStore(livyConf) - val batchSessionManager = new BatchSessionManager(livyConf, sessionStore) val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore) + val batchSessionManager = new BatchSessionManager(livyConf, sessionStore) server = new WebServer(livyConf, host, port) server.context.setResourceBase("src/main/com/cloudera/livy/server") diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala index 452a9d869..78902d14b 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSession.scala @@ -117,6 +117,18 @@ object BatchSession { SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) }) } + + def update( + m: BatchRecoveryMetadata, + livyConf: LivyConf, + sessionStore: SessionStore, + mockApp: Option[SparkApp] = None): BatchSession = { + val app = mockApp.map { m => (_: BatchSession) => m }.get /* .getOrElse { s => + SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s)) + } */ + new BatchSession( + m.id, m.appTag, SessionState.Recovering(), livyConf, m.owner, m.proxyUser, sessionStore, app) + } } class BatchSession( diff --git a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala index f14554207..6901e234d 100644 --- a/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/com/cloudera/livy/server/batch/BatchSessionServlet.scala @@ -42,9 +42,10 @@ class BatchSessionServlet( override protected def createSession(req: HttpServletRequest): BatchSession = { val createRequest = bodyAs[CreateBatchRequest](req) + val sessionId: Int = sessionStore.getNextBatchSessionId val proxyUser = checkImpersonation(createRequest.proxyUser, req) BatchSession.create( - sessionManager.nextId(), createRequest, livyConf, remoteUser(req), proxyUser, sessionStore) + sessionId, createRequest, livyConf, remoteUser(req), proxyUser, sessionStore) } override protected[batch] def clientSessionView( diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala index e0a33371b..96eefc52c 100644 --- a/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/BlackholeStateStore.scala @@ -34,4 +34,7 @@ class BlackholeStateStore(livyConf: LivyConf) extends StateStore(livyConf) { def getChildren(key: String): Seq[String] = List.empty[String] def remove(key: String): Unit = {} + + def nextBatchSessionId: Int = 0 + } diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala index d841c6328..dc8d5106a 100644 --- a/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/FileSystemStateStore.scala @@ -120,4 +120,10 @@ class FileSystemStateStore( } private def absPath(key: String): Path = new Path(fsUri.getPath(), key) + + def nextBatchSessionId: Int = { + val nextId = get[Int]("batchSessionId").getOrElse(0) + this.set("batchSessionId", (nextId + 1).asInstanceOf[Object]) + nextId + } } diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionManagerListener.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/SessionManagerListener.scala new file mode 100644 index 000000000..1c6983e90 --- /dev/null +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/SessionManagerListener.scala @@ -0,0 +1,28 @@ +/* + * Licensed to Cloudera, Inc. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Cloudera, Inc. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.livy.server.recovery + +import com.cloudera.livy.server.batch.BatchRecoveryMetadata + +/** + * Methods in this trait will be called when a Livy server detects a change in the ZooKeeper server + */ +trait SessionManagerListener { + def register(recoveryData: BatchRecoveryMetadata): Unit + def remove(batchMetadata: BatchRecoveryMetadata): Unit +} diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala index c146a5746..566d80b58 100644 --- a/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/SessionStore.scala @@ -79,6 +79,12 @@ class SessionStore( .map(_.nextSessionId).getOrElse(0) } + def getNextBatchSessionId: Int = { + val id = store.nextBatchSessionId + logger.info(s"The next batch session id=$id") + id + } + /** * Remove a session from the state store. */ @@ -94,4 +100,8 @@ class SessionStore( private def sessionPath(sessionType: String, id: Int): String = s"$STORE_VERSION/$sessionType/$id" + + def setListener(sessionManagerListener: SessionManagerListener): Unit = { + store.setListener(sessionManagerListener) + } } diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala index 18cf6ade4..f5a2f34a4 100644 --- a/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/StateStore.scala @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.cloudera.livy.{LivyConf, Logging} +import com.cloudera.livy.server.batch.BatchRecoveryMetadata import com.cloudera.livy.sessions.SessionKindModule import com.cloudera.livy.sessions.SessionManager._ @@ -45,6 +46,9 @@ protected trait JsonMapper { * Hardcoded to use JSON serialization for now for easier ops. Will add better serialization later. */ abstract class StateStore(livyConf: LivyConf) extends JsonMapper { + + protected var sessionManagerListener: SessionManagerListener = _ + /** * Set a key-value pair to this state store. It overwrites existing value. * @throws Exception Throw when persisting the state store fails. @@ -71,6 +75,28 @@ abstract class StateStore(livyConf: LivyConf) extends JsonMapper { * @throws Exception Throw when persisting the state store fails. */ def remove(key: String): Unit + + /** + * + * @return + */ + def nextBatchSessionId: Int + + /** + * Called when a batch session is created on another Livy server in HA mode + * @param batchMetadata + */ + def register(batchMetadata: BatchRecoveryMetadata): Unit = { } + + /** + * Called when a batch session is deleted on another Livy server in HA mode + * @param batchMetadata + */ + def remove(batchMetadata: BatchRecoveryMetadata): Unit = { } + + def setListener(sessionManagerListener: SessionManagerListener): Unit = { + this.sessionManagerListener = sessionManagerListener + } } /** diff --git a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala index 883383590..a4f752222 100644 --- a/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala +++ b/server/src/main/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStore.scala @@ -17,16 +17,24 @@ */ package com.cloudera.livy.server.recovery +import java.io.IOException + +import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.curator.RetryPolicy import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.framework.api.UnhandledErrorListener +import org.apache.curator.framework.recipes.atomic.{DistributedAtomicLong => DistributedLong} +import org.apache.curator.framework.recipes.cache.{PathChildrenCache, PathChildrenCacheEvent, PathChildrenCacheListener} +import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode import org.apache.curator.retry.RetryNTimes import org.apache.zookeeper.KeeperException.NoNodeException import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.LivyConf.Entry +import com.cloudera.livy.server.batch.BatchRecoveryMetadata object ZooKeeperStateStore { val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy") @@ -36,7 +44,7 @@ object ZooKeeperStateStore { class ZooKeeperStateStore( livyConf: LivyConf, mockCuratorClient: Option[CuratorFramework] = None) // For testing - extends StateStore(livyConf) with Logging { + extends StateStore(livyConf) with PathChildrenCacheListener with Logging { import ZooKeeperStateStore._ @@ -74,6 +82,18 @@ class ZooKeeperStateStore( } }) curatorClient.start() + + val batchSessionsCache = new PathChildrenCache(curatorClient, prefixKey("v1/batch"), true) + + batchSessionsCache.getListenable.addListener(this) + + try { + batchSessionsCache.start(StartMode.BUILD_INITIAL_CACHE) + } catch { + case _ : NullPointerException => + throw new IllegalArgumentException("Invalid Zookeeper settings.") + } + // TODO Make sure ZK path has proper secure permissions so that other users cannot read its // contents. @@ -114,4 +134,99 @@ class ZooKeeperStateStore( } private def prefixKey(key: String) = s"/$zkKeyPrefix/$key" + + + /** + * Increment the distributed value and return the value before increment. + * If the increment operation fails, retry until we succeed or run out of retries. + * + * @param distributedLong + * The distributed long value. + * @param retryCount + * the remaining retry counts + * @return + * `Some(Long)` if we succeed otherwise `None` + */ + @tailrec + private def recursiveTry(distributedLong: DistributedLong, retryCount: Int): Option[Long] = { + val updatedValue = distributedLong.increment + updatedValue.succeeded match { + case _ if retryCount <= 0 => + None + case true if retryCount > 0 => + Option(updatedValue.preValue()) + case _ => + recursiveTry(distributedLong, retryCount - 1) + } + } + + /** + * + * @return + */ + def nextBatchSessionId: Int = { + val retryPolicy: RetryPolicy = new RetryNTimes(2, 3) + + val zkPath = s"/$zkKeyPrefix/batchSessionId" + val distributedSessionId = new DistributedLong(curatorClient, zkPath, retryPolicy) + + recursiveTry(distributedSessionId, 10) match { + case Some(sessionId) => + sessionId.toInt + case None => + val msg: String = "Failed to get the next session id from Zookeeper" + logger.warn(msg) + throw new IOException(msg) + } + } + + override def register(batchRecoveryMetadata: BatchRecoveryMetadata): Unit = { + sessionManagerListener.register(batchRecoveryMetadata) + } + + override def remove(batchMetadata: BatchRecoveryMetadata): Unit = { + sessionManagerListener.remove(batchMetadata) + } + + override def childEvent(curator: CuratorFramework, event: PathChildrenCacheEvent): Unit = { + event.getType match { + case PathChildrenCacheEvent.Type.CHILD_ADDED => + logger.info(s"Type.CHILD_ADDED => ${event.getData.getPath}") + if (isNewSessionPath(event)) { + // it is an update to a session. + val batchMetadata = deserialize[BatchRecoveryMetadata](event.getData().getData) + register(batchMetadata) + } else { + // it is an update to something else + } + + case PathChildrenCacheEvent.Type.CHILD_REMOVED if isNewSessionPath(event) => + val batchMetadata = deserialize[BatchRecoveryMetadata](event.getData().getData) + val msg = s"${event.getData.getPath}: ${event.getData.getData.map(_.toChar).mkString}" + logger.info(s"Type.CHILD_REMOVED => ${msg}") + remove(batchMetadata) + + case PathChildrenCacheEvent.Type.CHILD_UPDATED => + logger.info(s"Type.CHILD_UPDATED => ${event.getData.getPath}") + val batchSessionData = deserialize[BatchRecoveryMetadata](event.getData().getData) + // register(batchSessionData) + + case PathChildrenCacheEvent.Type.CONNECTION_LOST => + logger.info(s"Type.CONNECTION_LOST=> ${event.getData.getPath}") + + case PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED => + logger.info(s"Type.CONNECTION_RECONNECTED => ${event.getData.getPath}") + + case PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED => + logger.info(s"Type.CONNECTION_SUSPENDED => ${event.getData.getPath}") + + case PathChildrenCacheEvent.Type.INITIALIZED => + logger.info(s"Type.INITIALIZED => ${event.getData.getPath}") + + } + } + + private def isNewSessionPath(event: PathChildrenCacheEvent): Boolean = { + event.getData.getPath.last.isDigit + } } diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala index ac0561fb6..57dac9bcc 100644 --- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala +++ b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala @@ -30,7 +30,7 @@ import scala.util.control.NonFatal import com.cloudera.livy.{LivyConf, Logging} import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession} import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession} -import com.cloudera.livy.server.recovery.SessionStore +import com.cloudera.livy.server.recovery.{SessionManagerListener, SessionStore} import com.cloudera.livy.sessions.Session.RecoveryMetadata object SessionManager { @@ -63,7 +63,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( sessionStore: SessionStore, sessionType: String, mockSessions: Option[Seq[S]] = None) - extends Logging { + extends SessionManagerListener with Logging { import SessionManager._ @@ -168,4 +168,39 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag]( } + override def register(recoveryData: BatchRecoveryMetadata): Unit = { + synchronized { + sessions.contains(recoveryData.id) match { + case true => + // This session manager added the session. No need to register it + logger.info(s"The session manager already has information for $recoveryData") + case false => + logger.info(s"The session saw new session $recoveryData from ZooKeeper") + recoveryData.appTag match { + case null => + // the app is not registered in yarn yet. wait until it is + logger.info(s"Session manager cannot find an appTag for $recoveryData from ZooKeeper") + case appTag => BatchSession.update(recoveryData, livyConf, sessionStore, None) + logger.info(s"Session Manager found new Batch session $recoveryData from ZooKeeper") + val session = BatchSession.update(recoveryData, livyConf, sessionStore) + register(session.asInstanceOf[S]) + } + } + } + } + + + override def remove(recoveryData: BatchRecoveryMetadata): Unit = { + synchronized { + sessions.contains(recoveryData.id) match { + case true => + // This session manager added the session. No need to register it + logger.info(s"Removing session $recoveryData") + sessions.remove(recoveryData.id) + case false => + logger.info(s"The session is not present") + } + } + } + } diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala index 3e8848bf9..39fd64e06 100644 --- a/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala +++ b/server/src/test/scala/com/cloudera/livy/server/recovery/ZooKeeperStateStoreSpec.scala @@ -23,7 +23,10 @@ import scala.collection.JavaConverters._ import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.api._ import org.apache.curator.framework.listen.Listenable +import org.apache.curator.framework.state.ConnectionStateListener +import org.apache.curator.utils.EnsurePath import org.apache.zookeeper.data.Stat +import org.mockito.Matchers.anyString import org.mockito.Mockito._ import org.scalatest.FunSpec import org.scalatest.Matchers._ @@ -43,6 +46,16 @@ class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { val curatorClient = mock[CuratorFramework] when(curatorClient.getUnhandledErrorListenable()) .thenReturn(mock[Listenable[UnhandledErrorListener]]) + when(curatorClient.getConnectionStateListenable()) + .thenReturn(mock[Listenable[ConnectionStateListener]]) + when(curatorClient.newNamespaceAwareEnsurePath(anyString())) + .thenReturn(mock[EnsurePath]) + + val builder: GetChildrenBuilder = mock[GetChildrenBuilder] + when(curatorClient.getChildren()) + .thenReturn(builder) + when(builder.forPath(anyString())) + .thenReturn(new java.util.LinkedList[String]()) val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient)) testBody(TestFixture(stateStore, curatorClient)) }