From cd6fded8a6836756cbfbe4159064c85683b64cd7 Mon Sep 17 00:00:00 2001 From: Seonghyun Oh Date: Tue, 13 Apr 2021 19:32:48 +0900 Subject: [PATCH] [New Scheduler] Add ActivationService (#5070) * Add ActivationService for scheduler * Add annotation * Add license header * Reformat activation.proto * Reduce request timeout * Add license header * Scan code before compiling the code --- .../openwhisk/core/entity/DocInfo.scala | 4 + .../entity/FullyQualifiedEntityName.scala | 3 + core/scheduler/build.gradle | 29 +++ core/scheduler/src/main/java/Empty.java | 22 +++ .../src/main/protobuf/activation.proto | 66 +++++++ .../grpc/ActivationServiceImpl.scala | 135 ++++++++++++++ .../core/scheduler/queue/QueueManager.scala | 119 ++++++++++++ .../test/ActivationServiceImplTests.scala | 173 ++++++++++++++++++ .../scheduler/grpc/test/CommonVariable.scala | 40 ++++ tools/travis/runUnitTests.sh | 2 - tools/travis/setup.sh | 3 + 11 files changed, 594 insertions(+), 2 deletions(-) create mode 100644 core/scheduler/src/main/java/Empty.java create mode 100644 core/scheduler/src/main/protobuf/activation.proto create mode 100644 core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala create mode 100644 core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala create mode 100644 tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala index f8849d16de0..77e200862c5 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/DocInfo.scala @@ -25,6 +25,7 @@ import spray.json.JsString import spray.json.JsValue import spray.json.RootJsonFormat import spray.json.deserializationError +import spray.json._ import org.apache.openwhisk.core.entity.ArgNormalizer.trim @@ -59,6 +60,7 @@ protected[core] class DocRevision private (val rev: String) extends AnyVal { def asString = rev // to make explicit that this is a string conversion def empty = rev == null override def toString = rev + def serialize = DocRevision.serdes.write(this).compactPrint } /** @@ -131,6 +133,8 @@ protected[core] object DocRevision { protected[core] val empty: DocRevision = new DocRevision(null) + protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = new RootJsonFormat[DocRevision] { def write(d: DocRevision) = if (d.rev != null) JsString(d.rev) else JsNull diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala index 44bb9717f2c..f7669676607 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/FullyQualifiedEntityName.scala @@ -56,6 +56,7 @@ protected[core] case class FullyQualifiedEntityName(path: EntityPath, def namespace: EntityName = path.root def qualifiedNameWithLeadingSlash: String = EntityPath.PATHSEP + qualifiedName def asString = path.addPath(name) + version.map("@" + _.toString).getOrElse("") + def serialize = FullyQualifiedEntityName.serdes.write(this).compactPrint override def size = qualifiedName.sizeInBytes override def toString = asString @@ -101,6 +102,8 @@ protected[core] object FullyQualifiedEntityName extends DefaultJsonProtocol { } } + protected[core] def parse(msg: String) = Try(serdes.read(msg.parseJson)) + /** * Converts the name to a fully qualified name. * There are 3 cases: diff --git a/core/scheduler/build.gradle b/core/scheduler/build.gradle index 530c9624fea..65d088a01a6 100644 --- a/core/scheduler/build.gradle +++ b/core/scheduler/build.gradle @@ -20,6 +20,7 @@ apply plugin: 'application' apply plugin: 'eclipse' apply plugin: 'maven' apply plugin: 'org.scoverage' +apply plugin: 'com.lightbend.akka.grpc.gradle' ext.dockerImageName = 'scheduler' apply from: '../../gradle/docker.gradle' @@ -33,6 +34,20 @@ ext.coverageDirs = [ ] distDockerCoverage.dependsOn ':common:scala:scoverageClasses', 'scoverageClasses' +buildscript { + repositories { + mavenLocal() + maven { + url "https://plugins.gradle.org/m2/" + } + } + dependencies { + // see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle + // for the currently latest version. + classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:0.7.2' + } +} + // Define a separate configuration for managing the dependency on Jetty ALPN agent. configurations { alpnagent @@ -51,7 +66,21 @@ dependencies { compile "org.scala-lang:scala-library:${gradle.scala.version}" compile project(':common:scala') +} +// workaround for akka-grpc +// https://github.com/akka/akka-grpc/issues/786 +printProtocLogs.doFirst { + mkdir "$buildDir" + file("$buildDir/akka-grpc-gradle-plugin.log").text = "x" + mkdir "$project.rootDir/build" + file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x" +} +printProtocLogs.configure { + mkdir "$buildDir" + file("$buildDir/akka-grpc-gradle-plugin.log").text = "x" + mkdir "$project.rootDir/build" + file("$project.rootDir/build/akka-grpc-gradle-plugin.log").text = "x" } mainClassName = "org.apache.openwhisk.core.scheduler.Scheduler" diff --git a/core/scheduler/src/main/java/Empty.java b/core/scheduler/src/main/java/Empty.java new file mode 100644 index 00000000000..b982d8f506a --- /dev/null +++ b/core/scheduler/src/main/java/Empty.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class Empty { + // Workaround for this issue https://github.com/akka/akka-grpc/issues/289 + // Gradle complains about no java sources. + // Note. Openwhisk is using a lower gradle version, so the latest akka-grpc version cannot be used. +} diff --git a/core/scheduler/src/main/protobuf/activation.proto b/core/scheduler/src/main/protobuf/activation.proto new file mode 100644 index 00000000000..fb16f48d2da --- /dev/null +++ b/core/scheduler/src/main/protobuf/activation.proto @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; +import "google/protobuf/wrappers.proto"; + +//#options +option java_multiple_files = true; +option java_package = "org.apache.openwhisk.grpc"; +option java_outer_classname = "ActivationProto"; + +package activation; +//#options + +//#services +service ActivationService { + rpc FetchActivation (FetchRequest) returns (FetchResponse) {} + rpc RescheduleActivation (RescheduleRequest) returns (RescheduleResponse) {} +} +//#services + +//#messages +// The request message +message FetchRequest { + string invocationNamespace = 1; + string fqn = 2; + string rev = 3; + string containerId = 4; + bool warmed = 5; + // This allows optional value + google.protobuf.Int64Value lastDuration = 6; + // to record alive containers + bool alive = 7; +} + +// The response message +message FetchResponse { + string activationMessage = 1; +} + +message RescheduleRequest { + string invocationNamespace = 1; + string fqn = 2; + string rev = 3; + string activationMessage = 4; +} + +message RescheduleResponse { + // if reschedule request is failed, then it will be `false` + bool isRescheduled = 1; +} + diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala new file mode 100644 index 00000000000..d80cd4259de --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/grpc/ActivationServiceImpl.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler.grpc + +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.util.Timeout +import org.apache.openwhisk.common.Logging +import org.apache.openwhisk.core.connector.{ActivationMessage, Message} +import org.apache.openwhisk.core.entity.{DocRevision, FullyQualifiedEntityName} +import org.apache.openwhisk.core.scheduler.queue._ +import org.apache.openwhisk.grpc.{ActivationService, FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse} +import spray.json._ + +import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.Try + +class ActivationServiceImpl()(implicit actorSystem: ActorSystem, logging: Logging) extends ActivationService { + implicit val requestTimeout: Timeout = Timeout(5.seconds) + implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher + + override def rescheduleActivation(request: RescheduleRequest): Future[RescheduleResponse] = { + logging.info(this, s"Try to reschedule activation ${request.invocationNamespace} ${request.fqn} ${request.rev}") + Future(for { + fqn <- FullyQualifiedEntityName.parse(request.fqn) + rev <- DocRevision.parse(request.rev) + msg <- ActivationMessage.parse(request.activationMessage) + } yield (fqn, rev, msg)).flatMap(Future.fromTry) flatMap { res => + { + val key = res._1.toDocId.asDocInfo(res._2) + QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match { + case Some(queueValue) => + // enqueue activation message to reschedule + logging.info( + this, + s"Enqueue activation message to reschedule ${request.invocationNamespace} ${request.fqn} ${request.rev}") + queueValue.queue ? res._3 + Future.successful(RescheduleResponse(isRescheduled = true)) + case None => + logging.error(this, s"Queue not found for ${request.invocationNamespace} ${request.fqn} ${request.rev}") + Future.successful(RescheduleResponse()) + } + } + } + } + + override def fetchActivation(request: FetchRequest): Future[FetchResponse] = { + Future(for { + fqn <- FullyQualifiedEntityName.parse(request.fqn) + rev <- DocRevision.parse(request.rev) + } yield (fqn, rev)).flatMap(Future.fromTry) flatMap { res => + val key = res._1.toDocId.asDocInfo(res._2) + QueuePool.get(MemoryQueueKey(request.invocationNamespace, key)) match { + case Some(queueValue) => + (queueValue.queue ? GetActivation( + res._1, + request.containerId, + request.warmed, + request.lastDuration, + request.alive)) + .mapTo[ActivationResponse] + .map { response => + FetchResponse(response.serialize) + } + .recover { + case t: Throwable => + logging.error(this, s"Failed to get message from QueueManager, error: ${t.getMessage}") + FetchResponse(ActivationResponse(Left(NoActivationMessage())).serialize) + } + case None => + if (QueuePool.keys.exists { mkey => + mkey.invocationNamespace == request.invocationNamespace && mkey.docInfo.id == key.id + }) + Future.successful(FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize)) + else + Future.successful(FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize)) + } + } + } +} + +object ActivationServiceImpl { + + def apply()(implicit actorSystem: ActorSystem, logging: Logging) = + new ActivationServiceImpl() +} + +case class GetActivation(action: FullyQualifiedEntityName, + containerId: String, + warmed: Boolean, + lastDuration: Option[Long], + alive: Boolean = true) +case class ActivationResponse(message: Either[MemoryQueueError, ActivationMessage]) extends Message { + override def serialize = ActivationResponse.serdes.write(this).compactPrint +} + +object ActivationResponse extends DefaultJsonProtocol { + + private implicit val noMessageSerdes = NoActivationMessage.serdes + private implicit val noQueueSerdes = NoMemoryQueue.serdes + private implicit val mismatchSerdes = ActionMismatch.serdes + private implicit val messageSerdes = ActivationMessage.serdes + private implicit val memoryqueueuErrorSerdes = MemoryQueueErrorSerdes.memoryQueueErrorFormat + + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + + implicit def rootEitherFormat[A: RootJsonFormat, B: RootJsonFormat] = + new RootJsonFormat[Either[A, B]] { + val format = DefaultJsonProtocol.eitherFormat[A, B] + + def write(either: Either[A, B]) = format.write(either) + + def read(value: JsValue) = format.read(value) + } + + type ActivationResponse = Either[MemoryQueueError, ActivationMessage] + implicit val serdes = jsonFormat(ActivationResponse.apply _, "message") + +} diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala new file mode 100644 index 00000000000..1b6b818c8f8 --- /dev/null +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler.queue + +import akka.actor.ActorRef +import org.apache.openwhisk.core.connector._ +import org.apache.openwhisk.core.entity._ +import spray.json.{DefaultJsonProtocol, _} +import scala.collection.concurrent.TrieMap +import scala.util.Try + +object QueueSize +case class MemoryQueueKey(invocationNamespace: String, docInfo: DocInfo) +case class MemoryQueueValue(queue: ActorRef, isLeader: Boolean) + +sealed trait MemoryQueueError extends Product { + val causedBy: String +} + +object MemoryQueueErrorSerdes { + + private implicit val noMessageSerdes = NoActivationMessage.serdes + private implicit val noQueueSerdes = NoMemoryQueue.serdes + private implicit val mismatchSerdes = ActionMismatch.serdes + + // format that discriminates based on an additional + // field "type" that can either be "Cat" or "Dog" + implicit val memoryQueueErrorFormat = new RootJsonFormat[MemoryQueueError] { + def write(obj: MemoryQueueError): JsValue = + JsObject((obj match { + case msg: NoActivationMessage => msg.toJson + case msg: NoMemoryQueue => msg.toJson + case msg: ActionMismatch => msg.toJson + }).asJsObject.fields + ("type" -> JsString(obj.productPrefix))) + + def read(json: JsValue): MemoryQueueError = + json.asJsObject.getFields("type") match { + case Seq(JsString("NoActivationMessage")) => json.convertTo[NoActivationMessage] + case Seq(JsString("NoMemoryQueue")) => json.convertTo[NoMemoryQueue] + case Seq(JsString("ActionMismatch")) => json.convertTo[ActionMismatch] + } + } +} + +case class NoActivationMessage(noActivationMessage: String = NoActivationMessage.asString) + extends MemoryQueueError + with Message { + override val causedBy: String = noActivationMessage + override def serialize = NoActivationMessage.serdes.write(this).compactPrint +} + +object NoActivationMessage extends DefaultJsonProtocol { + val asString: String = "no activation message exist" + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat(NoActivationMessage.apply _, "noActivationMessage") +} + +case class NoMemoryQueue(noMemoryQueue: String = NoMemoryQueue.asString) extends MemoryQueueError with Message { + override val causedBy: String = noMemoryQueue + override def serialize = NoMemoryQueue.serdes.write(this).compactPrint +} + +object NoMemoryQueue extends DefaultJsonProtocol { + val asString: String = "no memory queue exist" + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat(NoMemoryQueue.apply _, "noMemoryQueue") +} + +case class ActionMismatch(actionMisMatch: String = ActionMismatch.asString) extends MemoryQueueError with Message { + override val causedBy: String = actionMisMatch + override def serialize = ActionMismatch.serdes.write(this).compactPrint +} + +object ActionMismatch extends DefaultJsonProtocol { + val asString: String = "action version does not match" + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat(ActionMismatch.apply _, "actionMisMatch") +} + +object QueuePool { + private val _queuePool = TrieMap[MemoryQueueKey, MemoryQueueValue]() + + private[scheduler] def get(key: MemoryQueueKey) = _queuePool.get(key) + + private[scheduler] def put(key: MemoryQueueKey, value: MemoryQueueValue) = _queuePool.put(key, value) + + private[scheduler] def remove(key: MemoryQueueKey) = _queuePool.remove(key) + + private[scheduler] def countLeader() = _queuePool.count(_._2.isLeader) + + private[scheduler] def clear(): Unit = _queuePool.clear() + + private[scheduler] def size = _queuePool.size + + private[scheduler] def values = _queuePool.values + + private[scheduler] def keys = _queuePool.keys +} + +case class CreateQueue(invocationNamespace: String, + fqn: FullyQualifiedEntityName, + revision: DocRevision, + whiskActionMetaData: WhiskActionMetaData) +case class CreateQueueResponse(invocationNamespace: String, fqn: FullyQualifiedEntityName, success: Boolean) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala new file mode 100644 index 00000000000..75c913e9e34 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/ActivationServiceImplTests.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler.grpc.test + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.ActorMaterializer +import akka.testkit.{ImplicitSender, TestKit} +import common.StreamLogging +import org.apache.openwhisk.common.TransactionId +import org.apache.openwhisk.core.connector.ActivationMessage +import org.apache.openwhisk.core.entity._ +import org.apache.openwhisk.core.scheduler.grpc.ActivationServiceImpl +import org.apache.openwhisk.core.scheduler.queue.{ + ActionMismatch, + MemoryQueueKey, + MemoryQueueValue, + NoMemoryQueue, + QueuePool +} +import org.apache.openwhisk.grpc.{FetchRequest, FetchResponse, RescheduleRequest, RescheduleResponse} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike, Matchers} +import org.apache.openwhisk.core.scheduler.grpc.{ActivationResponse, GetActivation} +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +@RunWith(classOf[JUnitRunner]) +class ActivationServiceImplTests + extends TestKit(ActorSystem("ActivationService")) + with CommonVariable + with ImplicitSender + with FlatSpecLike + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with ScalaFutures + with StreamLogging { + + override def afterAll = { + QueuePool.clear() + TestKit.shutdownActorSystem(system) + } + override def beforeEach = QueuePool.clear() + + behavior of "ActivationService" + + implicit val mat = ActorMaterializer() + implicit val ec = system.dispatcher + + val messageTransId = TransactionId(TransactionId.testing.meta.id) + val uuid = UUID() + + val testDoc = testFQN.toDocId.asDocInfo(testDocRevision) + val message = ActivationMessage( + messageTransId, + FullyQualifiedEntityName(testEntityPath, testEntityName), + DocRevision.empty, + Identity( + Subject(), + Namespace(EntityName(testNamespace), uuid), + BasicAuthenticationAuthKey(uuid, Secret()), + Set.empty), + ActivationId.generate(), + ControllerInstanceId("0"), + blocking = false, + content = None) + + it should "send GetActivation message to the MemoryQueue actor" in { + + val mock = system.actorOf(Props(new Actor() { + override def receive: Receive = { + case getActivation: GetActivation => + testActor ! getActivation + sender() ! ActivationResponse(Right(message)) + } + })) + + QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(mock, true)) + val activationServiceImpl = ActivationServiceImpl() + + activationServiceImpl + .fetchActivation( + FetchRequest( + message.user.namespace.name.asString, + testFQN.serialize, + testDocRevision.serialize, + testContainerId, + false, + alive = true)) + .futureValue shouldBe FetchResponse(ActivationResponse(Right(message)).serialize) + + expectMsg(GetActivation(testFQN, testContainerId, false, None)) + } + + it should "return NoMemoryQueue if there is no queue" in { + val activationServiceImpl = ActivationServiceImpl() + + activationServiceImpl + .fetchActivation( + FetchRequest( + message.user.namespace.name.asString, + testFQN.serialize, + testDocRevision.serialize, + testContainerId, + false, + alive = true)) + .futureValue shouldBe FetchResponse(ActivationResponse(Left(NoMemoryQueue())).serialize) + + expectNoMessage(200.millis) + } + + it should "return ActionMismatchError if get request for an old action" in { + + val activationServiceImpl = ActivationServiceImpl() + + QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(testActor, true)) + + activationServiceImpl + .fetchActivation( + FetchRequest( // same doc id but with a different doc revision + message.user.namespace.name.asString, + testFQN.serialize, + DocRevision("new-one").serialize, + testContainerId, + false, + alive = true)) + .futureValue shouldBe FetchResponse(ActivationResponse(Left(ActionMismatch())).serialize) + + expectNoMessage(200.millis) + } + + it should "reschedule activation message to the queue" in { + + val mock = system.actorOf(Props(new Actor() { + override def receive: Receive = { + case message: ActivationMessage => + testActor ! message + } + })) + val activationServiceImpl = ActivationServiceImpl() + + QueuePool.put(MemoryQueueKey(testEntityPath.asString, testDoc), MemoryQueueValue(mock, true)) + + activationServiceImpl + .rescheduleActivation( + RescheduleRequest( // same doc id but with a different doc revision + message.user.namespace.name.asString, + testFQN.serialize, + testDocRevision.serialize, + message.serialize)) + .futureValue shouldBe RescheduleResponse(isRescheduled = true) + + expectMsg(message) + } + +} diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala new file mode 100644 index 00000000000..b3efe97cee9 --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/grpc/test/CommonVariable.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.scheduler.grpc.test + +import org.apache.openwhisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} +import org.apache.openwhisk.core.entity._ + +trait CommonVariable { + val testInvocationNamespace = "test-invocation-namespace" + val testInvocationEntityPath = EntityPath(testInvocationNamespace) + val testNamespace = "test-namespace" + val testEntityPath = EntityPath(testNamespace) + val testAction = "test-fqn" + val testEntityName = EntityName(testAction) + val testDocRevision = DocRevision("1-test-revision") + val testContainerId = "fakeContainerId" + val semVer = SemVer(0, 1, 1) + val testVersion = Some(semVer) + val testFQN = FullyQualifiedEntityName(testEntityPath, testEntityName, testVersion) + val testExec = CodeExecAsString(RuntimeManifest("nodejs:10", ImageName("testImage")), "testCode", None) + val testExecMetadata = + CodeExecMetaDataAsString(testExec.manifest, entryPoint = testExec.entryPoint) + val testActionMetaData = + WhiskActionMetaData(testEntityPath, testEntityName, testExecMetadata, version = semVer) +} diff --git a/tools/travis/runUnitTests.sh b/tools/travis/runUnitTests.sh index e2ef4ac528e..3f66caec3fe 100755 --- a/tools/travis/runUnitTests.sh +++ b/tools/travis/runUnitTests.sh @@ -26,8 +26,6 @@ cd $ROOTDIR/tools/travis export TESTCONTAINERS_RYUK_DISABLED="true" export ORG_GRADLE_PROJECT_testSetName="REQUIRE_ONLY_DB" -./scan.sh - ./setupPrereq.sh cat "$ROOTDIR/tests/src/test/resources/application.conf" diff --git a/tools/travis/setup.sh b/tools/travis/setup.sh index 758d5a08d18..6d67941e340 100755 --- a/tools/travis/setup.sh +++ b/tools/travis/setup.sh @@ -42,6 +42,9 @@ python -m pip install --user pydocumentdb # Support the revises log upload script python -m pip install --user humanize requests +# Scan code before compiling the code +./scan.sh + # Basic check that all code compiles and depdendencies are downloaded correctly. # Compiling the tests will compile all components as well. #