Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make scheduler consider action concurrency >1 #5378

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -943,6 +943,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
namespaceContainerCount.inProgressContainerNumByNamespace,
averageDuration,
limit,
actionMetaData.limits.concurrency.maxConcurrent,
stateName,
self)
case Failure(_: NoDocumentException) =>
Expand Down Expand Up @@ -1222,6 +1223,7 @@ case class QueueSnapshot(initialized: Boolean,
inProgressContainerCountInNamespace: Int,
averageDuration: Option[Double],
limit: Int,
maxActionConcurrency: Int,
stateName: MemoryQueueState,
recipient: ActorRef)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SchedulingDecisionMaker(
inProgressContainerCountInNs,
averageDuration,
limit,
maxActionConcurrency,
stateName,
_) = snapshot
val totalContainers = existing + inProgress
Expand Down Expand Up @@ -137,7 +138,7 @@ class SchedulingDecisionMaker(
// but it is a kind of trade-off and we place latency on top of over-provisioning
case (Running, None) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val num = staleActivationNum - inProgress
val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num > availableMsg) availableMsg else num
addServersIfPossible(
Expand All @@ -153,7 +154,7 @@ class SchedulingDecisionMaker(
// need more containers and a message is already processed
case (Running, Some(duration)) =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val expectedTps = containerThroughput * (existing + inProgress)
val availableNonStaleActivations = availableMsg - staleActivationNum

Expand Down Expand Up @@ -201,7 +202,7 @@ class SchedulingDecisionMaker(
// this case is for that as a last resort.
case (Removing, Some(duration)) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val containerThroughput = staleThreshold / duration
val containerThroughput = (staleThreshold / duration) * maxActionConcurrency
val num = ceiling(staleActivationNum.toDouble / containerThroughput)
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = (if (num > staleActivationNum) staleActivationNum else num) - inProgress
Expand All @@ -219,7 +220,7 @@ class SchedulingDecisionMaker(
// same with the above case but no duration exist.
case (Removing, None) if staleActivationNum > 0 =>
// we can safely get the value as we already checked the existence
val num = staleActivationNum - inProgress
val num = ceiling(staleActivationNum.toDouble / maxActionConcurrency.toDouble) - inProgress
// if it tries to create more containers than existing messages, we just create shortage
val actualNum = if (num > availableMsg) availableMsg else num
addServersIfPossible(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class MemoryQueueFlowTests
// this is the case where there is no capacity in a namespace and no container can be created.
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
msg match {
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, Running, _) =>
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, Running, _) =>
sender ! DecisionResults(EnableNamespaceThrottling(true), 0)
TestActor.KeepRunning

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1534,7 +1534,7 @@ class MemoryQueueTests
// This test pilot mimic the decision maker who disable the namespace throttling when there is enough capacity.
decisionMaker.setAutoPilot((sender: ActorRef, msg) => {
msg match {
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
case QueueSnapshot(_, _, _, _, _, _, _, _, _, _, _, NamespaceThrottled, _) =>
sender ! DisableNamespaceThrottling

case _ =>
Expand Down
Loading