From 738a0ef9c2582e33a8be168e75c11d76c87152dc Mon Sep 17 00:00:00 2001 From: Brendan Doyle Date: Thu, 9 Feb 2023 16:19:00 -0800 Subject: [PATCH] make scheduler consider action concurrency >1 --- .../core/scheduler/queue/MemoryQueue.scala | 2 + .../queue/SchedulingDecisionMaker.scala | 9 +- .../queue/test/MemoryQueueFlowTests.scala | 2 +- .../queue/test/MemoryQueueTests.scala | 2 +- .../test/SchedulingDecisionMakerTests.scala | 145 +++++++++++++++++- 5 files changed, 153 insertions(+), 7 deletions(-) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala index a5b80ffd1fc..051e289767e 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala @@ -943,6 +943,7 @@ class MemoryQueue(private val etcdClient: EtcdClient, namespaceContainerCount.inProgressContainerNumByNamespace, averageDuration, limit, + actionMetaData.limits.concurrency.maxConcurrent, stateName, self) case Failure(_: NoDocumentException) => @@ -1222,6 +1223,7 @@ case class QueueSnapshot(initialized: Boolean, inProgressContainerCountInNamespace: Int, averageDuration: Option[Double], limit: Int, + maxActionConcurrency: Int, stateName: MemoryQueueState, recipient: ActorRef) diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala index d5dca8bb761..ab0de2f6af4 100644 --- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala +++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/SchedulingDecisionMaker.scala @@ -58,6 +58,7 @@ class SchedulingDecisionMaker( inProgressContainerCountInNs, averageDuration, 
limit, + maxActionConcurrency, stateName, _) = snapshot val totalContainers = existing + inProgress @@ -137,7 +138,7 @@ class SchedulingDecisionMaker( // but it is a kind of trade-off and we place latency on top of over-provisioning case (Running, None) if staleActivationNum > 0 => // we can safely get the value as we already checked the existence - val num = staleActivationNum - inProgress + val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress // if it tries to create more containers than existing messages, we just create shortage val actualNum = if (num > availableMsg) availableMsg else num addServersIfPossible( @@ -153,7 +154,7 @@ class SchedulingDecisionMaker( // need more containers and a message is already processed case (Running, Some(duration)) => // we can safely get the value as we already checked the existence - val containerThroughput = staleThreshold / duration + val containerThroughput = (staleThreshold / duration) * maxActionConcurrency val expectedTps = containerThroughput * (existing + inProgress) val availableNonStaleActivations = availableMsg - staleActivationNum @@ -201,7 +202,7 @@ class SchedulingDecisionMaker( // this case is for that as a last resort. case (Removing, Some(duration)) if staleActivationNum > 0 => // we can safely get the value as we already checked the existence - val containerThroughput = staleThreshold / duration + val containerThroughput = (staleThreshold / duration) * maxActionConcurrency val num = ceiling(staleActivationNum.toDouble / containerThroughput) // if it tries to create more containers than existing messages, we just create shortage val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress @@ -219,7 +220,7 @@ class SchedulingDecisionMaker( // same with the above case but no duration exist. 
case (Removing, None) if staleActivationNum > 0 => // we can safely get the value as we already checked the existence - val num = staleActivationNum - inProgress + val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress // if it tries to create more containers than existing messages, we just create shortage val actualNum = if (num > availableMsg) availableMsg else num addServersIfPossible( diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala index a3cd73e1ac1..9a235a99094 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueFlowTests.scala @@ -313,7 +313,7 @@ class MemoryQueueFlowTests // this is the case where there is no capacity in a namespace and no container can be created. decisionMaker.setAutoPilot((sender: ActorRef, msg) => { msg match { - case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, Running, _) => + case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) => sender ! DecisionResults(EnableNamespaceThrottling(true), 0) TestActor.KeepRunning diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala index 37191378cb6..c2c5ae34003 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/MemoryQueueTests.scala @@ -1534,7 +1534,7 @@ class MemoryQueueTests // This test pilot mimic the decision maker who disable the namespace throttling when there is enough capacity. 
decisionMaker.setAutoPilot((sender: ActorRef, msg) => { msg match { - case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) => + case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) => sender ! DisableNamespaceThrottling case _ => diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala index b70b715542a..edd0783aaa0 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/SchedulingDecisionMakerTests.scala @@ -63,7 +63,8 @@ class SchedulingDecisionMakerTests existingContainerCountInNamespace = 0, inProgressContainerCountInNamespace = 0, averageDuration = None, - limit = 0, // limit is 0 + limit = 0, // limit is 0, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -87,6 +88,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = None, limit = 0, // limit is 0 + maxActionConcurrency = 1, stateName = Flushing, recipient = testProbe.ref) @@ -114,6 +116,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 2, averageDuration = None, // No average duration available limit = 10, + maxActionConcurrency = 1, stateName = state, recipient = testProbe.ref) @@ -140,6 +143,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 8, averageDuration = Some(1.0), // Some average duration available limit = 20, + maxActionConcurrency = 1, stateName = state, recipient = testProbe.ref) @@ -164,6 +168,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 2, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -188,6 +193,7 @@ class 
SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well. averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -215,6 +221,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 2, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -242,6 +249,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 2, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -269,6 +277,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 2, + maxActionConcurrency = 1, stateName = NamespaceThrottled, recipient = testProbe.ref) @@ -296,6 +305,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 2, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -323,6 +333,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 2, // this value includes the count of this action as well. 
averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -347,6 +358,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -370,6 +382,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = NamespaceThrottled, recipient = testProbe.ref) @@ -393,6 +406,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = NamespaceThrottled, recipient = testProbe.ref) @@ -417,6 +431,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -441,6 +456,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -465,6 +481,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Flushing, recipient = testProbe.ref) @@ -488,6 +505,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Flushing, recipient = testProbe.ref) @@ -511,6 +529,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -534,6 +553,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) 
@@ -557,6 +577,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = None, limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -580,6 +601,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 6, averageDuration = None, limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -603,6 +625,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(50), // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -628,6 +651,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -653,6 +677,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -680,6 +705,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -706,6 +732,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists limit = 4, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -733,6 +760,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -759,6 +787,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = 
None, // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -782,6 +811,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 1, averageDuration = Some(1000), // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Removing, recipient = testProbe.ref) @@ -808,6 +838,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 2, averageDuration = None, // the average duration does not exist limit = 10, + maxActionConcurrency = 1, stateName = Removing, recipient = testProbe.ref) @@ -834,6 +865,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(1000), // the average duration exists limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -858,6 +890,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 0, averageDuration = Some(50), // the average duration gives container throughput of 2 limit = 10, + maxActionConcurrency = 1, stateName = Running, recipient = testProbe.ref) @@ -882,6 +915,7 @@ class SchedulingDecisionMakerTests inProgressContainerCountInNamespace = 2, averageDuration = None, // the average duration does not exist limit = 4, + maxActionConcurrency = 1, stateName = Removing, recipient = testProbe.ref) @@ -893,4 +927,113 @@ class SchedulingDecisionMakerTests // but there is not enough capacity, it becomes 1 testProbe.expectMsg(DecisionResults(EnableNamespaceThrottling(false), 1)) } + + it should "correctly calculate demand is met when action concurrency >1 w/ average duration and no stale activations" in { + val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) + val testProbe = TestProbe() + + // container + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 4, + existingContainerCount = 2, + 
inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 2, + inProgressContainerCountInNamespace = 0, + averageDuration = Some(100.0), + limit = 4, + maxActionConcurrency = 2, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // available messages is 4 with duration equaling the stale threshold and action concurrency of 2 so needed containers + // should be exactly 2 + testProbe.expectNoMessage() + } + + it should "add containers when action concurrency >1 w/ average duration and demand is not met" in { + val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) + val testProbe = TestProbe() + + // container + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 20, + existingContainerCount = 2, + inProgressContainerCount = 0, + staleActivationNum = 0, + existingContainerCountInNamespace = 2, + inProgressContainerCountInNamespace = 0, + averageDuration = Some(50.0), + limit = 10, + maxActionConcurrency = 3, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! 
msg + + // available messages is 20 and throughput should be 100.0 / 50.0 * 3 = 6 + // existing container is 2 so can handle 12 messages, therefore need 2 more containers + testProbe.expectMsg(DecisionResults(AddContainer, 2)) + } + + it should "add containers when action concurrency >1 w/ average duration and demand is not met and has stale activations" in { + val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) + val testProbe = TestProbe() + + // container + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 30, + existingContainerCount = 2, + inProgressContainerCount = 0, + staleActivationNum = 10, + existingContainerCountInNamespace = 2, + inProgressContainerCountInNamespace = 0, + averageDuration = Some(50.0), + limit = 10, + maxActionConcurrency = 3, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // available messages is 30 and throughput should be 100.0 / 50.0 * 3 = 6 + // existing container is 2 so can handle 12 messages, therefore need 2 more containers for non-stale + // stale has 10 activations so need another additional 2 + testProbe.expectMsg(DecisionResults(AddContainer, 4)) + } + + it should "add containers when action concurrency >1 when no average duration and there are stale activations" in { + val decisionMaker = system.actorOf(SchedulingDecisionMaker.props(testNamespace, action, schedulingConfig)) + val testProbe = TestProbe() + + // container + val msg = QueueSnapshot( + initialized = true, + incomingMsgCount = new AtomicInteger(0), + currentMsgCount = 10, + existingContainerCount = 1, + inProgressContainerCount = 0, + staleActivationNum = 10, + existingContainerCountInNamespace = 2, + inProgressContainerCountInNamespace = 0, + averageDuration = None, + limit = 10, + maxActionConcurrency = 3, + stateName = Running, + recipient = testProbe.ref) + + decisionMaker ! msg + + // stale messages are 10. 
want stale to be handled by first pass of requests from containers so + // ceil(10 / 3) = 4 containers are needed + testProbe.expectMsg(DecisionResults(AddContainer, 4)) + } }