Skip to content

Commit

Permalink
[New Scheduler] Manage memory queues in scheduler (#5118)
Browse files Browse the repository at this point in the history
* Add QueueManager

* Add test configuration

* Remove deprecated functions

* Fix memory queue test

* Change string to FiniteDuration
  • Loading branch information
KeonHee authored Jun 14, 2021
1 parent dc7c666 commit 0cdfdb3
Show file tree
Hide file tree
Showing 10 changed files with 1,607 additions and 42 deletions.
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -465,5 +465,9 @@ etcd_connect_string: "{% set ret = [] %}\

scheduler:
protocol: "{{ scheduler_protocol | default('http') }}"
maxPeek: "{{ scheduler_max_peek | default(128) }}"
queueManager:
maxSchedulingTime: "{{ scheduler_maxSchedulingTime | default('20 second') }}"
maxRetriesToGetQueue: "{{ scheduler_maxRetriesToGetQueue | default(13) }}"
dataManagementService:
retryInterval: "{{ scheduler_dataManagementService_retryInterval | default('1 second') }}"
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,17 @@ object LoggingMarkers {

// Time that is needed to produce message in kafka
val SCHEDULER_KAFKA = LogMarkerToken(scheduler, kafka, start)(MeasurementUnit.time.milliseconds)
val SCHEDULER_WAIT_TIME =
LogMarkerToken(scheduler, "waitTime", counter)(MeasurementUnit.none)

def SCHEDULER_KEEP_ALIVE(leaseId: Long) =
LogMarkerToken(scheduler, "keepAlive", counter, None, Map("leaseId" -> leaseId.toString))(MeasurementUnit.none)
def SCHEDULER_QUEUE = LogMarkerToken(scheduler, "queue", counter)(MeasurementUnit.none)
def SCHEDULER_QUEUE_CREATE = LogMarkerToken(scheduler, "queueCreate", start)(MeasurementUnit.time.milliseconds)
def SCHEDULER_QUEUE_UPDATE(reason: String) =
LogMarkerToken(scheduler, "queueUpdate", counter, None, Map("reason" -> reason))(MeasurementUnit.none)
def SCHEDULER_QUEUE_WAITING_ACTIVATION(action: String) =
LogMarkerToken(scheduler, "queueActivation", counter, Some(action), Map("action" -> action))(MeasurementUnit.none)
/*
* General markers
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"

val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"

val whiskClusterName = "whisk.cluster.name"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class ActivationMessage(override val transid: TransactionId,
lockedArgs: Map[String, String] = Map.empty,
cause: Option[ActivationId] = None,
traceContext: Option[Map[String, String]] = None)
extends Message {
extends Message {

override def serialize = ActivationMessage.serdes.write(this).compactPrint

Expand Down Expand Up @@ -116,11 +116,11 @@ abstract class AcknowledegmentMessage(private val tid: TransactionId) extends Me
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
* Right when this message is created.
*/
case class CombinedCompletionAndResultMessage private(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
case class CombinedCompletionAndResultMessage private (override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "combined"

override def result = Some(response)
Expand All @@ -142,11 +142,11 @@ case class CombinedCompletionAndResultMessage private(override val transid: Tran
* phase notification to the load balancer where an invoker first sends a `ResultMessage` and later sends the
* `CompletionMessage`.
*/
case class CompletionMessage private(override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
case class CompletionMessage private (override val transid: TransactionId,
override val activationId: ActivationId,
override val isSystemError: Option[Boolean],
instance: InstanceId)
extends AcknowledegmentMessage(transid) {
override def messageType = "completion"

override def result = None
Expand All @@ -168,8 +168,8 @@ case class CompletionMessage private(override val transid: TransactionId,
* The constructor is private so that callers must use the more restrictive constructors which ensure the respose is always
* Right when this message is created.
*/
case class ResultMessage private(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
case class ResultMessage private (override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage(transid) {
override def messageType = "result"

override def result = Some(response)
Expand Down Expand Up @@ -253,7 +253,7 @@ object AcknowledegmentMessage extends DefaultJsonProtocol {
Left(value.convertTo[ActivationId])

case _: JsObject => Right(value.convertTo[WhiskActivation])
case _ => deserializationError("could not read ResultMessage")
case _ => deserializationError("could not read ResultMessage")
}
}

Expand Down Expand Up @@ -296,7 +296,7 @@ object EventMessageBody extends DefaultJsonProtocol {

implicit val format = new JsonFormat[EventMessageBody] {
def write(eventMessageBody: EventMessageBody) = eventMessageBody match {
case m: Metric => m.toJson
case m: Metric => m.toJson
case a: Activation => a.toJson
}

Expand All @@ -321,7 +321,7 @@ case class Activation(name: String,
causedBy: Option[String],
size: Option[Int] = None,
userDefinedStatusCode: Option[Int] = None)
extends EventMessageBody {
extends EventMessageBody {
val typeName = Activation.typeName

override def serialize = toJson.compactPrint
Expand Down Expand Up @@ -349,12 +349,12 @@ object Activation extends DefaultJsonProtocol {
private implicit val durationFormat = new RootJsonFormat[Duration] {
override def write(obj: Duration): JsValue = obj match {
case o if o.isFinite => JsNumber(o.toMillis)
case _ => JsNumber.zero
case _ => JsNumber.zero
}

override def read(json: JsValue): Duration = json match {
case JsNumber(n) if n <= 0 => Duration.Zero
case JsNumber(n) => toDuration(n.longValue)
case JsNumber(n) => toDuration(n.longValue)
}
}

Expand Down Expand Up @@ -437,7 +437,7 @@ case class EventMessage(source: String,
userId: UUID,
eventType: String,
timestamp: Long = System.currentTimeMillis())
extends Message {
extends Message {
override def serialize = EventMessage.format.write(this).compactPrint
}

Expand All @@ -460,7 +460,7 @@ case class InvokerResourceMessage(status: String,
inProgressMemory: Long,
tags: Seq[String],
dedicatedNamespaces: Seq[String])
extends Message {
extends Message {

/**
* Serializes message to string. Must be idempotent.
Expand Down Expand Up @@ -502,7 +502,7 @@ object InvokerResourceMessage extends DefaultJsonProtocol {
object StatusQuery

case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
extends Message {
extends Message {

override def serialize: String = StatusData.serdes.write(this).compactPrint

Expand All @@ -524,7 +524,7 @@ case class ContainerCreationMessage(override val transid: TransactionId,
rpcPort: Int,
retryCount: Int = 0,
creationId: CreationId = CreationId.generate())
extends ContainerMessage(transid) {
extends ContainerMessage(transid) {

override def toJson: JsValue = ContainerCreationMessage.serdes.write(this)

Expand Down Expand Up @@ -556,7 +556,7 @@ case class ContainerDeletionMessage(override val transid: TransactionId,
action: FullyQualifiedEntityName,
revision: DocRevision,
whiskActionMetaData: WhiskActionMetaData)
extends ContainerMessage(transid) {
extends ContainerMessage(transid) {
override def toJson: JsValue = ContainerDeletionMessage.serdes.write(this)

override def serialize: String = toJson.compactPrint
Expand Down Expand Up @@ -640,17 +640,17 @@ object ContainerCreationError extends Enumeration {
ZeroNamespaceLimit)

private def parse(name: String) = name.toUpperCase match {
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLEINVOKERSERROR" => NoAvailableInvokersError
case "NOAVAILABLERESOURCEINVOKERSERROR" => NoAvailableResourceInvokersError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
case "RESOURCENOTENOUGHERROR" => ResourceNotEnoughError
case "NONEXECUTBLEACTIONERROR" => NonExecutableActionError
case "DBFETCHERROR" => DBFetchError
case "WHISKERROR" => WhiskError
case "BLACKBOXERROR" => BlackBoxError
case "TIMEOUTERROR" => TimeoutError
case "ZERONAMESPACELIMIT" => ZeroNamespaceLimit
case "TOOMANYCONCURRENTREQUESTS" => TooManyConcurrentRequests
case "UNKNOWNERROR" => UnknownError
}

implicit val serds = new RootJsonFormat[ContainerCreationError] {
Expand Down Expand Up @@ -678,7 +678,7 @@ case class ContainerCreationAckMessage(override val transid: TransactionId,
retryCount: Int = 0,
error: Option[ContainerCreationError] = None,
reason: Option[String] = None)
extends Message {
extends Message {

/**
* Serializes message to string. Must be idempotent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize]

override def equals(that: Any): Boolean = that match {
case t: ByteSize => compareTo(t) == 0
case _ => false
case _ => false
}

override def toString = {
unit match {
case SizeUnits.BYTE => s"$size B"
case SizeUnits.KB => s"$size KB"
case SizeUnits.MB => s"$size MB"
case SizeUnits.GB => s"$size GB"
case SizeUnits.KB => s"$size KB"
case SizeUnits.MB => s"$size MB"
case SizeUnits.GB => s"$size GB"
}
}
}
Expand Down Expand Up @@ -190,7 +190,7 @@ object size {

def read(value: JsValue): ByteSize = value match {
case JsString(s) => ByteSize.fromString(s)
case _ => deserializationError(formatError)
case _ => deserializationError(formatError)
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ whisk {
}

scheduler {
queue-manager {
max-scheduling-time = "20 seconds"
max-retries-to-get-queue = "13"
}
max-peek = "128"
in-progress-job-retention = "20 seconds"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.scheduler.queue

import org.apache.openwhisk.core.connector.ActivationMessage
import org.apache.openwhisk.core.entity.{DocInfo, FullyQualifiedEntityName}

import scala.concurrent.Promise

// Events sent by the actor
case class QueueRemoved(invocationNamespace: String, action: DocInfo, leaderKey: Option[String])
case class QueueReactivated(invocationNamespace: String, action: FullyQualifiedEntityName, docInfo: DocInfo)
case class CancelPoll(promise: Promise[Either[MemoryQueueError, ActivationMessage]])
case object QueueRemovedCompleted
case object FlushPulse

// Events received by the actor
case object Start
case object VersionUpdated
case object StopSchedulingAsOutdated
Loading

0 comments on commit 0cdfdb3

Please sign in to comment.