Skip to content

Commit

Permalink
move actor to busypool ONLY when max concurrency reached
Browse files Browse the repository at this point in the history
  • Loading branch information
tysonnorris committed Sep 22, 2017
1 parent 8166f60 commit 7ec97e3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 117 deletions.
133 changes: 29 additions & 104 deletions core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.Props
import whisk.common.AkkaLogging
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.entity.ByteSize
import whisk.core.entity.CodeExec
import whisk.core.entity.EntityName
import whisk.core.entity.ExecutableWhiskAction
import whisk.core.entity.size._
import whisk.core.connector.MessageFeed
import whisk.core.containerpool.ContainerPool.ActivationCounter

sealed trait WorkerState
case object Busy extends WorkerState
Expand Down Expand Up @@ -58,14 +58,13 @@ case class WorkerData(data: ContainerData, state: WorkerState)
* @param maxPoolSize maximum size of containers allowed in the pool
* @param feed actor to request more work from
* @param prewarmConfig optional settings for container prewarming
* @param scheduler optional scheduler to define how activations are scheduled to busy/free container pools
*/
class ContainerPool(childFactory: ActorRefFactory => ActorRef,
maxActiveContainers: Int,
maxPoolSize: Int,
feed: ActorRef,
prewarmConfig: Option[PrewarmingConfig] = None,
scheduler: ContainerPoolScheduler = ContainerPool)
maxConcurrent: Int = 1)
extends Actor {
implicit val logging = new AkkaLogging(context.system.log)

Expand All @@ -86,9 +85,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
implicit val tid = r.msg.transid
val container = if (busyPool.size < maxActiveContainers) {
// Schedule a job to a warm container
scheduler
.schedule(r.action, r.msg.user.namespace, freePool.toMap, busyPool.toMap)
ContainerPool
.schedule(r.action, r.msg.user.namespace, freePool.toMap)
.orElse {
logging.info(this, "no free found")
if (busyPool.size + freePool.size < maxPoolSize) {
takePrewarmContainer(r.action).orElse {
Some(createContainer())
Expand All @@ -97,7 +97,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
.orElse {
// Remove a container and create a new one for the given job
scheduler.remove(r.action, r.msg.user.namespace, freePool.toMap).map { toDelete =>
ContainerPool.remove(r.action, r.msg.user.namespace, freePool.toMap).map { toDelete =>
removeContainer(toDelete)
takePrewarmContainer(r.action).getOrElse {
createContainer()
Expand All @@ -108,51 +108,32 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,

container match {
case Some((actor, data)) =>
if (freePool.contains(actor)) {
//previously free, but now busy (prewarm or warm)
//only move to busyPool if max reached
ActivationCounter.inc(actor)
if (ActivationCounter.getOrElse(actor, 0) >= maxConcurrent) {
busyPool.update(actor, data)
freePool.remove(actor)
} else if (!busyPool.contains(actor)) {
logging.warn(this, s"updating busy for missing ${data}")
//is not in free, but is also not in busy? should not be possible
busyPool.update(actor, data)
} else {
busyPool
.get(actor)
.foreach({
case _: PreWarmedData =>
//in case previous state was busy+prewarm, update to busy+warm
busyPool.update(actor, data)
case _ =>
//no updates are needed
})
}

scheduler.begin(actor, data)
actor ! r // forwards the run request to the container
case None =>
logging.error(this, s"Rescheduling Run message ${r.action}, too many message in the pool")(r.msg.transid)
logging.error(this, "Rescheduling Run message, too many message in the pool")(r.msg.transid)
self ! r
}

// Container is free to take more work
case NeedWork(data: WarmedData) =>
//if current in-flight activations drops to 0, move this container from busy->free
if (scheduler.complete(sender(), data)) {
feed ! MessageFeed.Processed
if (ActivationCounter.dec(sender()) < maxConcurrent) {
//move back to freePool if capacity available
freePool.update(sender(), data)
busyPool
.remove(sender())
.foreach(_ => {
feed ! MessageFeed.Processed
})
busyPool.remove(sender())
} else {
//if we still have in-flight activations, only update busy if previous state was prewarm
busyPool.get(sender()).foreach {
//update freePool IFF it was previously PreWarmedData
freePool.get(sender()).foreach {
case p: PreWarmedData =>
busyPool.update(sender(), data)
freePool.update(sender(), data)
case _ =>
}
feed ! MessageFeed.Processed
}

// Container is prewarmed and ready to take work
Expand Down Expand Up @@ -214,7 +195,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
}

object ContainerPool extends ContainerPoolScheduler {
object ContainerPool {

/**
* Finds the best container for a given job to run on.
Expand All @@ -231,19 +212,13 @@ object ContainerPool extends ContainerPoolScheduler {
* @param idles a map of idle containers, awaiting work
* @return a container if one found
*/
def schedule[A](action: ExecutableWhiskAction,
invocationNamespace: EntityName,
idles: Map[A, ContainerData],
busy: Map[A, ContainerData])(implicit transid: TransactionId): Option[(A, ContainerData)] = {
def schedule[A](action: ExecutableWhiskAction, invocationNamespace: EntityName, idles: Map[A, ContainerData])(
implicit transid: TransactionId): Option[(A, ContainerData)] = {
// Return the first idle entry (in the map's iteration order) that is already warm for this exact job.
idles.find {
// Backticks make these stable-identifier patterns: the corresponding WarmedData fields are compared
// for equality with the `invocationNamespace` and `action` parameters instead of being bound to
// fresh variables.
case (_, WarmedData(_, `invocationNamespace`, `action`, _)) => true
// Anything else (data that is not WarmedData, or warm for a different namespace/action) is not reusable.
case _ => false
}
}
//no concurrency supported, so every activation releases the container back to free pool
def complete[A](containerProxy: A, data: ContainerData): Boolean = true
//no concurrency supported, so no tracking is done on each activation
def begin[A](containerProxy: A, data: ContainerData)(implicit transid: TransactionId): Unit = Unit

/**
* Finds the best container to remove to make space for the job passed to run.
Expand All @@ -257,9 +232,11 @@ object ContainerPool extends ContainerPoolScheduler {
*/
def remove[A](action: ExecutableWhiskAction, invocationNamespace: EntityName, pool: Map[A, ContainerData])(
implicit transid: TransactionId): Option[A] = {
// Try to find a Free container that is initialized with any OTHER action
// Try to find a Free container that does NOT have any active activations AND is initialized with any OTHER action
val freeContainers = pool.collect {
case (ref, w: WarmedData) if (w.action != action || w.invocationNamespace != invocationNamespace) => ref -> w
case (ref, w: WarmedData)
if ActivationCounter.getOrElse(ref, 0) == 0 && (w.action != action || w.invocationNamespace != invocationNamespace) =>
ref -> w
}

if (freeContainers.nonEmpty) {
Expand All @@ -273,26 +250,9 @@ object ContainerPool extends ContainerPoolScheduler {
size: Int,
feed: ActorRef,
prewarmConfig: Option[PrewarmingConfig] = None,
scheduler: ContainerPoolScheduler = ContainerPool) =
Props(new ContainerPool(factory, maxActive, size, feed, prewarmConfig, scheduler))
}

/** Defines how activations are scheduled into the pool of busy and free containers */
trait ContainerPoolScheduler {
def schedule[A](action: ExecutableWhiskAction,
invocationNamespace: EntityName,
idles: Map[A, ContainerData],
busy: Map[A, ContainerData])(implicit transid: TransactionId): Option[(A, ContainerData)]
def begin[A](containerProxy: A, data: ContainerData)(implicit transid: TransactionId): Unit
def complete[A](containerProxy: A, data: ContainerData): Boolean
def remove[A](action: ExecutableWhiskAction, invocationNamespace: EntityName, pool: Map[A, ContainerData])(
implicit transid: TransactionId): Option[A]
}
maxConcurrent: Int = 1) =
Props(new ContainerPool(factory, maxActive, size, feed, prewarmConfig, maxConcurrent))

/** Contains settings needed to perform container prewarming */
case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: ByteSize)

class ConcurrentPoolScheduler(val maxConcurrent: Int = 200)(implicit logging: Logging) extends ContainerPoolScheduler {
private object ActivationCounter extends mutable.HashMap[Any, Int] {
def inc(a: Any): Int = {
val newVal = get(a).getOrElse(0) + 1
Expand All @@ -305,42 +265,7 @@ class ConcurrentPoolScheduler(val maxConcurrent: Int = 200)(implicit logging: Lo
newVal
}
}
override def schedule[A](action: ExecutableWhiskAction,
invocationNamespace: EntityName,
idles: Map[A, ContainerData],
busy: Map[A, ContainerData])(implicit transid: TransactionId): Option[(A, ContainerData)] = {
//prefer to use busy+warm containers over free+warm containers
busy
.find {
case (a, WarmedData(_, `invocationNamespace`, `action`, _))
if ActivationCounter.getOrElse(a, 0) < maxConcurrent =>
true
case _ => false
}
.orElse {
logging.info(
this,
s"no busy container available for ${action.fullyQualifiedName(false)} will try a free container")
idles.find {
case (_, WarmedData(_, `invocationNamespace`, `action`, _)) => true
case _ => false
}
}
}

override def begin[A](containerProxy: A, data: ContainerData)(implicit transid: TransactionId): Unit = {
val newCount = ActivationCounter.inc(containerProxy)
logging.info(this, s"begin activation counter for ${containerProxy} is now: ${newCount}")

}

override def complete[A](containerProxy: A, data: ContainerData): Boolean = {
val newCount = ActivationCounter.dec(containerProxy)
val result = newCount == 0
logging.info(this, s"complete activation counter for ${containerProxy} is now: ${newCount} ${result}")
result
}
override def remove[A](action: ExecutableWhiskAction, invocationNamespace: EntityName, pool: Map[A, ContainerData])(
implicit transid: TransactionId): Option[A] =
ContainerPool.remove(action, invocationNamespace, pool)
}

/** Contains settings needed to perform container prewarming */
case class PrewarmingConfig(count: Int, exec: CodeExec[_], memoryLimit: ByteSize)
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import whisk.core.connector.CompletionMessage
import whisk.core.connector.MessageFeed
import whisk.core.connector.MessageProducer
import whisk.core.connector.MessagingProvider
import whisk.core.containerpool.ConcurrentPoolScheduler
//import whisk.core.containerpool.ConcurrentPoolScheduler
import whisk.core.containerpool.ContainerPool
import whisk.core.containerpool.ContainerProxy
import whisk.core.containerpool.PrewarmingConfig
Expand Down Expand Up @@ -78,6 +76,9 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa

//number of messages the processing can handle in one batch (for ALL actions)
val maximumHandlerCapacity = 200
//number of concurrent activations per action container
val maxConcurrent = 200

val activationFeed = actorSystem.actorOf(Props {
new MessageFeed("activation", logging, consumer, maximumHandlerCapacity, 100.milliseconds, processActivationMessage)
})
Expand Down Expand Up @@ -175,16 +176,14 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa
}
.get

//use the ConcurrentPoolScheduler set to handle 200 activations concurrently per action
val scheduler = new ConcurrentPoolScheduler(200)
val pool = actorSystem.actorOf(
ContainerPool.props(
childFactory,
maximumContainers,
maximumContainers,
activationFeed,
Some(PrewarmingConfig(2, prewarmExec, 256.MB)),
scheduler))
maxConcurrent))

/** Is called when an ActivationMessage is read from Kafka */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,30 +315,30 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
behavior of "ContainerPool schedule()"

it should "not provide a container if idle pool is empty" in {
ContainerPool.schedule(createAction(), standardNamespace, Map(), Map()) shouldBe None
ContainerPool.schedule(createAction(), standardNamespace, Map()) shouldBe None
}

it should "reuse an applicable warm container from idle pool with one container" in {
val data = warmedData()
val pool = Map('name -> data)

// copy to make sure referential equality doesn't suffice
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool, Map()) shouldBe Some('name, data)
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) shouldBe Some('name, data)
}

it should "reuse an applicable warm container from idle pool with several applicable containers" in {
val data = warmedData()
val pool = Map('first -> data, 'second -> data)

ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool, Map()) should (be(Some('first, data)) or be(
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) should (be(Some('first, data)) or be(
Some('second, data)))
}

it should "reuse an applicable warm container from idle pool with several different containers" in {
val matchingData = warmedData()
val pool = Map('none -> noData(), 'pre -> preWarmedData(), 'warm -> matchingData)

ContainerPool.schedule(matchingData.action.copy(), matchingData.invocationNamespace, pool, Map()) shouldBe Some(
ContainerPool.schedule(matchingData.action.copy(), matchingData.invocationNamespace, pool) shouldBe Some(
'warm,
matchingData)
}
Expand All @@ -348,7 +348,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
// data is **not** in the pool!
val pool = Map('none -> noData(), 'pre -> preWarmedData())

ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool, Map()) shouldBe None
ContainerPool.schedule(data.action.copy(), data.invocationNamespace, pool) shouldBe None
}

it should "not reuse a warm container with different invocation namespace" in {
Expand All @@ -357,7 +357,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
val differentNamespace = EntityName(data.invocationNamespace.asString + "butDifferent")

data.invocationNamespace should not be differentNamespace
ContainerPool.schedule(data.action.copy(), differentNamespace, pool, Map()) shouldBe None
ContainerPool.schedule(data.action.copy(), differentNamespace, pool) shouldBe None
}

it should "not reuse a warm container with different action name" in {
Expand All @@ -366,7 +366,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
val pool = Map('warm -> data)

data.action.name should not be differentAction.name
ContainerPool.schedule(differentAction, data.invocationNamespace, pool, Map()) shouldBe None
ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
}

it should "not reuse a warm container with different action version" in {
Expand All @@ -375,7 +375,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
val pool = Map('warm -> data)

data.action.version should not be differentAction.version
ContainerPool.schedule(differentAction, data.invocationNamespace, pool, Map()) shouldBe None
ContainerPool.schedule(differentAction, data.invocationNamespace, pool) shouldBe None
}

behavior of "ContainerPool remove()"
Expand Down

0 comments on commit 7ec97e3

Please sign in to comment.