Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[New Scheduler] Update main method of the scheduler. #5157

Merged
merged 1 commit into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class AverageRingBuffer(private val maxSize: Int) {

def nonEmpty: Boolean = elements.nonEmpty

def average: Double = {
def average: Double = {
val size = elements.size
if (size > 2) {
(sum - max - min) / (size - 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.apache.openwhisk.core.scheduler

import akka.Done
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown}
import akka.actor.{ActorRef, ActorRefFactory, ActorSelection, ActorSystem, CoordinatedShutdown, Props}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.util.Timeout
import akka.pattern.ask
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
import org.apache.openwhisk.common.Https.HttpsConfig
Expand All @@ -30,8 +32,20 @@ import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender}
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.database.{ActivationStoreProvider, NoDocumentException, UserContext}
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.etcd.EtcdKV.{QueueKeys, SchedulerKeys}
import org.apache.openwhisk.core.etcd.EtcdType.ByteStringToString
import org.apache.openwhisk.core.etcd.{EtcdClient, EtcdConfig}
import org.apache.openwhisk.core.service.{LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.core.scheduler.container.{ContainerManager, CreationJobManager}
import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl
import org.apache.openwhisk.core.scheduler.queue.{
DurationCheckerProvider,
MemoryQueue,
QueueManager,
QueueSize,
SchedulingDecisionMaker
}
import org.apache.openwhisk.core.service.{DataManagementService, EtcdWorker, LeaseKeepAliveService, WatcherService}
import org.apache.openwhisk.grpc.ActivationServiceHandler
import org.apache.openwhisk.http.BasicHttpService
import org.apache.openwhisk.spi.SpiLoader
import org.apache.openwhisk.utils.ExecutionContextFactory
Expand All @@ -44,6 +58,8 @@ import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
import pureconfig.generic.auto._

import scala.collection.JavaConverters

class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerEndpoints)(implicit config: WhiskConfig,
actorSystem: ActorSystem,
logging: Logging)
Expand Down Expand Up @@ -77,24 +93,53 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
case Failure(t) => logging.error(this, s"failed to save activation $activation, error: ${t.getMessage}")
}
}
val durationCheckerProvider = "" // TODO: TBD
val durationChecker = "" // TODO: TBD
val durationCheckerProvider = SpiLoader.get[DurationCheckerProvider]
val durationChecker = durationCheckerProvider.instance(actorSystem, logging)

override def getState: Future[(List[(SchedulerInstanceId, Int)], Int)] = {
Future.successful((List((schedulerId, 0)), 0)) // TODO: TBD, after etcdClient is ready, can implement it
logging.info(this, s"getting the queue states")
etcdClient
.getPrefix(s"${QueueKeys.inProgressPrefix}/${QueueKeys.queuePrefix}")
.map(res => {
JavaConverters
.asScalaIteratorConverter(res.getKvsList.iterator())
.asScala
.map(kv => ByteStringToString(kv.getValue))
.count(_ == schedulerId.asString)
})
.flatMap { creationCount =>
etcdClient
.get(SchedulerKeys.scheduler(schedulerId))
.map(res => {
JavaConverters
.asScalaIteratorConverter(res.getKvsList.iterator())
.asScala
.map { kv =>
SchedulerStates.parse(kv.getValue).getOrElse(SchedulerStates(schedulerId, -1, schedulerEndpoints))
}
.map { schedulerState =>
(schedulerState.sid, schedulerState.queueSize)
}
.toList
})
.map { list =>
(list, creationCount)
}
}
}

override def getQueueSize: Future[Int] = {
Future.successful(0) // TODO: TBD, after queueManager is ready, can implement it
queueManager.ask(QueueSize)(Timeout(5.seconds)).mapTo[Int]
}

override def getQueueStatusData: Future[List[StatusData]] = {
Future.successful(List(StatusData("ns", "fqn", 0, "Running", "data"))) // TODO: TBD, after queueManager is ready, can implement it
queueManager.ask(StatusQuery)(Timeout(5.seconds)).mapTo[Future[List[StatusData]]].flatten
}

override def disable(): Unit = {
logging.info(this, s"Gracefully shutting down the scheduler")
// TODO: TBD, after containerManager and queueManager are ready, can implement it
containerManager ! GracefulShutdown
queueManager ! GracefulShutdown
}

private def getUserLimit(invocationNamespace: String): Future[Int] = {
Expand All @@ -113,27 +158,67 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
}
}

private val etcdWorkerFactory = "" // TODO: TBD
private val etcdWorkerFactory = (f: ActorRefFactory) => f.actorOf(EtcdWorker.props(etcdClient, leaseService))

/**
* This component is in charge of storing data to ETCD.
* Even if any error happens we can assume the data will be eventually available in the ETCD by this component.
*/
val dataManagementService = "" // TODO: TBD
val dataManagementService: ActorRef =
actorSystem.actorOf(DataManagementService.props(watcherService, etcdWorkerFactory))

val feedFactory = (f: ActorRefFactory,
description: String,
topic: String,
maxActiveAcksPerPoll: Int,
processAck: Array[Byte] => Future[Unit]) => {
val consumer = msgProvider.getConsumer(config, topic, topic, maxActiveAcksPerPoll)
f.actorOf(Props(new MessageFeed(description, logging, consumer, maxActiveAcksPerPoll, 1.second, processAck)))
}

val creationJobManagerFactory = "" // TODO: TBD
val creationJobManagerFactory: ActorRefFactory => ActorRef =
factory => {
factory.actorOf(CreationJobManager.props(feedFactory, schedulerId, dataManagementService))
}

/**
* This component is responsible for creating containers for a given action.
* It relies on the creationJobManager to manage the container creation job.
*/
val containerManager = "" // TODO: TBD
val containerManager: ActorRef =
actorSystem.actorOf(
ContainerManager.props(creationJobManagerFactory, msgProvider, schedulerId, etcdClient, config, watcherService))

/**
* This is a factory to create memory queues.
* In the new architecture, each action is given its own dedicated queue.
*/
val memoryQueueFactory = "" // TODO: TBD
val memoryQueueFactory
: (ActorRefFactory, String, FullyQualifiedEntityName, DocRevision, WhiskActionMetaData) => ActorRef =
(factory, invocationNamespace, fqn, revision, actionMetaData) => {
// Todo: Change this to SPI
val decisionMaker = factory.actorOf(SchedulingDecisionMaker.props(invocationNamespace, fqn))

factory.actorOf(
MemoryQueue.props(
etcdClient,
durationChecker,
fqn,
producer,
config,
invocationNamespace,
revision,
schedulerEndpoints,
actionMetaData,
dataManagementService,
watcherService,
containerManager,
decisionMaker,
schedulerId: SchedulerInstanceId,
ack,
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
getUserLimit: String => Future[Int]))
}

val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}"
val schedulerConsumer =
Expand All @@ -144,9 +229,22 @@ class Scheduler(schedulerId: SchedulerInstanceId, schedulerEndpoints: SchedulerE
/**
* This is one of the major components which take charge of managing queues and coordinating requests among the scheduler, controllers, and invokers.
*/
val queueManager = "" // TODO: TBD

//val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl()) TODO: TBD
val queueManager = actorSystem.actorOf(
QueueManager.props(
entityStore,
WhiskActionMetaData.get,
etcdClient,
schedulerEndpoints,
schedulerId,
dataManagementService,
watcherService,
ack,
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
memoryQueueFactory,
schedulerConsumer),
QueueManager.actorName)

val serviceHandlers: HttpRequest => Future[HttpResponse] = ActivationServiceHandler.apply(ActivationServiceImpl())
}

case class CmdLineArgs(uniqueName: Option[String] = None, id: Option[Int] = None, displayedName: Option[String] = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ case object GetPoolStatus

case class JobEntry(action: FullyQualifiedEntityName, timer: Cancellable)

class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
class CreationJobManager(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
schedulerInstanceId: SchedulerInstanceId,
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging)
extends Actor {
Expand Down Expand Up @@ -201,7 +201,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte]
private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
private val topic = s"${topicPrefix}creationAck${schedulerInstanceId.asString}"
private val maxActiveAcksPerPoll = 128
private val ackFeed = feedFactory(actorSystem, topic, maxActiveAcksPerPoll, processAcknowledgement)
private val ackFeed = feedFactory(actorSystem, "creationAck", topic, maxActiveAcksPerPoll, processAcknowledgement)

def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = {
Future(ContainerCreationAckMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
Expand All @@ -224,7 +224,7 @@ class CreationJobManager(feedFactory: (ActorRefFactory, String, Int, Array[Byte]
}

object CreationJobManager {
def props(feedFactory: (ActorRefFactory, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
def props(feedFactory: (ActorRefFactory, String, String, Int, Array[Byte] => Future[Unit]) => ActorRef,
schedulerInstanceId: SchedulerInstanceId,
dataManagementService: ActorRef)(implicit actorSystem: ActorSystem, logging: Logging) =
Props(new CreationJobManager(feedFactory, schedulerInstanceId, dataManagementService))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,19 @@ import org.apache.openwhisk.core.etcd.EtcdClient
import org.apache.openwhisk.core.etcd.EtcdKV.ContainerKeys.containerPrefix
import org.apache.openwhisk.core.etcd.EtcdKV.{ContainerKeys, QueueKeys, ThrottlingKeys}
import org.apache.openwhisk.core.scheduler.SchedulerEndpoints
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
FailedCreationJob,
SuccessfulCreationJob
}
import org.apache.openwhisk.core.scheduler.grpc.{GetActivation, ActivationResponse => GetActivationResponse}
import org.apache.openwhisk.core.scheduler.message.{ContainerCreation, ContainerDeletion, FailedCreationJob, SuccessfulCreationJob}
import org.apache.openwhisk.core.scheduler.message.{
ContainerCreation,
ContainerDeletion,
FailedCreationJob,
SuccessfulCreationJob
}
import org.apache.openwhisk.core.service._
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
Expand All @@ -48,7 +58,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future, Promise, duration}
import scala.concurrent.{duration, ExecutionContextExecutor, Future, Promise}
import scala.language.postfixOps
import scala.util.{Failure, Success}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class CreationJobManagerTests
}

def feedFactory(actorRefFactory: ActorRefFactory,
description: String,
topic: String,
maxActiveAcksPerPoll: Int,
handler: Array[Byte] => Future[Unit]): ActorRef = {
Expand Down