[New Scheduler] Implement FunctionPullingContainerPool #5102

Merged
6 changes: 5 additions & 1 deletion ansible/roles/invoker/tasks/deploy.yml
@@ -282,7 +282,11 @@
"CONFIG_whisk_invoker_https_keystorePassword": "{{ invoker.ssl.keystore.password }}"
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckInitDelay": "{{ container_pool_prewarm_expirationCheckInitDelay | default('10 minutes') }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('10 minutes') }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
"CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
"CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"

- name: extend invoker dns env
set_fact:

@@ -388,6 +388,10 @@ object LoggingMarkers {

// Time that is needed to produce message in kafka
val CONTROLLER_KAFKA = LogMarkerToken(controller, kafka, start)(MeasurementUnit.time.milliseconds)
def INVOKER_SHAREDPACKAGE(path: String) =
LogMarkerToken(invoker, "sharedPackage", counter, None, Map("path" -> path))(MeasurementUnit.none)
def INVOKER_CONTAINERPOOL_MEMORY(state: String) =
LogMarkerToken(invoker, "containerPoolMemory", counter, Some(state), Map("state" -> state))(MeasurementUnit.none)

// System overload and random invoker assignment
val MANAGED_SYSTEM_OVERLOAD =

@@ -484,3 +484,165 @@ object StatusData extends DefaultJsonProtocol {
implicit val serdes =
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
}

case class ContainerCreationMessage(override val transid: TransactionId,
invocationNamespace: String,
action: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData,
rootSchedulerIndex: SchedulerInstanceId,
schedulerHost: String,
rpcPort: Int,
retryCount: Int = 0,
creationId: CreationId = CreationId.generate())
extends ContainerMessage(transid) {

override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)
override def serialize: String = toJson.compactPrint
}

object ContainerCreationMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[ContainerCreationMessage] = Try(serdes.read(msg.parseJson))

private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
private implicit val byteSizeSerdes = size.serdes
implicit val serdes = jsonFormat10(
ContainerCreationMessage.apply(
_: TransactionId,
_: String,
_: FullyQualifiedEntityName,
_: DocRevision,
_: WhiskActionMetaData,
_: SchedulerInstanceId,
_: String,
_: Int,
_: Int,
_: CreationId))
}

case class ContainerDeletionMessage(override val transid: TransactionId,
invocationNamespace: String,
action: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData)
extends ContainerMessage(transid) {
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)
override def serialize: String = toJson.compactPrint
}

object ContainerDeletionMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[ContainerDeletionMessage] = Try(serdes.read(msg.parseJson))

private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
private implicit val instanceIdSerdes = SchedulerInstanceId.serdes
private implicit val byteSizeSerdes = size.serdes
implicit val serdes = jsonFormat5(
ContainerDeletionMessage
.apply(_: TransactionId, _: String, _: FullyQualifiedEntityName, _: DocRevision, _: WhiskActionMetaData))
}

abstract class ContainerMessage(private val tid: TransactionId) extends Message {
override val transid: TransactionId = tid
override def serialize: String = ContainerMessage.serdes.write(this).compactPrint

/** Serializes the message to JSON. */
def toJson: JsValue
}

object ContainerMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[ContainerMessage] = Try(serdes.read(msg.parseJson))

implicit val serdes = new RootJsonFormat[ContainerMessage] {
override def write(m: ContainerMessage): JsValue = m.toJson

override def read(json: JsValue): ContainerMessage = {
val JsObject(fields) = json
val creation = fields.contains("creationId")
if (creation) {
json.convertTo[ContainerCreationMessage]
} else {
json.convertTo[ContainerDeletionMessage]
}
}
}
}

sealed trait ContainerCreationError
object ContainerCreationError extends Enumeration {
case object NoAvailableInvokersError extends ContainerCreationError
case object NoAvailableResourceInvokersError extends ContainerCreationError
case object ResourceNotEnoughError extends ContainerCreationError
case object WhiskError extends ContainerCreationError
case object UnknownError extends ContainerCreationError
case object TimeoutError extends ContainerCreationError
case object ShuttingDownError extends ContainerCreationError
case object NonExecutableActionError extends ContainerCreationError
case object DBFetchError extends ContainerCreationError
case object BlackBoxError extends ContainerCreationError
case object ZeroNamespaceLimit extends ContainerCreationError
case object TooManyConcurrentRequests extends ContainerCreationError

val whiskErrors: Set[ContainerCreationError] =
Set(
NoAvailableInvokersError,
NoAvailableResourceInvokersError,
ResourceNotEnoughError,
WhiskError,
ShuttingDownError,
UnknownError,
TimeoutError,
ZeroNamespaceLimit)

def fromName(name: String) = name.toUpperCase match {
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
}

implicit val serds = new RootJsonFormat[ContainerCreationError] {
override def write(error: ContainerCreationError): JsValue = JsString(error.toString)
override def read(json: JsValue): ContainerCreationError =
Try {
val JsString(str) = json
ContainerCreationError.fromName(str.trim.toUpperCase)
} getOrElse {
throw deserializationError("ContainerCreationError must be a valid string")
}
}
}

case class ContainerCreationAckMessage(override val transid: TransactionId,
creationId: CreationId,
invocationNamespace: String,
action: FullyQualifiedEntityName,
revision: DocRevision,
actionMetaData: WhiskActionMetaData,
rootInvokerIndex: InvokerInstanceId,
schedulerHost: String,
rpcPort: Int,
retryCount: Int = 0,
error: Option[ContainerCreationError] = None,
reason: Option[String] = None)
extends Message {

/**
* Serializes message to string. Must be idempotent.
*/
override def serialize: String = ContainerCreationAckMessage.serdes.write(this).compactPrint
}

object ContainerCreationAckMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[ContainerCreationAckMessage] = Try(serdes.read(msg.parseJson))
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
private implicit val byteSizeSerdes = size.serdes
implicit val serdes = jsonFormat12(ContainerCreationAckMessage.apply)
}

@@ -47,9 +47,14 @@ case class ContainerArgsConfig(network: String,
case class ContainerPoolConfig(userMemory: ByteSize,
concurrentPeekFactor: Double,
akkaClient: Boolean,
prewarmExpirationCheckInitDelay: FiniteDuration,
prewarmExpirationCheckInterval: FiniteDuration,
prewarmExpirationCheckIntervalVariance: Option[FiniteDuration],
prewarmExpirationLimit: Int) {
prewarmExpirationLimit: Int,
prewarmMaxRetryLimit: Int,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
@@ -68,6 +73,11 @@ case class ContainerPoolConfig(userMemory: ByteSize,
max((totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt, 2) // The minimum allowed cpu-shares is 2
}

case class PrewarmContainerCreationConfig(maxConcurrent: Int, creationDelay: FiniteDuration) {
require(maxConcurrent > 0, "maxConcurrent for per invoker must be > 0")
require(creationDelay.toSeconds > 0, "creationDelay must be > 0")
}

case class RuntimesRegistryCredentials(user: String, password: String)

case class RuntimesRegistryConfig(url: String, credentials: Option[RuntimesRegistryCredentials])

@@ -17,16 +17,16 @@

package org.apache.openwhisk.core.entity

import scala.util.Try
import org.apache.commons.lang3.StringUtils

import scala.util.Try
import spray.json.DefaultJsonProtocol
import spray.json.JsNull
import spray.json.JsString
import spray.json.JsValue
import spray.json.RootJsonFormat
import spray.json.deserializationError
import spray.json._

import org.apache.openwhisk.core.entity.ArgNormalizer.trim

/**
@@ -56,11 +56,27 @@ protected[core] class DocId(val id: String) extends AnyVal {
*
* @param rev the document revision, optional
*/
protected[core] class DocRevision private (val rev: String) extends AnyVal {
protected[core] class DocRevision private (val rev: String) extends AnyVal with Ordered[DocRevision] {
def asString = rev // to make explicit that this is a string conversion
def empty = rev == null
override def toString = rev
def serialize = DocRevision.serdes.write(this).compactPrint

override def compare(that: DocRevision): Int = {
if (this.empty && that.empty) {
0
} else if (this.empty) {
-1
} else if (that.empty) {
1
} else {
StringUtils.substringBefore(rev, "-").toInt - StringUtils.substringBefore(that.rev, "-").toInt
}
}

def ==(that: DocRevision): Boolean = {
this.compare(that) == 0
}
}
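
As an illustration (not part of the diff): the new Ordered instance compares CouchDB-style revision strings of the form "<n>-<hash>" by their numeric prefix only, with a null/empty revision sorting lowest. A minimal standalone sketch of that comparison rule, using a hypothetical compareRevisions helper over raw strings:

import org.apache.commons.lang3.StringUtils

// Standalone sketch of the comparison rule above, not the DocRevision class itself.
object RevisionOrderingSketch {
  // "1-abc" < "2-def" because only the numeric prefix is compared; "2-abc" ties with "2-zzz".
  def compareRevisions(a: String, b: String): Int =
    (Option(a), Option(b)) match {
      case (None, None) => 0
      case (None, _)    => -1 // a null/empty revision sorts before any concrete one
      case (_, None)    => 1
      case (Some(x), Some(y)) =>
        StringUtils.substringBefore(x, "-").toInt - StringUtils.substringBefore(y, "-").toInt
    }
}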

/**

@@ -163,7 +163,7 @@ object size {
implicit val pureconfigReader =
ConfigReader[ConfigValue].map(v => ByteSize(v.atKey("key").getBytes("key"), SizeUnits.BYTE))

protected[entity] implicit val serdes = new RootJsonFormat[ByteSize] {
implicit val serdes = new RootJsonFormat[ByteSize] {
def write(b: ByteSize) = JsString(b.toString)

def read(value: JsValue): ByteSize = value match {
6 changes: 5 additions & 1 deletion core/invoker/src/main/resources/application.conf
@@ -60,9 +60,13 @@ whisk {
user-memory: 1024 m
concurrent-peek-factor: 0.5 #factor used to limit message peeking: 0 < factor <= 1.0 - larger number improves concurrent processing, but increases risk of message loss during invoker crash
akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient)
prewarm-expiration-check-interval: 1 minute # period to check for prewarm expiration
prewarm-expiration-check-init-delay: 10 minute # the init delay time for the first check
prewarm-expiration-check-interval: 10 minute # period to check for prewarm expiration
prewarm-expiration-check-interval-variance: 10 seconds # varies expiration across invokers to avoid many concurrent expirations
prewarm-expiration-limit: 100 # number of prewarms to expire in one expiration cycle (remaining expired will be considered for expiration in next cycle)
prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
Member:
Are the following two configurations used?

Contributor (Author):
Yes. prewarm-promotion is used only in FunctionPullingContainerPool, and its default is false, which means an action will not take a prewarm container with a larger memory size.
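
As an illustration (not part of the diff), a minimal Scala sketch of what the flag implies for prewarm lookup; the PrewarmKey type and findPrewarm helper are hypothetical, and only the promotion semantics follow the comment above:

// Hypothetical, simplified model of prewarm lookup; not the actual pool implementation.
object PrewarmPromotionSketch {
  final case class PrewarmKey(kind: String, memoryMB: Long)

  // pool maps a (kind, memory) key to the number of idle prewarmed containers.
  def findPrewarm(pool: Map[PrewarmKey, Int],
                  kind: String,
                  memoryMB: Long,
                  prewarmPromotion: Boolean): Option[PrewarmKey] =
    if (prewarmPromotion) {
      // Promotion enabled: take a container of the same kind with equal or larger memory,
      // preferring the smallest one that still fits.
      pool.keys.toSeq
        .filter(k => k.kind == kind && k.memoryMB >= memoryMB && pool(k) > 0)
        .sortBy(_.memoryMB)
        .headOption
    } else {
      // Default (false): only an exact (kind, memory) match is used.
      Some(PrewarmKey(kind, memoryMB)).filter(k => pool.getOrElse(k, 0) > 0)
    }
}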

memory-sync-interval: 1 second # period to sync memory info to etcd
}

kubernetes {

@@ -91,7 +91,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
.nextInt(v.toSeconds.toInt))
.getOrElse(0)
.seconds
context.system.scheduler.schedule(2.seconds, interval, self, AdjustPrewarmedContainer)
if (prewarmConfig.exists(!_.reactive.isEmpty)) {
Contributor (Author):
There is no need to backfill prewarm containers periodically when the reactive configuration is not included in runtimes.json, because without a reactive entry the prewarm containers never expire.

FYI, the reactive configuration in runtimes.json looks like this:

...
                "stemCells": [
                    {
                        "initialCount": 2,
                        "memory": "256 MB",
                        "reactive": {
                            "minCount": 1,
                            "maxCount": 4,
                            "ttl": "2 minutes",
                            "threshold": 1,
                            "increment": 1
                        }
                    }
                ]
...

context.system.scheduler.schedule(
poolConfig.prewarmExpirationCheckInitDelay,
interval,
self,
AdjustPrewarmedContainer)
}
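
As an illustration (not part of the diff), a sketch of the configuration shape implied by the stemCells JSON in the review comment above; the class and field names here are assumptions, and only the guard mirrors the prewarmConfig.exists(!_.reactive.isEmpty) check:

import scala.concurrent.duration._

// Assumed shape of the stem-cell configuration carried by runtimes.json; the field names
// follow the JSON in the review comment above, while the class names are illustrative only.
final case class ReactivePrewarmingConfig(minCount: Int,
                                          maxCount: Int,
                                          ttl: FiniteDuration,
                                          threshold: Int,
                                          increment: Int)

final case class StemCell(initialCount: Int, memoryMB: Long, reactive: Option[ReactivePrewarmingConfig])

object ReactivePrewarmSketch {
  // The periodic AdjustPrewarmedContainer tick is only scheduled when at least one stem cell
  // carries a reactive block, because without a ttl the prewarm containers never expire.
  def needsAdjustTick(stemCells: Seq[StemCell]): Boolean =
    stemCells.exists(_.reactive.nonEmpty)
}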

def logContainerStart(r: Run, containerState: String, activeActivations: Int, container: Option[Container]): Unit = {
val namespaceName = r.msg.user.namespace.name.asString
@@ -590,9 +596,9 @@ object ContainerPool {
}
.sortBy(_._2.expires.getOrElse(now))

// emit expired container counter metric with memory + kind
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
if (expiredPrewarmedContainer.nonEmpty) {
// emit expired container counter metric with memory + kind
MetricEmitter.emitCounterMetric(LoggingMarkers.CONTAINER_POOL_PREWARM_EXPIRED(memory.toString, kind))
Contributor (Author):
If expiredPrewarmedContainer is empty, there is no need to emit this counter metric.

logging.info(
this,
s"[kind: ${kind} memory: ${memory.toString}] ${expiredPrewarmedContainer.size} expired prewarmed containers")