Skip to content

Commit

Permalink
[New Scheduler] Add memory queue for the new scheduler (#5110)
Browse files Browse the repository at this point in the history
* Add SchedulingDecisionMaker

* Add AverageRingBuffer to calculate the average execution time.

* Add MemoryQueue

* Remove the duplicate comment.

* Apply comments

* Explicitly export the scala version

* Explicitly export the scala version

* Use dotted expression.

* Revert the scala version env

* Add kryo dependency.

* Fix import issues.

* Fix import issues.

* Remove duplicated codes

* Update codes according to the new akka version.

* Apply review comments.

* Fix test case

* Change kryo serialization library

* Remove kryo

* Remove empty newline

* Add altoo kryo serialization library

* Change the kryo serializer implementation

* Fix test cases
  • Loading branch information
style95 authored Aug 30, 2021
1 parent 7e1caaa commit cf36299
Show file tree
Hide file tree
Showing 13 changed files with 5,896 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.common

object AverageRingBuffer {
def apply(maxSize: Int) = new AverageRingBuffer(maxSize)
}

/**
* This buffer provides the average of the given elements.
* The number of elements are limited and the first element is removed if the maximum size is reached.
* Since it is based on the Vector, its operation takes effectively constant time.
* For more details, please visit https://docs.scala-lang.org/overviews/collections/performance-characteristics.html
*
* @param maxSize the maximum size of the buffer
*/
class AverageRingBuffer(private val maxSize: Int) {
private var elements = Vector.empty[Double]
private var sum = 0.0
private var max = 0.0
private var min = 0.0

def nonEmpty: Boolean = elements.nonEmpty

def average: Double = {
val size = elements.size
if (size > 2) {
(sum - max - min) / (size - 2)
} else {
sum / size
}
}

def add(el: Double): Unit = synchronized {
if (elements.size == maxSize) {
sum = sum + el - elements.head
elements = elements.tail :+ el
} else {
sum += el
elements = elements :+ el
}
if (el > max) {
max = el
}
if (el < min) {
min = el
}
}

def size(): Int = elements.size
}
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,26 @@ object LoggingMarkers {
*
* MetricEmitter.emitCounterMetric(LoggingMarkers.MY_COUNTER(GreenCounter))
*/
def SCHEDULER_NAMESPACE_CONTAINER(namespace: String) =
LogMarkerToken(scheduler, "namespaceContainer", counter, Some(namespace), Map("namespace" -> namespace))(
MeasurementUnit.none)
def SCHEDULER_NAMESPACE_INPROGRESS_CONTAINER(namespace: String) =
LogMarkerToken(scheduler, "namespaceInProgressContainer", counter, Some(namespace), Map("namespace" -> namespace))(
MeasurementUnit.none)
def SCHEDULER_ACTION_CONTAINER(namespace: String, action: String) =
LogMarkerToken(
scheduler,
"actionContainer",
counter,
Some(namespace),
Map("namespace" -> namespace, "action" -> action))(MeasurementUnit.none)
def SCHEDULER_ACTION_INPROGRESS_CONTAINER(namespace: String, action: String) =
LogMarkerToken(
scheduler,
"actionInProgressContainer",
counter,
Some(namespace),
Map("namespace" -> namespace, "action" -> action))(MeasurementUnit.none)

/*
* Controller related markers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"

val schedulerMaxPeek = "whisk.scheduler.max-peek"
val schedulerQueue = "whisk.scheduler.queue"
val schedulerQueueManager = "whisk.scheduler.queue-manager"
val schedulerInProgressJobRetentionSecond = "whisk.scheduler.in-progress-job-retention"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,7 @@ protected[core] object CreationId {
}
}
}

val systemPrefix = "cid_"
val void = CreationId(systemPrefix + "void")
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ object Messages {
s"Too many requests in the last minute (count: $count, allowed: $allowed)."

/** Standard message for too many concurrent activation requests within a time window. */
val tooManyConcurrentRequests = s"Too many concurrent requests in flight."
def tooManyConcurrentRequests(count: Int, allowed: Int) =
s"Too many concurrent requests in flight (count: $count, allowed: $allowed)."

Expand Down Expand Up @@ -225,6 +226,7 @@ object Messages {
}

val namespacesBlacklisted = "The action was not invoked due to a blacklisted namespace."
val namespaceLimitUnderZero = "The namespace limit is less than or equal to 0."

val actionRemovedWhileInvoking = "Action could not be found or may have been deleted."
val actionMismatchWhileInvoking = "Action version is not compatible and cannot be invoked."
Expand Down
1 change: 1 addition & 0 deletions core/scheduler/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ dependencies {
}

compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile "io.altoo:akka-kryo-serialization_${gradle.scala.depVersion}:1.0.0"
compile project(':common:scala')
}

Expand Down
49 changes: 47 additions & 2 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,63 @@
# limitations under the License.
#


akka {
actor {
allow-java-serialization = off
serializers {
kryo = "io.altoo.akka.serialization.kryo.KryoSerializer"
}
serialization-bindings {
"org.apache.openwhisk.core.scheduler.queue.CreateQueue" = kryo
"org.apache.openwhisk.core.scheduler.queue.CreateQueueResponse" = kryo
"org.apache.openwhisk.core.connector.ActivationMessage" = kryo
}
kryo {
idstrategy = "automatic"
classes = [
"org.apache.openwhisk.core.scheduler.queue.CreateQueue",
"org.apache.openwhisk.core.scheduler.queue.CreateQueueResponse",
"org.apache.openwhisk.core.connector.ActivationMessage"
]
}
}

remote.netty.tcp {
send-buffer-size = 3151796b
receive-buffer-size = 3151796b
maximum-frame-size = 3151796b
}
}

whisk {
# tracing configuration
tracing {
component = "Scheduler"
}

fraction {
managed-fraction: 90%
blackbox-fraction: 10%
managed-fraction: 90%
blackbox-fraction: 10%
}

scheduler {
protocol = "http"
username: "scheduler.user"
password: "scheduler.pass"
grpc {
tls = "false"
}
queue {
idle-grace = "20 seconds"
stop-grace = "20 seconds"
flush-grace = "60 seconds"
graceful-shutdown-timeout = "5 seconds"
max-retention-size = "10000"
max-retention-ms = "60000"
throttling-fraction = "0.9"
duration-buffer-size = "10"
}
queue-manager {
max-scheduling-time = "20 seconds"
max-retries-to-get-queue = "13"
Expand Down
Loading

0 comments on commit cf36299

Please sign in to comment.