Skip to content

Commit

Permalink
Add scheduler overprovision for new actions before namespace throttli…
Browse files Browse the repository at this point in the history
…ng (apache#5284)

* initial attempt

* tests

* fix tests

* enable throttling when last capacity used in overprovisioning

* add case to correctly disable namespace throttling when namespace overprovisioning has space

* feedback

Co-authored-by: Brendan Doyle <brendand@qualtrics.com>
  • Loading branch information
2 people authored and michele-sciabarra committed Nov 23, 2022
1 parent 6739041 commit fd0e1b9
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 9 deletions.
2 changes: 2 additions & 0 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ whisk {
stale-threshold = "100 milliseconds"
check-interval = "100 milliseconds"
drop-interval = "10 seconds"
allow-over-provision-before-throttle = false
namespace-over-provision-before-throttle-ratio = 1.5
}
queue {
idle-grace = "20 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,4 +421,8 @@ object SchedulerStates extends DefaultJsonProtocol {
def parse(states: String) = Try(serdes.read(states.parseJson))
}

case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)
/**
 * Scheduler settings passed to the scheduling components (e.g. SchedulingDecisionMaker.props in the tests).
 *
 * NOTE(review): the fields appear to correspond to the kebab-case keys of the scheduling section in
 * application.conf (stale-threshold, check-interval, drop-interval, ...) -- confirm against the config loader.
 *
 * @param staleThreshold duration setting; its consumer is not visible in this change -- TODO confirm semantics
 * @param checkInterval duration setting; its consumer is not visible in this change -- TODO confirm semantics
 * @param dropInterval duration setting; its consumer is not visible in this change -- TODO confirm semantics
 * @param allowOverProvisionBeforeThrottle when true, the decision maker may grant a single container to an action
 *                                         that has no containers even though the namespace limit is reached, as long
 *                                         as the over-provisioned ceiling still has room; it also gates when
 *                                         namespace throttling is enabled or disabled
 * @param namespaceOverProvisionBeforeThrottleRatio multiplier applied to the namespace limit (and passed through
 *                                                  `ceiling`) to obtain the over-provisioned container ceiling
 */
case class SchedulingConfig(staleThreshold: FiniteDuration,
checkInterval: FiniteDuration,
dropInterval: FiniteDuration,
allowOverProvisionBeforeThrottle: Boolean,
namespaceOverProvisionBeforeThrottleRatio: Double)
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,17 @@ class SchedulingDecisionMaker(
case _ => Future.successful(DecisionResults(Pausing, 0))
}
} else {
val capacity = limit - existingContainerCountInNs - inProgressContainerCountInNs
val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) {
// if space available within the over provision ratio amount above namespace limit, create one container for new
// action so namespace traffic can attempt to re-balance without blocking entire action
if ((ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs) > 0) {
1
} else {
0
}
} else {
limit - existingContainerCountInNs - inProgressContainerCountInNs
}
if (capacity <= 0) {
stateName match {

Expand All @@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
*
* However, if the container exists(totalContainers != 0), the activation is not treated as a failure and the activation is delivered to the container.
*/
case Running =>
case Running
if !schedulingConfig.allowOverProvisionBeforeThrottle || (schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
logging.info(
this,
s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))

case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
// do nothing
case _ =>
// no need to print any messages if the state is already NamespaceThrottled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class MemoryQueueTestsFixture
val testNamespace = "test-namespace"
val testAction = "test-action"

val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5)

val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
val revision = DocRevision("1-testRev")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests
val testAction = "test-action"
val action = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))

val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5)

it should "decide pausing when the limit is less than equal to 0" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
Expand Down Expand Up @@ -149,7 +149,7 @@ class SchedulingDecisionMakerTests
}
}

it should "enable namespace throttling with dropping msg when there is not enough capacity and no container" in {
it should "enable namespace throttling with dropping msg when there is not enough capacity, no container, and namespace over-provision disabled" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

Expand All @@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0))
}

it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in {
it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers and namespace over-provision disabled" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

Expand All @@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0))
}

it should "add an initial container if there is no any" in {
// Over-provisioning enabled with ratio 1.5: a new action (no containers) in a namespace that already
// reached its limit should still receive one initial container.
it should "add one container when there is no container, and namespace over-provision has capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = false,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 0, // there is no container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container already reach the namespace limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// the namespace limit is reached, but the over-provisioned ceiling (2 * 1.5 = 3) still has room for one
// container, so an initial container is added instead of enabling throttling.
testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
}

// Over-provisioning enabled but with ratio 1.0, so the over-provisioned ceiling equals the plain limit.
it should "enable namespace throttling with dropping msg when there is no container, and namespace over-provision has no capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 0, // there is no container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container already reach the namespace limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// the over-provisioned ceiling (2 * 1.0 = 2) is already used up, so this queue cannot create an initial
// container: enable throttling and drop messages.
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0))
}

// A queue that is already in the NamespaceThrottled state should be released once the over-provisioned
// ceiling has room again.
it should "disable namespace throttling when namespace over-provision has capacity again" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.1)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 1, // there is one container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container already reach the namespace limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = NamespaceThrottled,
recipient = testProbe.ref)

decisionMaker ! msg

// the plain limit is exhausted, but ceiling(2 * 1.1) exceeds the 2 containers counted in the namespace,
// so the throttled queue is told to disable namespace throttling (no container is added by this decision).
testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
}

// Over-provisioning enabled with ratio 1.0 (no extra headroom) while the action already owns a container.
it should "enable namespace throttling without dropping msg when there is a container, and namespace over-provision has no additional capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 1, // there is one container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container already reach the namespace limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// this queue cannot create an additional container so enable throttling; messages are NOT dropped
// (dropMsg = false) because the action already has a container.
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0))
}

// With over-provisioning enabled and headroom left, reaching the plain namespace limit must not trigger
// namespace throttling for an action that already has containers.
it should "not enable namespace throttling when there is not enough capacity but are some containers and namespace over-provision is enabled with capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 1, // there are some containers for this action
inProgressContainerCount = 1,
staleActivationNum = 0,
existingContainerCountInNamespace = 2, // 2 existing + 2 in-progress containers already reach the namespace limit of 4
inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
averageDuration = None,
limit = 4,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// this queue cannot create more containers, but the over-provisioned ceiling (4 * 1.5 = 6) still has room,
// so throttling is not enabled and no decision is sent.
testProbe.expectNoMessage()
}

it should "add an initial container if there is not any" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

Expand All @@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests

testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
}

it should "disable the namespace throttling with adding an initial container when there is no container" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
Expand Down

0 comments on commit fd0e1b9

Please sign in to comment.