Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[New Scheduler] Implement KeepAliveService #5067

Merged
merged 4 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ object LoggingMarkers {
val timeout = "timeout"

private val controller = "controller"
private val scheduler = "scheduler"
private val invoker = "invoker"
private val database = "database"
private val activation = "activation"
Expand Down Expand Up @@ -555,6 +556,9 @@ object LoggingMarkers {
LogMarkerToken(kafka, "topic", start, Some("delay"), Map("topic" -> topic))(MeasurementUnit.time.milliseconds)
else LogMarkerToken(kafka, topic, start, Some("delay"))(MeasurementUnit.time.milliseconds)

/**
 * Builds the log marker token emitted for a scheduler lease keep-alive.
 *
 * @param leaseId id of the lease being kept alive; attached to the token as the "leaseId" tag
 */
def SCHEDULER_KEEP_ALIVE(leaseId: Long) = {
  val leaseTags = Map("leaseId" -> leaseId.toString)
  LogMarkerToken(scheduler, "keepAlive", counter, None, leaseTags)(MeasurementUnit.none)
}

/*
* General markers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ object ConfigKeys {
val controllerActivation = s"$controller.activation"

val etcd = "whisk.etcd"
val etcdLeaseTimeout = "whisk.etcd.lease.timeout"
val etcdPoolThreads = "whisk.etcd.pool.threads"

val activationStore = "whisk.activation-store"
Expand All @@ -290,5 +291,7 @@ object ConfigKeys {

val azBlob = "whisk.azure-blob"

val schedulerMaxPeek = "whisk.scheduler.max-peek"

val whiskClusterName = "whisk.cluster.name"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.service

import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.pattern.pipe
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.InstanceId
import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.InstanceKeys.instanceLease
import pureconfig.loadConfigOrThrow

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

// States of the LeaseKeepAliveService FSM
sealed trait KeepAliveServiceState
// Initial state: a lease is being granted and the keep-alive worker / watcher are being set up
case object Ready extends KeepAliveServiceState
// A lease is held and is refreshed periodically; GetLease and RegrantLease are served in this state
case object Active extends KeepAliveServiceState

// State data carried by the LeaseKeepAliveService FSM
sealed trait KeepAliveServiceData
// No lease has been granted yet (or the previous one was given up)
case object NoData extends KeepAliveServiceData
// A granted lease: its id and TTL as returned by the etcd grant response
case class Lease(id: Long, ttl: Long) extends KeepAliveServiceData
// Data of the Active state: the scheduled keep-alive task (cancellable) together with the lease it refreshes
case class ActiveStates(worker: Cancellable, lease: Lease) extends KeepAliveServiceData

// Events received by the actor
// In Active: drop the current lease (cancel worker, unwatch, revoke) and grant a new one
case object RegrantLease
// In Active: reply to the sender with the current Lease; stashed while in Ready
case object GetLease
// In Ready with NoData: request a new lease from etcd
case object GrantLease

// Events internally used (piped back to the actor from etcd futures)
// Result of a successful lease grant
case class SetLease(lease: Lease)
// Sent once the lease key has been stored; carries the scheduled keep-alive task
case class SetWatcher(worker: Cancellable)

/**
 * FSM actor that grants an etcd lease for an instance and keeps it alive.
 *
 * Life cycle:
 *  - `Ready`: grants a lease (`GrantLease` -> `SetLease`), starts the periodic keep-alive task and stores
 *    a key bound to the lease (`SetWatcher`), then moves to `Active`. Any other message is stashed.
 *  - `Active`: answers `GetLease`, and recreates the lease on `RegrantLease` or when the watched key
 *    is reported as removed. Any other message is stashed.
 *
 * Stashed messages are replayed on every transition into `Ready` or `Active`.
 *
 * @param etcdClient     client used to grant, refresh and revoke the lease and to store the lease key
 * @param instanceId     instance whose lease key (`instanceLease(instanceId)`) is maintained
 * @param watcherService actor receiving the `WatchEndpoint` / `UnwatchEndpoint` requests for the lease key
 */
class LeaseKeepAliveService(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: ActorRef)(
  implicit logging: Logging,
  actorSystem: ActorSystem)
    extends FSM[KeepAliveServiceState, KeepAliveServiceData]
    with Stash {

  implicit val ec: ExecutionContextExecutor = context.dispatcher

  private val leaseTimeout = loadConfigOrThrow[Int](ConfigKeys.etcdLeaseTimeout).seconds
  private val key = instanceLease(instanceId)
  private val watcherName = "lease-service"

  // kick off the first lease grant as soon as the actor starts processing messages
  self ! GrantLease
  startWith(Ready, NoData)

  when(Ready) {
    case Event(GrantLease, NoData) =>
      etcdClient
        .grant(leaseTimeout.toSeconds)
        .map { res =>
          SetLease(Lease(res.getID, res.getTTL))
        }
        .pipeTo(self)
      stay

    case Event(SetLease(lease), NoData) =>
      startKeepAliveService(lease)
        .pipeTo(self)
      logging.info(this, s"Granted a new lease $lease")
      stay using lease

    case Event(SetWatcher(w), l: Lease) =>
      goto(Active) using ActiveStates(w, l)

    case Event(t: FailureMessage, _) =>
      logging.warn(this, s"Failed to grant new lease caused by: $t")
      self ! GrantLease
      // The failure may come from storing the lease key, in which case the state data is still the
      // half-initialized Lease. GrantLease is only handled with NoData, so the data must be reset;
      // otherwise the retry would be stashed and never replayed. The keep-alive task of that lease
      // is already cancelled by startKeepAliveService, so the lease is expected to time out in etcd.
      stay() using NoData

    case _ => delay
  }

  when(Active) {
    case Event(WatchEndpointRemoved(`key`, `key`, _, false), ActiveStates(worker, lease)) =>
      logging.info(this, s"endpoint ie removed so recreate a lease")
      recreateLease(worker, lease)

    case Event(RegrantLease, ActiveStates(worker, lease)) =>
      logging.info(this, s"ReGrant a lease, old lease:${lease}")
      recreateLease(worker, lease)

    case Event(GetLease, ActiveStates(_, lease)) =>
      logging.info(this, s"send the lease(${lease}) to ${sender()}")
      sender() ! lease
      stay()

    case _ => delay
  }

  initialize()

  /**
   * Starts the periodic keep-alive task for the lease and stores the lease key in etcd.
   *
   * @param lease the freshly granted lease
   * @return a future of the `SetWatcher` message carrying the keep-alive task; it fails when the key
   *         cannot be stored, in which case the keep-alive task has already been cancelled
   */
  private def startKeepAliveService(lease: Lease): Future[SetWatcher] = {
    val worker =
      actorSystem.scheduler.schedule(initialDelay = 0.second, interval = 500.milliseconds)(keepAliveOnce(lease))

    /**
     * To detect that the lease has been deleted (e.g. because it timed out),
     * create a key bound to the lease, watch the key, and receive an event on its deletion.
     */
    etcdClient
      .put(key, s"${lease.id}", lease.id)
      .map { _ =>
        watcherService ! WatchEndpoint(key, s"${lease.id}", false, watcherName, Set(DeleteEvent))
        SetWatcher(worker)
      }
      .andThen {
        // the worker is only handed to the actor on success; cancel it here or it would run forever
        case Failure(_) => worker.cancel()
      }
  }

  /**
   * Refreshes the lease once. Emits a counter metric on success; on failure logs a warning and asks
   * the actor to regrant the lease.
   *
   * @param lease the lease to refresh
   * @return a future of the refreshed lease id
   */
  private def keepAliveOnce(lease: Lease): Future[Long] = {
    etcdClient
      .keepAliveOnce(lease.id)
      .map(_.getID)
      .andThen {
        case Success(_) => MetricEmitter.emitCounterMetric(LoggingMarkers.SCHEDULER_KEEP_ALIVE(lease.id))
        case Failure(t) =>
          logging.warn(this, s"Failed to keep-alive of ${lease.id} caused by ${t}")
          self ! RegrantLease
      }
  }

  /**
   * Gives up the current lease and triggers the grant of a new one.
   *
   * @param worker keep-alive task of the current lease
   * @param lease  the lease to revoke
   * @return the transition to `Ready` with `NoData`
   */
  private def recreateLease(worker: Cancellable, lease: Lease) = {
    logging.info(this, s"recreate a lease, old lease: $lease")
    worker.cancel() // stop scheduler
    watcherService ! UnwatchEndpoint(key, false, watcherName) // stop watcher
    etcdClient
      .revoke(lease.id) // delete lease
      .onComplete(_ => self ! GrantLease) // create lease, whether or not the revoke succeeded
    goto(Ready) using NoData
  }

  // Unstash all messages stashed while in intermediate state
  onTransition {
    case _ -> Ready  => unstashAll()
    case _ -> Active => unstashAll()
  }

  /** Delays all incoming messages until unstashAll() is called */
  def delay = {
    stash()
    stay
  }

  /** Cancels the keep-alive task, if one is held in the state data, and stops watching the lease key. */
  override def postStop(): Unit = {
    stateData match {
      case ActiveStates(w, _) => w.cancel() // stop scheduler if that exist
      case _                  => // do nothing
    }
    watcherService ! UnwatchEndpoint(key, false, watcherName)
  }
}

object LeaseKeepAliveService {

  /**
   * Creates the `Props` of a [[LeaseKeepAliveService]] actor configured to run on the
   * "dispatchers.lease-service-dispatcher" dispatcher.
   *
   * @param etcdClient     etcd client handed to the actor
   * @param instanceId     instance the lease is maintained for
   * @param watcherService watcher actor handed to the actor
   */
  def props(etcdClient: EtcdClient, instanceId: InstanceId, watcherService: ActorRef)(
    implicit logging: Logging,
    actorSystem: ActorSystem): Props = {
    val serviceProps = Props(new LeaseKeepAliveService(etcdClient, instanceId, watcherService))
    serviceProps.withDispatcher("dispatchers.lease-service-dispatcher")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@
package org.apache.openwhisk.core.scheduler

import akka.Done
import akka.actor.{ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.WhiskConfig.{servicePort, _}
import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.service.{LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.http.BasicHttpService
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory
Expand All @@ -55,10 +57,11 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
val msgProvider = SpiLoader.get[MessagingProvider]
val producer = msgProvider.getProducer(config, Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT))

val maxPeek = "" // TODO: TBD
val etcdClient = "" // TODO: TBD
val watcherService = "" // TODO: TBD
val leaseService = "" // TODO: TBD
val maxPeek = loadConfigOrThrow[Int](ConfigKeys.schedulerMaxPeek)
val etcdClient = EtcdClient(loadConfigOrThrow[EtcdConfig](ConfigKeys.etcd).hosts)
val watcherService: ActorRef = actorSystem.actorOf(WatcherService.props(etcdClient))
val leaseService =
actorSystem.actorOf(LeaseKeepAliveService.props(etcdClient, schedulerId, watcherService))

implicit val entityStore = WhiskEntityStore.datastore()
private val activationStore =
Expand Down Expand Up @@ -139,7 +142,7 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
config,
s"scheduler${schedulerId.asString}",
s"scheduler${schedulerId.asString}",
500, // TODO: to be updated with maxPeek variable
maxPeek,
maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)

implicit val trasnid = TransactionId.containerCreation
Expand Down
Loading