Skip to content

Commit

Permalink
make scheduler consider action concurrency >1 (apache#5378)
Browse files Browse the repository at this point in the history
Co-authored-by: Brendan Doyle <brendand@qualtrics.com>
(cherry picked from commit 415ae98)
  • Loading branch information
bdoyle0182 authored and mtt-merz committed Oct 22, 2023
1 parent 9d96c6d commit 851ead0
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
namespaceContainerCount.inProgressContainerNumByNamespace,
averageDuration,
limit,
actionMetaData.limits.concurrency.maxConcurrent,
stateName,
self)
case Failure(_: NoDocumentException) =>
Expand Down Expand Up @@ -1222,6 +1223,7 @@ case class QueueSnapshot(initialized: Boolean,
inProgressContainerCountInNamespace: Int,
averageDuration: Option[Double],
limit: Int,
maxActionConcurrency: Int,
stateName: MemoryQueueState,
recipient: ActorRef)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SchedulingDecisionMaker(
inProgressContainerCountInNs,
averageDuration,
limit,
maxActionConcurrency,
stateName,
_) = snapshot
val totalContainers = existing + inProgress
Expand Down Expand Up @@ -137,7 +138,7 @@ class SchedulingDecisionMaker(
// but it is a kind of trade-off and we place latency on top of over-provisioning
case (Running, None) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val num = staleActivationNum - inProgress
val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num > availableMsg) availableMsg else num
addServersIfPossible(
Expand All @@ -153,7 +154,7 @@ class SchedulingDecisionMaker(
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val expectedTps = containerThroughput * (existing + inProgress)
val availableNonStaleActivations = availableMsg - staleActivationNum

Expand Down Expand Up @@ -201,7 +202,7 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
Expand All @@ -219,7 +220,7 @@ class SchedulingDecisionMaker(
// same with the above case but no duration exist.
case (Removing, None) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val num = staleActivationNum - inProgress
val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num > availableMsg) availableMsg else num
addServersIfPossible(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class MemoryQueueFlowTests
// this is the case where there is no capacity in a namespace and no container can be created.
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
msg match {
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, Running, _) =>
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) =>
sender ! DecisionResults(EnableNamespaceThrottling(true), 0)
TestActor.KeepRunning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1534,7 +1534,7 @@ class MemoryQueueTests
// This test pilot mimic the decision maker who disable the namespace throttling when there is enough capacity.
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
msg match {
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
sender ! DisableNamespaceThrottling

case _ =>
Expand Down
Loading

0 comments on commit 851ead0

Please sign in to comment.