Skip to content

Commit

Permalink
Throttle the system based on active-ack timeouts. (#3875)
Browse files Browse the repository at this point in the history
Today, we have an arbitrary system-wide limit on the maximum number of concurrent invocations. In general that is fine, but it doesn't directly correlate with what's actually happening in the system.

This adds a new state to each monitored invoker: Overloaded. An invoker goes into the Overloaded state if active-acks start to time out. Eventually, if the system is really overloaded, all invokers will be in the Overloaded state, which causes the loadbalancer to return a failure. This failure now results in a "503 - System overloaded" message back to the user.
  • Loading branch information
markusthoemmes authored and cbickel committed Jul 26, 2018
1 parent 006561f commit 9dd34f2
Show file tree
Hide file tree
Showing 17 changed files with 211 additions and 161 deletions.
2 changes: 0 additions & 2 deletions ansible/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,10 @@ The default system throttling limits are configured in this file [./group_vars/a
limits:
invocationsPerMinute: "{{ limit_invocations_per_minute | default(60) }}"
concurrentInvocations: "{{ limit_invocations_concurrent | default(30) }}"
concurrentInvocationsSystem: "{{ limit_invocations_concurrent_system | default(5000) }}"
firesPerMinute: "{{ limit_fires_per_minute | default(60) }}"
sequenceMaxLength: "{{ limit_sequence_max_length | default(50) }}"
```
- The `limits.invocationsPerMinute` represents the allowed namespace action invocations per minute.
- The `limits.concurrentInvocations` represents the maximum concurrent invocations allowed per namespace.
- The `limits.concurrentInvocationsSystem` represents the maximum concurrent invocations the system will allow across all namespaces.
- The `limits.firesPerMinute` represents the allowed namespace trigger firings per minute.
- The `limits.sequenceMaxLength` represents the maximum length of a sequence action.
1 change: 0 additions & 1 deletion ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ runtimesManifest: "{{ runtimes_manifest | default(lookup('file', openwhisk_home
limits:
invocationsPerMinute: "{{ limit_invocations_per_minute | default(60) }}"
concurrentInvocations: "{{ limit_invocations_concurrent | default(30) }}"
concurrentInvocationsSystem: "{{ limit_invocations_concurrent_system | default(5000) }}"
firesPerMinute: "{{ limit_fires_per_minute | default(60) }}"
sequenceMaxLength: "{{ limit_sequence_max_length | default(50) }}"

Expand Down
2 changes: 0 additions & 2 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@

"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENTINSYSTEM":
"{{ limits.concurrentInvocationsSystem }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
"LIMITS_ACTIONS_SEQUENCE_MAXLENGTH": "{{ limits.sequenceMaxLength }}"

Expand Down
1 change: 0 additions & 1 deletion ansible/templates/whisk.properties.j2
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ runtimes.manifest={{ runtimesManifest | to_json }}

limits.actions.invokes.perMinute={{ limits.invocationsPerMinute }}
limits.actions.invokes.concurrent={{ limits.concurrentInvocations }}
limits.actions.invokes.concurrentInSystem={{ limits.concurrentInvocationsSystem }}
limits.triggers.fires.perMinute={{ limits.firesPerMinute }}
limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }}

Expand Down
4 changes: 2 additions & 2 deletions common/scala/src/main/scala/whisk/common/Logging.scala
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ object LoggingMarkers {
def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", count)

// Check invoker healthy state from loadbalancer
val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count)
val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count)
def LOADBALANCER_INVOKER_STATUS_CHANGE(state: String) =
LogMarkerToken(loadbalancer, "invokerState", count, Some(state))
val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count)

def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) =
Expand Down
2 changes: 1 addition & 1 deletion common/scala/src/main/scala/whisk/common/RingBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ class RingBuffer[T](size: Int) {

def add(el: T) = inner.add(el)

def toList() = inner.toArray().asInstanceOf[Array[T]].toList
def toList = inner.toArray().asInstanceOf[Array[T]].toList
}
2 changes: 0 additions & 2 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class WhiskConfig(requiredProperties: Map[String, String],
val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit)
val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit)
val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit)
val actionInvokeSystemOverloadLimit = this(WhiskConfig.actionInvokeSystemOverloadLimit)
val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit)
val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes)
}
Expand Down Expand Up @@ -189,7 +188,6 @@ object WhiskConfig {
val actionSequenceMaxLimit = "limits.actions.sequence.maxLength"
val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute"
val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent"
val actionInvokeSystemOverloadLimit = "limits.actions.invokes.concurrentInSystem"
val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute"
val controllerSeedNodes = "akka.cluster.seed.nodes"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import whisk.http.Messages
import whisk.http.Messages._
import whisk.core.entitlement.Resource
import whisk.core.entitlement.Collection
import whisk.core.loadBalancer.LoadBalancerException

/**
* A singleton object which defines the properties that must be present in a configuration
Expand Down Expand Up @@ -280,6 +281,9 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
case Failure(RejectRequest(code, message)) =>
logging.debug(this, s"[POST] action rejected with code $code: $message")
terminate(code, message)
case Failure(t: LoadBalancerException) =>
logging.error(this, s"[POST] failed in loadbalancer: ${t.getMessage}")
terminate(ServiceUnavailable)
case Failure(t: Throwable) =>
logging.error(this, s"[POST] action activation failed: ${t.getMessage}")
terminate(InternalServerError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import whisk.core.entitlement._
import whisk.core.entity._
import whisk.core.entity.ActivationId.ActivationIdGenerator
import whisk.core.entity.ExecManifest.Runtimes
import whisk.core.loadBalancer.{Healthy, LoadBalancerProvider}
import whisk.core.loadBalancer.{InvokerState, LoadBalancerProvider}
import whisk.http.BasicHttpService
import whisk.http.BasicRasService
import whisk.spi.SpiLoader
Expand Down Expand Up @@ -151,7 +151,7 @@ class Controller(val instance: ControllerInstanceId,
complete {
loadBalancer
.invokerHealth()
.map(_.count(_.status == Healthy).toJson)
.map(_.count(_.status == InvokerState.Healthy).toJson)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import whisk.core.controller.actions.PostActionActivation
import whisk.core.database._
import whisk.core.entity._
import whisk.core.entity.types._
import whisk.core.loadBalancer.LoadBalancerException
import whisk.http.ErrorResponse.terminate
import whisk.http.Messages
import whisk.http.LenientSprayJsonSupport._
Expand Down Expand Up @@ -673,6 +674,10 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc

case Failure(t: RejectRequest) => terminate(t.code, t.message)

case Failure(t: LoadBalancerException) =>
logging.error(this, s"failed in loadbalancer: $t")
terminate(ServiceUnavailable)

case Failure(t) =>
logging.error(this, s"exception in completeRequest: $t")
terminate(InternalServerError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ import scala.concurrent.{ExecutionContext, Future}
*
* @param loadBalancer contains active quotas
* @param concurrencyLimit a calculated limit relative to the user using the system
* @param systemOverloadLimit the limit when the system is considered overloaded
*/
class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int, systemOverloadLimit: Int)(
class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int)(
implicit logging: Logging,
executionContext: ExecutionContext) {

logging.info(this, s"systemOverloadLimit = $systemOverloadLimit")(TransactionId.controller)

/**
* Checks whether the operation should be allowed to proceed.
*/
Expand All @@ -50,20 +47,6 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity
ConcurrentRateLimit(concurrentActivations, currentLimit)
}
}

/**
* Checks whether the system is in a generally overloaded state.
*/
def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = {
loadBalancer.totalActiveActivations.map { concurrentActivations =>
val overloaded = concurrentActivations > systemOverloadLimit
if (overloaded)
logging.info(
this,
s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit")
overloaded
}
}
}

sealed trait RateLimit {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import whisk.core.entity._
import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
import whisk.http.ErrorResponse
import whisk.http.Messages
import whisk.http.Messages._
import whisk.core.connector.MessagingProvider
import whisk.spi.SpiLoader
import whisk.spi.Spi
Expand Down Expand Up @@ -74,8 +73,7 @@ protected[core] object EntitlementProvider {
val requiredProperties = Map(
WhiskConfig.actionInvokePerMinuteLimit -> null,
WhiskConfig.actionInvokeConcurrentLimit -> null,
WhiskConfig.triggerFirePerMinuteLimit -> null,
WhiskConfig.actionInvokeSystemOverloadLimit -> null)
WhiskConfig.triggerFirePerMinuteLimit -> null)
}

/**
Expand Down Expand Up @@ -148,8 +146,7 @@ protected[core] abstract class EntitlementProvider(
private val concurrentInvokeThrottler =
new ActivationThrottler(
loadBalancer,
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations),
config.actionInvokeSystemOverloadLimit.toInt)
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations))

private val messagingProvider = SpiLoader.get[MessagingProvider]
private val eventProducer = messagingProvider.getProducer(this.config)
Expand Down Expand Up @@ -196,8 +193,7 @@ protected[core] abstract class EntitlementProvider(
protected[core] def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = {

logging.debug(this, s"checking user '${user.subject}' has not exceeded activation quota")
checkSystemOverload(ACTIVATE)
.flatMap(_ => checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)), user))
checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)), user)
.flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user))
}

Expand Down Expand Up @@ -257,8 +253,7 @@ protected[core] abstract class EntitlementProvider(
val throttleCheck =
if (noThrottle) Future.successful(())
else
checkSystemOverload(right)
.flatMap(_ => checkUserThrottle(user, right, resources))
checkUserThrottle(user, right, resources)
.flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
throttleCheck
.flatMap(_ => checkPrivilege(user, right, resources))
Expand Down Expand Up @@ -311,22 +306,6 @@ protected[core] abstract class EntitlementProvider(
}
}

/**
* Limits activations if the system is overloaded.
*
* @param right the privilege, if ACTIVATE then check quota else return None
* @return future completing successfully if system is not overloaded else failing with a rejection
*/
protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = {
concurrentInvokeThrottler.isOverloaded.flatMap { isOverloaded =>
val systemOverload = right == ACTIVATE && isOverloaded
if (systemOverload) {
logging.error(this, "system is overloaded")
Future.failed(RejectRequest(TooManyRequests, systemOverloaded))
} else Future.successful(())
}
}

/**
* Limits activations if subject exceeds their own limits.
* If the requested right is an activation, the set of resources must contain an activation of an action or filter to be throttled.
Expand Down
Loading

0 comments on commit 9dd34f2

Please sign in to comment.