From efdbd6049a849eb432e1c3fffc56bdb3fd344eaf Mon Sep 17 00:00:00 2001 From: jiangpch Date: Sat, 30 Jan 2021 07:02:45 +0800 Subject: [PATCH] [New Scheduler]Implement FPCEntitlementProvider (#5029) * Implement FPCEntitlementProvider * Add throttling metric --- .../core/entitlement/Entitlement.scala | 21 ++++-- .../core/entitlement/FPCEntitlement.scala | 67 +++++++++++++++++++ .../core/loadBalancer/LoadBalancer.scala | 3 + 3 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala index 4b33a4b37bb..8092ad8f0bd 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala @@ -153,7 +153,7 @@ protected[core] abstract class EntitlementProvider( activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations)) private val messagingProvider = SpiLoader.get[MessagingProvider] - private val eventProducer = messagingProvider.getProducer(this.config) + protected val eventProducer = messagingProvider.getProducer(this.config) /** * Grants a subject the right to access a resources. @@ -201,6 +201,19 @@ protected[core] abstract class EntitlementProvider( .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user)) } + /** + * Checks action activation rate throttles for an identity. + * + * @param user the identity to check rate throttles for + * @param right the privilege the subject is requesting + * @param resources the set of resource the subject requests access to + * @return a promise that completes with success iff the user is within their activation quota + */ + protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])( + implicit transid: TransactionId): Future[Unit] = { + checkUserThrottle(user, right, resources).flatMap(_ => checkConcurrentUserThrottle(user, right, resources)) + } + private val kindRestrictor = { import pureconfig._ import pureconfig.generic.auto._ @@ -284,11 +297,7 @@ protected[core] abstract class EntitlementProvider( val entitlementCheck: Future[Unit] = if (user.rights.contains(right)) { if (resources.nonEmpty) { logging.debug(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(", ")}'") - val throttleCheck = - if (noThrottle) Future.successful(()) - else - checkUserThrottle(user, right, resources) - .flatMap(_ => checkConcurrentUserThrottle(user, right, resources)) + val throttleCheck = if (noThrottle) Future.successful(()) else checkThrottles(user, right, resources) throttleCheck .flatMap(_ => checkPrivilege(user, right, resources)) .flatMap(checkedResources => { diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala new file mode 100644 index 00000000000..9e89db23dc9 --- /dev/null +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.entitlement + +import scala.concurrent.Future +import akka.actor.ActorSystem +import akka.http.scaladsl.model.StatusCodes.TooManyRequests +import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents} +import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.connector.{EventMessage, Metric} +import org.apache.openwhisk.core.controller.RejectRequest +import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE +import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity} +import org.apache.openwhisk.core.loadBalancer.LoadBalancer + +protected[core] class FPCEntitlementProvider( + private val config: WhiskConfig, + private val loadBalancer: LoadBalancer, + private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging) + extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) { + + override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])( + implicit transid: TransactionId): Future[Unit] = { + if (right == ACTIVATE) { + val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res => + loadBalancer.checkThrottle(user.namespace.name.toPath, res.fqname) + } + if (checks.contains(true)) { + val metric = Metric("ConcurrentRateLimit", 1) + UserEvents.send( + eventProducer, + EventMessage( + s"controller${controllerInstance.asString}", + metric, + user.subject, + user.namespace.name.toString, + user.namespace.uuid, + metric.typeName)) + Future.failed(RejectRequest(TooManyRequests, "Too many requests")) + } else Future.successful(()) + } else Future.successful(()) + } + +} + +private object FPCEntitlementProvider extends EntitlementSpiProvider { + + override def instance(config: WhiskConfig, loadBalancer: LoadBalancer, instance: ControllerInstanceId)( + implicit actorSystem: ActorSystem, + logging: Logging) = + new FPCEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer, instance) +} diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala index 041ce0c9bbb..20225127c3c 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala @@ -75,6 +75,9 @@ trait LoadBalancer { /** Gets the size of the cluster all loadbalancers are acting in */ def clusterSize: Int = 1 + + /** Gets the throttling for given action. */ + def checkThrottle(namespace: EntityPath, action: String): Boolean = false } /**