Skip to content

Commit

Permalink
[New Scheduler] Initial commit for the scheduler component (#4983)
Browse files Browse the repository at this point in the history
* Initial commit for the scheduler component

* Add a license header

* Apply comments.

* Move configuration checkups to above.

* Add supplementary comments
  • Loading branch information
style95 authored Nov 11, 2020
1 parent cb16450 commit 7b99af9
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ object TransactionId {

val systemPrefix = "sid_"

var containerCreation = TransactionId(systemPrefix + "containerCreation")
val unknown = TransactionId(systemPrefix + "unknown")
val testing = TransactionId(systemPrefix + "testing") // Common id for for unit testing
val invoker = TransactionId(systemPrefix + "invoker") // Invoker startup/shutdown or GC activity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ class WhiskConfig(requiredProperties: Map[String, String],
val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit)
val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)

val schedulerHost = this(WhiskConfig.schedulerHost)
val schedulerRpcPort = this(WhiskConfig.schedulerRpcPort)
val schedulerAkkaPort = this(WhiskConfig.schedulerAkkaPort)
}

object WhiskConfig {
Expand Down Expand Up @@ -190,6 +194,10 @@ object WhiskConfig {
val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
val controllerSeedNodes = "akka.cluster.seed.nodes"

val schedulerHost = "whisk.scheduler.endpoints.host"
val schedulerRpcPort = "whisk.scheduler.endpoints.rpcPort"
val schedulerAkkaPort = "whisk.scheduler.endpoints.akkaPort"
}

object ConfigKeys {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,3 +427,33 @@ object EventMessage extends DefaultJsonProtocol {

def parse(msg: String) = Try(format.read(msg.parseJson))
}

/**
* This case class is used when retrieving the snapshot of the queue status from the scheduler at a certain moment.
* This is useful to figure out the internal status when any issue happens.
* The following would be an example result.
*
* [
* ...
* {
* "data": "RunningData",
* "fqn": "whisk.system/elasticsearch/status-alarm@0.0.2",
* "invocationNamespace": "style95",
* "status": "Running",
* "waitingActivation": 1
* },
* ...
* ]
*/
object StatusQuery
case class StatusData(invocationNamespace: String, fqn: String, waitingActivation: Int, status: String, data: String)
extends Message {

override def serialize: String = StatusData.serdes.write(this).compactPrint

}
object StatusData extends DefaultJsonProtocol {

implicit val serdes =
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ case class ControllerInstanceId(asString: String) extends InstanceId {
override val toJson: JsValue = ControllerInstanceId.serdes.write(this)
}

case class SchedulerInstanceId(val asString: String) extends InstanceId {
validate(asString)
override val instanceType = "scheduler"

override val source = s"$instanceType$asString"

override val toString: String = source

override val toJson: JsValue = SchedulerInstanceId.serdes.write(this)
}

object InvokerInstanceId extends DefaultJsonProtocol {
def parse(c: String): Try[InvokerInstanceId] = Try(serdes.read(c.parseJson))

Expand Down Expand Up @@ -112,6 +123,10 @@ object ControllerInstanceId extends DefaultJsonProtocol {
}
}

object SchedulerInstanceId extends DefaultJsonProtocol {
implicit val serdes = jsonFormat(SchedulerInstanceId.apply _, "asString")
}

trait InstanceId {

// controller ids become part of a kafka topic, hence, hence allow only certain characters
Expand Down
33 changes: 33 additions & 0 deletions core/scheduler/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM scala

ENV UID=1001 \
NOT_ROOT_USER=owuser

# Copy app jars
ADD build/distributions/scheduler.tar /

COPY init.sh /
RUN chmod +x init.sh

RUN adduser -D -u ${UID} -h /home/${NOT_ROOT_USER} -s /bin/bash ${NOT_ROOT_USER}
USER ${NOT_ROOT_USER}

EXPOSE 8080
CMD ["./init.sh", "0"]
58 changes: 58 additions & 0 deletions core/scheduler/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'scala'
apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'maven'
apply plugin: 'org.scoverage'

ext.dockerImageName = 'scheduler'
apply from: '../../gradle/docker.gradle'
distDocker.dependsOn ':common:scala:distDocker', 'distTar'

project.archivesBaseName = "openwhisk-scheduler"

ext.coverageDirs = [
"${buildDir}/classes/scala/scoverage",
"${project(':common:scala').buildDir.absolutePath}/classes/scala/scoverage"
]
distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'

// Define a separate configuration for managing the dependency on Jetty ALPN agent.
configurations {
alpnagent
}

dependencies {
configurations.all {
resolutionStrategy.force "com.lihaoyi:fastparse_${gradle.scala.depVersion}:2.1.3"
resolutionStrategy.force "com.typesafe.akka:akka-http-core_${gradle.scala.depVersion}:${gradle.akka_http.version}"
resolutionStrategy.force "com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}"
resolutionStrategy.force "com.typesafe.akka:akka-http2-support_${gradle.scala.depVersion}:${gradle.akka_http.version}"
resolutionStrategy.force "com.typesafe.akka:akka-http-spray-json_${gradle.scala.depVersion}:${gradle.akka_http.version}"
resolutionStrategy.force "com.typesafe.akka:akka-parsing_${gradle.scala.depVersion}:${gradle.akka_http.version}"
resolutionStrategy.force "com.typesafe.akka:akka-http_${gradle.scala.depVersion}:${gradle.akka_http.version}"
}

compile "org.scala-lang:scala-library:${gradle.scala.version}"
compile project(':common:scala')

}

mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
applicationDefaultJvmArgs = ["-Djava.security.egd=file:/dev/./urandom"]
24 changes: 24 additions & 0 deletions core/scheduler/init.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

./copyJMXFiles.sh

export SCHEDULER_OPTS
SCHEDULER_OPTS="$SCHEDULER_OPTS -Dakka.remote.netty.tcp.bind-hostname=$(hostname -i) $(./transformEnvironment.sh)"

exec scheduler/bin/scheduler "$@"
Loading

0 comments on commit 7b99af9

Please sign in to comment.