diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala index 5578da0de78..0903daaadd5 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/common/AverageRingBuffer.scala @@ -37,7 +37,7 @@ class AverageRingBuffer(private val maxSize: Int) { def nonEmpty: Boolean = elements.nonEmpty - def average: Double = { + def average: Double = { val size = elements.size if (size > 2) { (sum - max - min) / (size - 2) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala index d3d60d4622b..b6eaed5cb84 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala @@ -18,8 +18,10 @@ package org.apache.openwhisk.core.scheduler import akka.Done -import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown} +import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown, Props} +import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.util.Timeout +import akka.pattern.ask import com.typesafe.config.ConfigValueFactory import kamon.Kamon import org.apache.openwhisk.common.Https.HttpsConfig @@ -30,8 +32,20 @@ import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} import org.apache.openwhisk.core.connector._ import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext} import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys} +import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig} -import org.apache.openwhisk.core.service.{LeaseKeepAliveService, WatcherService} +import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager} +import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl +import org.apache.openwhisk.core.scheduler.queue.{ + DurationCheckerProvider, + MemoryQueue, + QueueManager, + QueueSize, + SchedulingDecisionMaker +} +import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService} +import org.apache.openwhisk.grpc.ActivationServiceHandler import org.apache.openwhisk.http.BasicHttpService import org.apache.openwhisk.spi.SpiLoader import org.apache.openwhisk.utils.ExecutionContextFactory @@ -44,6 +58,8 @@ import scala.language.postfixOps import scala.util.{Failure, Success, Try} import pureconfig.generic.auto._ +import scala.collection.JavaConverters + class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(implicit config: WhiskConfig, actorSystem: ActorSystem, logging: Logging) @@ -77,24 +93,53 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE case Failure(t) => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}") } } - val durationCheckerProvider = "" // TODO: TBD - val durationChecker = "" // TODO: TBD + val durationCheckerProvider = SpiLoader.get[DurationCheckerProvider] + val durationChecker = durationCheckerProvider.instance(actorSystem, logging) override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = { - Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it + logging.info(this, s"getting the queue states") + etcdClient + .getPrefix(s"${QueueKeys.inProgressPrefix}/${QueueKeys.queuePrefix}") + .map(res => { + JavaConverters + .asScalaIteratorConverter(res.getKvsList.iterator()) + .asScala + .map(kv => ByteStringToString(kv.getValue)) + .count(_ == schedulerId.asString) + }) + .flatMap { creationCount => + etcdClient + .get(SchedulerKeys.scheduler(schedulerId)) + .map(res => { + JavaConverters + .asScalaIteratorConverter(res.getKvsList.iterator()) + .asScala + .map { kv => + SchedulerStates.parse(kv.getValue).getOrElse(SchedulerStates(schedulerId, -1, schedulerEndpoints)) + } + .map { schedulerState => + (schedulerState.sid, schedulerState.queueSize) + } + .toList + }) + .map { list => + (list, creationCount) + } + } } override def getQueueSize: Future[Int] = { - Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it + queueManager.ask(QueueSize)(Timeout(5.seconds)).mapTo[Int] } override def getQueueStatusData: Future[List[StatusData]] = { - Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it + queueManager.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Future[List[StatusData]]].flatten } override def disable(): Unit = { logging.info(this, s"Gracefully shutting down the scheduler") - // TODO: TBD, after containerManager and queueManager are ready, can implement it + containerManager ! GracefulShutdown + queueManager ! GracefulShutdown } private def getUserLimit(invocationNamespace: String): Future[Int] = { @@ -113,27 +158,67 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE } } - private val etcdWorkerFactory = "" // TODO: TBD + private val etcdWorkerFactory = (f: ActorRefFactory) => f.actorOf(EtcdWorker.props(etcdClient, leaseService)) /** * This component is in charge of storing data to ETCD. * Even if any error happens we can assume the data will be eventually available in the ETCD by this component. */ - val dataManagementService = "" // TODO: TBD + val dataManagementService: ActorRef = + actorSystem.actorOf(DataManagementService.props(watcherService, etcdWorkerFactory)) + + val feedFactory = (f: ActorRefFactory, + description: String, + topic: String, + maxActiveAcksPerPoll: Int, + processAck: Array[Byte] => Future[Unit]) => { + val consumer = msgProvider.getConsumer(config, topic, topic, maxActiveAcksPerPoll) + f.actorOf(Props(new MessageFeed(description, logging, consumer, maxActiveAcksPerPoll, 1.second, processAck))) + } - val creationJobManagerFactory = "" // TODO: TBD + val creationJobManagerFactory: ActorRefFactory => ActorRef = + factory => { + factory.actorOf(CreationJobManager.props(feedFactory, schedulerId, dataManagementService)) + } /** * This component is responsible for creating containers for a given action. * It relies on the creationJobManager to manage the container creation job. */ - val containerManager = "" // TODO: TBD + val containerManager: ActorRef = + actorSystem.actorOf( + ContainerManager.props(creationJobManagerFactory, msgProvider, schedulerId, etcdClient, config, watcherService)) /** * This is a factory to create memory queues. * In the new architecture, each action is given its own dedicated queue. */ - val memoryQueueFactory = "" // TODO: TBD + val memoryQueueFactory + : (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef = + (factory, invocationNamespace, fqn, revision, actionMetaData) => { + // Todo: Change this to SPI + val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn)) + + factory.actorOf( + MemoryQueue.props( + etcdClient, + durationChecker, + fqn, + producer, + config, + invocationNamespace, + revision, + schedulerEndpoints, + actionMetaData, + dataManagementService, + watcherService, + containerManager, + decisionMaker, + schedulerId: SchedulerInstanceId, + ack, + store: (TransactionId, WhiskActivation, UserContext) => Future[Any], + getUserLimit: String => Future[Int])) + } val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}" val schedulerConsumer = @@ -144,9 +229,22 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE /** * This is one of the major components which take charge of managing queues and coordinating requests among the scheduler, controllers, and invokers. */ - val queueManager = "" // TODO: TBD - - //val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) TODO: TBD + val queueManager = actorSystem.actorOf( + QueueManager.props( + entityStore, + WhiskActionMetaData.get, + etcdClient, + schedulerEndpoints, + schedulerId, + dataManagementService, + watcherService, + ack, + store: (TransactionId, WhiskActivation, UserContext) => Future[Any], + memoryQueueFactory, + schedulerConsumer), + QueueManager.actorName) + + val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) } case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala index 2cf2a710581..2c2de3bd1b7 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/container/CreationJobManager.scala @@ -45,7 +45,7 @@ case object GetPoolStatus case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable) -class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef, +class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef, schedulerInstanceId: SchedulerInstanceId, dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) extends Actor { @@ -201,7 +201,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix) private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}" private val maxActiveAcksPerPoll = 128 - private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement) + private val ackFeed = feedFactory(actorSystem, "creationAck", topic, maxActiveAcksPerPoll, processAcknowledgement) def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = { Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8))) @@ -224,7 +224,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] } object CreationJobManager { - def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef, + def props(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef, schedulerInstanceId: SchedulerInstanceId, dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) = Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService)) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index 1de9d07d09f..7809b90d385 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -34,9 +34,19 @@ import org.apache.openwhisk.core.etcd.EtcdClient import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys} import org.apache.openwhisk.core.scheduler.SchedulerEndpoints -import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob} +import org.apache.openwhisk.core.scheduler.message.{ + ContainerCreation, + ContainerDeletion, + FailedCreationJob, + SuccessfulCreationJob +} import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse} -import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob} +import org.apache.openwhisk.core.scheduler.message.{ + ContainerCreation, + ContainerDeletion, + FailedCreationJob, + SuccessfulCreationJob +} import org.apache.openwhisk.core.service._ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests} @@ -48,7 +58,7 @@ import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.collection.mutable import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration} +import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise} import scala.language.postfixOps import scala.util.{Failure, Success} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala index 243a58ec981..b60b0cf421b 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/container/test/CreationJobManagerTests.scala @@ -95,6 +95,7 @@ class CreationJobManagerTests } def feedFactory(actorRefFactory: ActorRefFactory, + description: String, topic: String, maxActiveAcksPerPoll: Int, handler: Array[Byte] => Future[Unit]): ActorRef = {