Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scheduler overprovision for new actions before namespace throttling #5284

Merged
2 changes: 2 additions & 0 deletions core/scheduler/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ whisk {
stale-threshold = "100 milliseconds"
check-interval = "100 milliseconds"
drop-interval = "10 seconds"
allow-over-provision-before-throttle = false
namespace-over-provision-before-throttle-ratio = 1.5
}
queue {
idle-grace = "20 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,4 +421,8 @@ object SchedulerStates extends DefaultJsonProtocol {
def parse(states: String) = Try(serdes.read(states.parseJson))
}

case class SchedulingConfig(staleThreshold: FiniteDuration, checkInterval: FiniteDuration, dropInterval: FiniteDuration)
/**
 * Scheduling settings passed to the scheduling decision maker.
 *
 * NOTE(review): the field names mirror the `stale-threshold`, `check-interval`, `drop-interval`,
 * `allow-over-provision-before-throttle` and `namespace-over-provision-before-throttle-ratio` keys in
 * application.conf; presumably they are loaded from there - confirm against the config loader.
 *
 * @param staleThreshold duration setting (see `stale-threshold`)
 * @param checkInterval duration setting (see `check-interval`)
 * @param dropInterval duration setting (see `drop-interval`)
 * @param allowOverProvisionBeforeThrottle when true, an action that has no containers yet may still be granted
 *                                         a single container after the namespace limit is reached, as long as the
 *                                         namespace stays below `limit * namespaceOverProvisionBeforeThrottleRatio`
 * @param namespaceOverProvisionBeforeThrottleRatio multiplier applied to the namespace limit to obtain the
 *                                                  over-provision ceiling used before namespace throttling
 */
case class SchedulingConfig(staleThreshold: FiniteDuration,
checkInterval: FiniteDuration,
dropInterval: FiniteDuration,
allowOverProvisionBeforeThrottle: Boolean,
namespaceOverProvisionBeforeThrottleRatio: Double)
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,17 @@ class SchedulingDecisionMaker(
case _ => Future.successful(DecisionResults(Pausing, 0))
}
} else {
val capacity = limit - existingContainerCountInNs - inProgressContainerCountInNs
val capacity = if (schedulingConfig.allowOverProvisionBeforeThrottle && totalContainers == 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the feature is turned on and the action has no containers yet, the first decision iteration gives it a capacity of 1, provided the total number of containers in use in the namespace is less than the limit multiplied by the over-provision ratio. After that, the normal capacity calculation applies.

If the namespace is still over that over-provisioned limit, the capacity is 0 and the normal code path turns on namespace throttling.

// if space available within the over provision ratio amount above namespace limit, create one container for new
// action so namespace traffic can attempt to re-balance without blocking entire action
if ((ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs) > 0) {
1
} else {
0
}
} else {
limit - existingContainerCountInNs - inProgressContainerCountInNs
}
if (capacity <= 0) {
stateName match {

Expand All @@ -79,12 +89,15 @@ class SchedulingDecisionMaker(
*
* However, if the container exists(totalContainers != 0), the activation is not treated as a failure and the activation is delivered to the container.
*/
case Running =>
case Running
if !schedulingConfig.allowOverProvisionBeforeThrottle || (schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(
limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs <= 0) =>
logging.info(
this,
s"there is no capacity activations will be dropped or throttled, (availableMsg: $availableMsg totalContainers: $totalContainers, limit: $limit, namespaceContainers: ${existingContainerCountInNs}, namespaceInProgressContainer: ${inProgressContainerCountInNs}) [$invocationNamespace:$action]")
Future.successful(DecisionResults(EnableNamespaceThrottling(dropMsg = totalContainers == 0), 0))

case NamespaceThrottled if schedulingConfig.allowOverProvisionBeforeThrottle && ceiling(limit * schedulingConfig.namespaceOverProvisionBeforeThrottleRatio) - existingContainerCountInNs - inProgressContainerCountInNs > 0 =>
bdoyle0182 marked this conversation as resolved.
Show resolved Hide resolved
Future.successful(DecisionResults(DisableNamespaceThrottling, 0))
// do nothing
case _ =>
// no need to print any messages if the state is already NamespaceThrottled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class MemoryQueueTestsFixture
val testNamespace = "test-namespace"
val testAction = "test-action"

val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5)

val fqn = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))
val revision = DocRevision("1-testRev")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SchedulingDecisionMakerTests
val testAction = "test-action"
val action = FullyQualifiedEntityName(EntityPath(testNamespace), EntityName(testAction), Some(SemVer(0, 0, 1)))

val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds)
val schedulingConfig = SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, false, 1.5)

it should "decide pausing when the limit is less than equal to 0" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
Expand Down Expand Up @@ -149,7 +149,7 @@ class SchedulingDecisionMakerTests
}
}

it should "enable namespace throttling with dropping msg when there is not enough capacity and no container" in {
it should "enable namespace throttling with dropping msg when there is not enough capacity, no container, and namespace over-provision disabled" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

Expand All @@ -173,7 +173,7 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0))
}

it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers" in {
it should "enable namespace throttling without dropping msg when there is not enough capacity but are some containers and namespace over-provision disabled" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

Expand All @@ -197,7 +197,142 @@ class SchedulingDecisionMakerTests
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0))
}

it should "add an initial container if there is no any" in {
// Over-provisioning enabled with ratio 1.5: a brand-new action in a full namespace should still get one container.
it should "add one container when there is no container, and namespace over-provision has capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = false,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 0, // there is no container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container: the namespace is at its limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// limit * ratio = 2 * 1.5 = 3 leaves room above the 2 containers in use, so one initial container is expected.
testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
}

// Over-provisioning enabled but ratio 1.0 adds no headroom: a new action in a full namespace is throttled.
it should "enable namespace throttling with dropping msg when there is no container, and namespace over-provision has no capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 0, // there is no container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container: the namespace is at its limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// limit * ratio = 2 * 1.0 = 2 leaves no room, so this queue cannot create an initial container:
// throttling is enabled and messages are dropped because the action has no container.
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = true), 0))
}

// A throttled queue should be released once the namespace is back under the over-provision ceiling.
it should "disable namespace throttling when namespace over-provision has capacity again" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.1)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 1, // there is one container for this action
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container: the namespace is at its limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = NamespaceThrottled,
recipient = testProbe.ref)

decisionMaker ! msg

// limit * ratio = 2 * 1.1 = 2.2, which the decision maker passes through ceiling(...) (presumably giving 3),
// so the over-provision budget has room above the 2 containers in use and throttling is expected to be disabled.
testProbe.expectMsg(DecisionResults(DisableNamespaceThrottling, 0))
}

// Ratio 1.0 adds no headroom; an action that already owns a container is throttled but keeps its messages.
it should "enable namespace throttling without dropping msg when there is a container, and namespace over-provision has no additional capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.0)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 1,
inProgressContainerCount = 0,
staleActivationNum = 0,
existingContainerCountInNamespace = 1, // 1 existing + 1 in-progress container: the namespace is at its limit of 2
inProgressContainerCountInNamespace = 1,
averageDuration = None,
limit = 2,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// this queue cannot create an additional container, so throttling is enabled; messages are NOT dropped
// because the action already has a container.
testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(dropMsg = false), 0))
}

// The namespace limit is reached, but the over-provision ceiling is not: no throttling decision should be sent.
it should "not enable namespace throttling when there is not enough capacity but are some containers and namespace over-provision is enabled with capacity" in {
val schedulingConfigNamespaceOverProvisioning =
SchedulingConfig(100.milliseconds, 100.milliseconds, 10.seconds, true, 1.5)
val decisionMaker =
system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfigNamespaceOverProvisioning))
val testProbe = TestProbe()

val msg = QueueSnapshot(
initialized = true,
incomingMsgCount = new AtomicInteger(0),
currentMsgCount = 0,
existingContainerCount = 1, // there are some containers for this action
inProgressContainerCount = 1,
staleActivationNum = 0,
existingContainerCountInNamespace = 2, // 2 existing + 2 in-progress containers: the namespace is at its limit of 4
inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well.
averageDuration = None,
limit = 4,
stateName = Running,
recipient = testProbe.ref)

decisionMaker ! msg

// this queue cannot create more containers, but limit * ratio = 4 * 1.5 = 6 still has room above the 4 in use,
// so namespace throttling is not enabled and no decision is expected.
testProbe.expectNoMessage()
}

it should "add an initial container if there is not any" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()

Expand All @@ -219,6 +354,7 @@ class SchedulingDecisionMakerTests

testProbe.expectMsg(DecisionResults(AddInitialContainer, 1))
}

it should "disable the namespace throttling with adding an initial container when there is no container" in {
val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig))
val testProbe = TestProbe()
Expand Down