Skip to content

Commit

Permalink
[New Scheduler]Implement FPCEntitlementProvider (#5029)
Browse files Browse the repository at this point in the history
* Implement FPCEntitlementProvider

* Add throttling metric
  • Loading branch information
jiangpengcheng authored Jan 29, 2021
1 parent 212d809 commit efdbd60
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ protected[core] abstract class EntitlementProvider(
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations))

private val messagingProvider = SpiLoader.get[MessagingProvider]
private val eventProducer = messagingProvider.getProducer(this.config)
protected val eventProducer = messagingProvider.getProducer(this.config)

/**
* Grants a subject the right to access a resources.
Expand Down Expand Up @@ -201,6 +201,19 @@ protected[core] abstract class EntitlementProvider(
.flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user))
}

/**
* Checks action activation rate throttles for an identity.
*
* @param user the identity to check rate throttles for
* @param right the privilege the subject is requesting
* @param resources the set of resource the subject requests access to
* @return a promise that completes with success iff the user is within their activation quota
*/
protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
implicit transid: TransactionId): Future[Unit] = {
checkUserThrottle(user, right, resources).flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
}

private val kindRestrictor = {
import pureconfig._
import pureconfig.generic.auto._
Expand Down Expand Up @@ -284,11 +297,7 @@ protected[core] abstract class EntitlementProvider(
val entitlementCheck: Future[Unit] = if (user.rights.contains(right)) {
if (resources.nonEmpty) {
logging.debug(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(", ")}'")
val throttleCheck =
if (noThrottle) Future.successful(())
else
checkUserThrottle(user, right, resources)
.flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
val throttleCheck = if (noThrottle) Future.successful(()) else checkThrottles(user, right, resources)
throttleCheck
.flatMap(_ => checkPrivilege(user, right, resources))
.flatMap(checkedResources => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.openwhisk.core.entitlement

import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.http.scaladsl.model.StatusCodes.TooManyRequests
import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.connector.{EventMessage, Metric}
import org.apache.openwhisk.core.controller.RejectRequest
import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE
import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity}
import org.apache.openwhisk.core.loadBalancer.LoadBalancer

protected[core] class FPCEntitlementProvider(
private val config: WhiskConfig,
private val loadBalancer: LoadBalancer,
private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) {

override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
implicit transid: TransactionId): Future[Unit] = {
if (right == ACTIVATE) {
val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res =>
loadBalancer.checkThrottle(user.namespace.name.toPath, res.fqname)
}
if (checks.contains(true)) {
val metric = Metric("ConcurrentRateLimit", 1)
UserEvents.send(
eventProducer,
EventMessage(
s"controller${controllerInstance.asString}",
metric,
user.subject,
user.namespace.name.toString,
user.namespace.uuid,
metric.typeName))
Future.failed(RejectRequest(TooManyRequests, "Too many requests"))
} else Future.successful(())
} else Future.successful(())
}

}

private object FPCEntitlementProvider extends EntitlementSpiProvider {

override def instance(config: WhiskConfig, loadBalancer: LoadBalancer, instance: ControllerInstanceId)(
implicit actorSystem: ActorSystem,
logging: Logging) =
new FPCEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer, instance)
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ trait LoadBalancer {

/** Gets the size of the cluster all loadbalancers are acting in */
def clusterSize: Int = 1

/** Gets the throttling for given action. */
def checkThrottle(namespace: EntityPath, action: String): Boolean = false
}

/**
Expand Down

0 comments on commit efdbd60

Please sign in to comment.