[New Scheduler] Add ActivationService (#5070)
* Add ActivationService for scheduler

* Add annotation

* Add license header

* Reformat activation.proto

* Reduce request timeout

* Add license header

* Scan code before compiling the code
upgle authored Apr 13, 2021
1 parent 6e5850f commit cd6fded
Showing 11 changed files with 594 additions and 2 deletions.
@@ -25,6 +25,7 @@ import spray.json.JsString
import spray.json.JsValue
import spray.json.RootJsonFormat
import spray.json.deserializationError
import spray.json._

import org.apache.openwhisk.core.entity.ArgNormalizer.trim

@@ -59,6 +60,7 @@ protected[core] class DocRevision private (val rev: String) extends AnyVal {
  def asString = rev // to make explicit that this is a string conversion
  def empty = rev == null
  override def toString = rev
  def serialize = DocRevision.serdes.write(this).compactPrint
}

/**
@@ -131,6 +133,8 @@ protected[core] object DocRevision {

  protected[core] val empty: DocRevision = new DocRevision(null)

  protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))

  implicit val serdes = new RootJsonFormat[DocRevision] {
    def write(d: DocRevision) = if (d.rev != null) JsString(d.rev) else JsNull

@@ -56,6 +56,7 @@ protected[core] case class FullyQualifiedEntityName(path: EntityPath,
  def namespace: EntityName = path.root
  def qualifiedNameWithLeadingSlash: String = EntityPath.PATHSEP + qualifiedName
  def asString = path.addPath(name) + version.map("@" + _.toString).getOrElse("")
  def serialize = FullyQualifiedEntityName.serdes.write(this).compactPrint

  override def size = qualifiedName.sizeInBytes
  override def toString = asString
@@ -101,6 +102,8 @@ protected[core] object FullyQualifiedEntityName extends DefaultJsonProtocol {
    }
  }

  protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson))

  /**
   * Converts the name to a fully qualified name.
   * There are 3 cases:
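Together, the serialize/parse pairs added above let a FullyQualifiedEntityName and a DocRevision travel as plain strings inside the gRPC messages introduced further down. A minimal round-trip sketch with made-up values (these members are protected[core], so it only compiles from within an org.apache.openwhisk.core package):

import org.apache.openwhisk.core.entity.{DocRevision, EntityName, EntityPath, FullyQualifiedEntityName}

import scala.util.Try

// Hypothetical action name and CouchDB-style revision, purely for illustration.
val fqn = FullyQualifiedEntityName(EntityPath("guest"), EntityName("hello"))
val rev = DocRevision("1-abc")

// serialize emits compact JSON strings suitable for string fields on the wire ...
val fqnWire: String = fqn.serialize
val revWire: String = rev.serialize

// ... and parse recovers them as a Try on the receiving side.
val fqnBack: Try[FullyQualifiedEntityName] = FullyQualifiedEntityName.parse(fqnWire)
val revBack: Try[DocRevision] = DocRevision.parse(revWire)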
29 changes: 29 additions & 0 deletions core/scheduler/build.gradle
@@ -20,6 +20,7 @@ apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'maven'
apply plugin: 'org.scoverage'
apply plugin: 'com.lightbend.akka.grpc.gradle'

ext.dockerImageName = 'scheduler'
apply from: '../../gradle/docker.gradle'
@@ -33,6 +34,20 @@ ext.coverageDirs = [
]
distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses'

buildscript {
    repositories {
        mavenLocal()
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        // see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle
        // for the currently latest version.
        classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.7.2'
    }
}

// Define a separate configuration for managing the dependency on Jetty ALPN agent.
configurations {
    alpnagent
@@ -51,7 +66,21 @@ dependencies {

    compile "org.scala-lang:scala-library:${gradle.scala.version}"
    compile project(':common:scala')
}

// workaround for akka-grpc
// https://github.com/akka/akka-grpc/issues/786
printProtocLogs.doFirst {
    mkdir "$buildDir"
    file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
    mkdir "$project.rootDir/build"
    file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
}
printProtocLogs.configure {
    mkdir "$buildDir"
    file("$buildDir/akka-grpc-gradle-plugin.log").text = "x"
    mkdir "$project.rootDir/build"
    file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x"
}

mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler"
22 changes: 22 additions & 0 deletions core/scheduler/src/main/java/Empty.java
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

public class Empty {
    // Workaround for this issue: https://github.com/akka/akka-grpc/issues/289
    // Gradle complains about no java sources.
    // Note: OpenWhisk uses an older Gradle version, so the latest akka-grpc version cannot be used.
}
66 changes: 66 additions & 0 deletions core/scheduler/src/main/protobuf/activation.proto
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

syntax = "proto3";
import "google/protobuf/wrappers.proto";

//#options
option java_multiple_files = true;
option java_package = "org.apache.openwhisk.grpc";
option java_outer_classname = "ActivationProto";

package activation;
//#options

//#services
service ActivationService {
    rpc FetchActivation (FetchRequest) returns (FetchResponse) {}
    rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {}
}
//#services

//#messages
// The request message
message FetchRequest {
    string invocationNamespace = 1;
    string fqn = 2;
    string rev = 3;
    string containerId = 4;
    bool warmed = 5;
    // This allows an optional value
    google.protobuf.Int64Value lastDuration = 6;
    // to record alive containers
    bool alive = 7;
}

// The response message
message FetchResponse {
    string activationMessage = 1;
}

message RescheduleRequest {
    string invocationNamespace = 1;
    string fqn = 2;
    string rev = 3;
    string activationMessage = 4;
}

message RescheduleResponse {
    // `false` if the reschedule request failed
    bool isRescheduled = 1;
}
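For orientation, a client (for example an invoker-side container proxy) would talk to this service through the stubs akka-grpc generates from the file above. The sketch below assumes the generated ActivationServiceClient, reuses the fqnWire/revWire strings from the earlier round-trip sketch, and uses placeholder host, port, and container values; depending on the akka-grpc version an implicit Materializer may also be required:

import akka.actor.ActorSystem
import akka.grpc.GrpcClientSettings
import org.apache.openwhisk.grpc.{ActivationServiceClient, FetchRequest}

import scala.concurrent.ExecutionContextExecutor

implicit val system: ActorSystem = ActorSystem("activation-client")
implicit val ec: ExecutionContextExecutor = system.dispatcher

// Plaintext HTTP/2 connection to a scheduler; host and port are placeholders.
val settings = GrpcClientSettings.connectToServiceAt("scheduler-host", 8080).withTls(false)
val client = ActivationServiceClient(settings)

// Ask the memory queue for the next activation for this action revision and container.
client
  .fetchActivation(
    FetchRequest(
      invocationNamespace = "guest",
      fqn = fqnWire, // serialized FullyQualifiedEntityName
      rev = revWire, // serialized DocRevision
      containerId = "container-0",
      warmed = true,
      alive = true))
  .foreach(response => println(response.activationMessage))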

@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.scheduler.grpc

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.core.connector.{ActivationMessage, Message}
import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName}
import org.apache.openwhisk.core.scheduler.queue._
import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse}
import spray.json._

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.Try

class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService {
  implicit val requestTimeout: Timeout = Timeout(5.seconds)
  implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher

  override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = {
    logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}")
    Future(for {
      fqn <- FullyQualifiedEntityName.parse(request.fqn)
      rev <- DocRevision.parse(request.rev)
      msg <- ActivationMessage.parse(request.activationMessage)
    } yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res =>
      {
        val key = res._1.toDocId.asDocInfo(res._2)
        QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
          case Some(queueValue) =>
            // enqueue activation message to reschedule
            logging.info(
              this,
              s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}")
            queueValue.queue ? res._3
            Future.successful(RescheduleResponse(isRescheduled = true))
          case None =>
            logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}")
            Future.successful(RescheduleResponse())
        }
      }
    }
  }

  override def fetchActivation(request: FetchRequest): Future[FetchResponse] = {
    Future(for {
      fqn <- FullyQualifiedEntityName.parse(request.fqn)
      rev <- DocRevision.parse(request.rev)
    } yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res =>
      val key = res._1.toDocId.asDocInfo(res._2)
      QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match {
        case Some(queueValue) =>
          (queueValue.queue ? GetActivation(
            res._1,
            request.containerId,
            request.warmed,
            request.lastDuration,
            request.alive))
            .mapTo[ActivationResponse]
            .map { response =>
              FetchResponse(response.serialize)
            }
            .recover {
              case t: Throwable =>
                logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}")
                FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize)
            }
        case None =>
          if (QueuePool.keys.exists { mkey =>
                mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id
              })
            Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize))
          else
            Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize))
      }
    }
  }
}

object ActivationServiceImpl {

  def apply()(implicit actorSystem: ActorSystem, logging: Logging) =
    new ActivationServiceImpl()
}

case class GetActivation(action: FullyQualifiedEntityName,
                         containerId: String,
                         warmed: Boolean,
                         lastDuration: Option[Long],
                         alive: Boolean = true)
case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message {
  override def serialize = ActivationResponse.serdes.write(this).compactPrint
}

object ActivationResponse extends DefaultJsonProtocol {

  private implicit val noMessageSerdes = NoActivationMessage.serdes
  private implicit val noQueueSerdes = NoMemoryQueue.serdes
  private implicit val mismatchSerdes = ActionMismatch.serdes
  private implicit val messageSerdes = ActivationMessage.serdes
  private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat

  def parse(msg: String) = Try(serdes.read(msg.parseJson))

  implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] =
    new RootJsonFormat[Either[A, B]] {
      val format = DefaultJsonProtocol.eitherFormat[A, B]

      def write(either: Either[A, B]) = format.write(either)

      def read(value: JsValue) = format.read(value)
    }

  type ActivationResponse = Either[MemoryQueueError, ActivationMessage]
  implicit val serdes = jsonFormat(ActivationResponse.apply _, "message")

}
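This commit adds the service implementation only; for context, the scheduler still has to expose it over HTTP/2, roughly as sketched below. ActivationServiceHandler is the handler akka-grpc generates for the service, but the binding code, interface, and port here are illustrative and not part of this change, and the exact Http binding API and implicit requirements vary with the Akka HTTP and akka-grpc versions:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import org.apache.openwhisk.common.Logging
import org.apache.openwhisk.grpc.ActivationServiceHandler

// Illustrative wiring only; interface and port are placeholders.
def bindActivationService()(implicit system: ActorSystem, logging: Logging) = {
  val handler = ActivationServiceHandler(ActivationServiceImpl())
  Http().newServerAt("0.0.0.0", 8081).bind(handler)
}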