Skip to content

Commit

Permalink
[New Scheduler]Implement PFCInvokerServer (#5098)
Browse files Browse the repository at this point in the history
* Implement PFCInvokerServer

* Remove /memory api

* Default implement enable/disable
  • Loading branch information
ningyougang authored May 6, 2021
1 parent 87c8a98 commit e036fc9
Show file tree
Hide file tree
Showing 9 changed files with 442 additions and 7 deletions.
2 changes: 2 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ invoker:
keystore:
password: "{{ invoker_keystore_password | default('openwhisk') }}"
name: "{{ __invoker_ssl_keyPrefix }}openwhisk-keystore.p12"
reactiveSpi: "{{ invokerReactive_spi | default('') }}"
serverSpi: "{{ invokerServer_spi | default('') }}"

userLogs:
spi: "{{ userLogs_spi | default('org.apache.openwhisk.core.containerpool.logging.DockerToActivationLogStoreProvider') }}"
Expand Down
2 changes: 2 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@
"CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
"CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
"CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
"CONFIG_whisk_spi_InvokerProvider": "{{ invoker.reactiveSpi }}"
"CONFIG_whisk_spi_InvokerServerProvider": "{{ invoker.serverSpi }}"
"CONFIG_logback_log_level": "{{ invoker.loglevel }}"
"CONFIG_whisk_memory_min": "{{ limit_action_memory_min | default() }}"
"CONFIG_whisk_memory_max": "{{ limit_action_memory_max | default() }}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.invoker

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.http.BasicRasService
import org.apache.openwhisk.http.ErrorResponse.terminate
import spray.json.PrettyPrinter

import scala.concurrent.ExecutionContext

/**
* Implements web server to handle certain REST API calls.
*/
class DefaultInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
implicit val ec: ExecutionContext,
val actorSystem: ActorSystem,
val logger: Logging)
extends BasicRasService {

/** Pretty print JSON response. */
implicit val jsonPrettyResponsePrinter = PrettyPrinter

override def routes(implicit transid: TransactionId): Route = {
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
(path("enable") & post) {
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
}
case _ => terminate(StatusCodes.Unauthorized)
}
}
}

object DefaultInvokerServer extends InvokerServerProvider {

// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
val invokerUsername = "admin"
val invokerPassword = "admin"

override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new DefaultInvokerServer(invoker, invokerUsername, invokerPassword)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.invoker

import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.http.BasicRasService
import org.apache.openwhisk.http.ErrorResponse.terminate
import spray.json.PrettyPrinter

import scala.concurrent.ExecutionContext

/**
* Implements web server to handle certain REST API calls.
*/
class FPCInvokerServer(val invoker: InvokerCore, systemUsername: String, systemPassword: String)(
implicit val ec: ExecutionContext,
val actorSystem: ActorSystem,
val logger: Logging)
extends BasicRasService {

/** Pretty print JSON response. */
implicit val jsonPrettyResponsePrinter = PrettyPrinter

override def routes(implicit transid: TransactionId): Route = {
super.routes ~ extractCredentials {
case Some(BasicHttpCredentials(username, password)) if username == systemUsername && password == systemPassword =>
(path("enable") & post) {
invoker.enable()
} ~ (path("disable") & post) {
invoker.disable()
}
case _ => terminate(StatusCodes.Unauthorized)
}
}
}

object FPCInvokerServer extends InvokerServerProvider {

// TODO: TBD, after FPCInvokerReactive is ready, can read the credentials from pureconfig
val invokerUsername = "admin"
val invokerPassword = "admin"

override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new FPCInvokerServer(invoker, invokerUsername, invokerPassword)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker

import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import com.typesafe.config.ConfigValueFactory
import kamon.Kamon
Expand Down Expand Up @@ -217,7 +218,10 @@ trait InvokerProvider extends Spi {
}

// this trait can be used to add common implementation
trait InvokerCore {}
trait InvokerCore {
def enable(): Route
def disable(): Route
}

/**
* An Spi for providing RestAPI implementation for invoker.
Expand All @@ -227,9 +231,3 @@ trait InvokerServerProvider extends Spi {
def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
}

object DefaultInvokerServer extends InvokerServerProvider {
override def instance(
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
new BasicRasService {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import java.time.Instant
import akka.Done
import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.server.Directives.complete
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import org.apache.openwhisk.common._
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
Expand Down Expand Up @@ -299,4 +301,12 @@ class InvokerReactive(
}
})

override def enable(): Route = {
complete("not supported")
}

override def disable(): Route = {
complete("not supported")
}

}
4 changes: 4 additions & 0 deletions tests/src/test/scala/common/WhiskProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ public static String getBaseControllerAddress() {
return getBaseControllerHost() + ":" + getControllerBasePort();
}

public static String getBaseInvokerAddress(){
return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort");
}

public static int getMaxActionInvokesPerMinute() {
String valStr = whiskProperties.getProperty("limits.actions.invokes.perMinute");
return Integer.parseInt(valStr);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.invoker.test

import akka.http.scaladsl.model.StatusCodes.{OK, Unauthorized}
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.testkit.ScalatestRouteTest
import common.StreamLogging
import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.invoker.{DefaultInvokerServer, InvokerCore}
import org.apache.openwhisk.http.BasicHttpService
import org.junit.runner.RunWith
import org.scalamock.scalatest.MockFactory
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner

/**
* Tests InvokerServer API.
*/
@RunWith(classOf[JUnitRunner])
class DefaultInvokerServerTests
extends FlatSpec
with BeforeAndAfterEach
with BeforeAndAfterAll
with ScalatestRouteTest
with Matchers
with StreamLogging
with MockFactory {

def transid() = TransactionId("tid")

val systemUsername = "username"
val systemPassword = "password"

val reactive = new TestInvokerReactive
val server = new DefaultInvokerServer(reactive, systemUsername, systemPassword)

override protected def afterEach(): Unit = reactive.reset()

/** DefaultInvokerServer API tests */
behavior of "DefaultInvokerServer API"

it should "enable invoker" in {
implicit val tid = transid()
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Post(s"/enable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
reactive.enableCount shouldBe 1
reactive.disableCount shouldBe 0
}
}

it should "disable invoker" in {
implicit val tid = transid()
val validCredentials = BasicHttpCredentials(systemUsername, systemPassword)
Post(s"/disable") ~> addCredentials(validCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(OK)
reactive.enableCount shouldBe 0
reactive.disableCount shouldBe 1
}
}

it should "not enable invoker with invalid credential" in {
implicit val tid = transid()
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
Post(s"/enable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(Unauthorized)
reactive.enableCount shouldBe 0
reactive.disableCount shouldBe 0
}
}

it should "not disable invoker with invalid credential" in {
implicit val tid = transid()
val invalidCredentials = BasicHttpCredentials("invaliduser", "invalidpass")
Post(s"/disable") ~> addCredentials(invalidCredentials) ~> Route.seal(server.routes(tid)) ~> check {
status should be(Unauthorized)
reactive.enableCount shouldBe 0
reactive.disableCount shouldBe 0
}
}

it should "not enable invoker with empty credential" in {
implicit val tid = transid()
Post(s"/enable") ~> Route.seal(server.routes(tid)) ~> check {
status should be(Unauthorized)
reactive.enableCount shouldBe 0
reactive.disableCount shouldBe 0
}
}

it should "not disable invoker with empty credential" in {
implicit val tid = transid()
Post(s"/disable") ~> Route.seal(server.routes(tid)) ~> check {
status should be(Unauthorized)
reactive.enableCount shouldBe 0
reactive.disableCount shouldBe 0
}
}

}

class TestInvokerReactive extends InvokerCore with BasicHttpService {
var enableCount = 0
var disableCount = 0

override def enable(): Route = {
enableCount += 1
complete("")
}

override def disable(): Route = {
disableCount += 1
complete("")
}

def reset(): Unit = {
enableCount = 0
disableCount = 0
}

/**
* Gets the routes implemented by the HTTP service.
*
* @param transid the id for the transaction (every request is assigned an id)
*/
override def routes(implicit transid: TransactionId): Route = ???

}
Loading

0 comments on commit e036fc9

Please sign in to comment.