Skip to content

Commit

Permalink
Go to the NamespaceThrottled state rather than Flushing state. (#5303)
Browse files Browse the repository at this point in the history
* Currently MemoryQueue will go to Flushing state when receive a EnableNamespaceThrottling(dropMsg=true) message, but the Flushing state doesn't have a case to disable namespace throttling at all.

* Remove unused import.
  • Loading branch information
style95 authored Aug 3, 2022
1 parent cf127f9 commit 8fd2156
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.openwhisk.core.scheduler.queue

import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Status.{Failure => FailureMessage}
import akka.actor.{ActorRef, ActorSystem, Cancellable, FSM, Props, Stash}
import akka.util.Timeout
import org.apache.openwhisk.common._
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.ack.ActiveAck
import org.apache.openwhisk.core.connector.ContainerCreationError.{TooManyConcurrentRequests, ZeroNamespaceLimit}
import org.apache.openwhisk.core.connector.ContainerCreationError.ZeroNamespaceLimit
import org.apache.openwhisk.core.connector._
import org.apache.openwhisk.core.containerpool.Interval
import org.apache.openwhisk.core.database.{NoDocumentException, UserContext}
Expand All @@ -44,10 +42,12 @@ import org.apache.openwhisk.core.scheduler.message.{
import org.apache.openwhisk.core.scheduler.{SchedulerEndpoints, SchedulingConfig}
import org.apache.openwhisk.core.service._
import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcurrentRequests}
import pureconfig.generic.auto._
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig.generic.auto._

import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
Expand Down Expand Up @@ -224,18 +224,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
logging.info(this, s"[$invocationNamespace:$action:$stateName] Enable namespace throttling.")
enableNamespaceThrottling()

// if no container could be created, it is same with Flushing state.
if (dropMsg) {
if (dropMsg)
completeAllActivations(tooManyConcurrentRequests, isWhiskError = false)
goto(Flushing) using FlushingData(
data.schedulerActor,
data.droppingActor,
TooManyConcurrentRequests,
tooManyConcurrentRequests)
} else {
// if there are already some containers running, activations can still be processed so goto the NamespaceThrottled state.
goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)
}
goto(NamespaceThrottled) using ThrottledData(data.schedulerActor, data.droppingActor)

case Event(StateTimeout, data: RunningData) =>
if (queue.isEmpty && (containers.size + creationIds.size) <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class MemoryQueueFlowTests
probe.expectTerminated(fsm, 10.seconds)
}

it should "go to the Flushing state dropping messages when it can't create an initial container" in {
it should "go to the NamespaceThrottled state dropping messages when it can't create an initial container" in {
val mockEtcdClient = mock[EtcdClient]
val parent = TestProbe()
val watcher = TestProbe()
Expand Down Expand Up @@ -340,7 +340,7 @@ class MemoryQueueFlowTests
fsm ! message

dataMgmtService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, failoverEnabled = false))
probe.expectMsg(Transition(fsm, Running, Flushing))
probe.expectMsg(Transition(fsm, Running, NamespaceThrottled))

awaitAssert({
ackedMessageCount shouldBe 1
Expand All @@ -352,13 +352,17 @@ class MemoryQueueFlowTests
fsm.underlyingActor.queue.size shouldBe 0
}, 5.seconds)

parent.expectMsg(flushGrace * 2 + 5.seconds, queueRemovedMsg)
probe.expectMsg(Transition(fsm, Flushing, Removed))
fsm ! GracefulShutdown

parent.expectMsg(queueRemovedMsg)
probe.expectMsg(Transition(fsm, NamespaceThrottled, Removing))

fsm ! QueueRemovedCompleted

expectDataCleanUp(watcher, dataMgmtService)

probe.expectMsg(Transition(fsm, Removing, Removed))

probe.expectTerminated(fsm, 10.seconds)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,7 @@ class MemoryQueueTests
parent.expectMsg(10 seconds, Transition(fsm, Uninitialized, Running))

fsm ! EnableNamespaceThrottling(dropMsg = true)
parent.expectMsg(10 seconds, Transition(fsm, Running, Flushing))
parent.expectMsg(10 seconds, Transition(fsm, Running, NamespaceThrottled))
dataManagementService.expectMsg(RegisterData(namespaceThrottlingKey, true.toString, false))

fsm.stop()
Expand Down

0 comments on commit 8fd2156

Please sign in to comment.