From 3f65d78c8f3bd30a8801699667d0d13fe443ffd4 Mon Sep 17 00:00:00 2001 From: Rodric Rabbah Date: Sun, 24 Jan 2021 10:58:35 -0500 Subject: [PATCH] Fixes bug in invoker supervision on startup. --- .../loadBalancer/InvokerSupervision.scala | 83 ++++++++++++------- .../core/containerpool/ContainerPool.scala | 8 +- .../test/InvokerSupervisionTests.scala | 51 +++++++++++- 3 files changed, 108 insertions(+), 34 deletions(-) diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala index 5a9d36707db..d61553a01b0 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/InvokerSupervision.scala @@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr val healthyTimeout: FiniteDuration = 10.seconds - // This is done at this point to not intermingle with the state-machine - // especially their timeouts. + // This is done at this point to not intermingle with the state-machine especially their timeouts. def customReceive: Receive = { - case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them. + case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer. } - override def receive: Receive = customReceive.orElse(super.receive) - /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */ - startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize))) - - /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */ - when(Offline) { - case Event(_: PingMessage, _) => goto(Unhealthy) - } + override def receive: Receive = customReceive.orElse(super.receive) // To be used for all states that should send test actions to reverify the invoker val healthPingingState: StateFunction = { @@ -317,6 +309,22 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr stay } + // To be used for all states that should send test actions to reverify the invoker + def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = { + case _ -> `state` => + invokeTestAction() + setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true) + case `state` -> _ => cancelTimer(InvokerActor.timerName) + } + + /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */ + startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize))) + + /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */ + when(Offline) { + case Event(_: PingMessage, _) => goto(Unhealthy) + } + /** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */ when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState) @@ -324,20 +332,20 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState) /** - * A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed - * for 20 seconds. + * A Healthy invoker is characterized by continuously getting pings. + * It will go offline if that state is not confirmed for 20 seconds. */ when(Healthy, stateTimeout = healthyTimeout) { case Event(_: PingMessage, _) => stay case Event(StateTimeout, _) => goto(Offline) } - /** Handle the completion of an Activation in every state. */ + /** Handles the completion of an Activation in every state. */ whenUnhandled { case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer) } - /** Logging on Transition change */ + /** Logs transition changes. */ onTransition { case _ -> newState if !newState.isUsable => transid.mark( @@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}") } - // To be used for all states that should send test actions to reverify the invoker - def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = { - case _ -> `state` => - invokeTestAction() - setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true) - case `state` -> _ => cancelTimer(InvokerActor.timerName) - } - onTransition(healthPingingTransitionHandler(Unhealthy)) onTransition(healthPingingTransitionHandler(Unresponsive)) @@ -372,8 +372,8 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr buffer: RingBuffer[InvocationFinishedResult]) = { buffer.add(result) - // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately - // a new test action to remove the errors out of the RingBuffer as fast as possible. + // If the action is successful, the Invoker is Healthy. We execute additional test actions + // immediately to clear the RingBuffer as fast as possible. // The actions that arrive while the invoker is unhealthy are most likely health actions. // It is possible they are normal user actions as well. This can happen if such actions were in the // invoker queue or in progress while the invoker's status flipped to Unhealthy. @@ -381,19 +381,44 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr invokeTestAction() } - // Stay in online if the activations was successful. - // Stay in offline, if an activeAck reaches the controller. + // Stay online if the activations was successful. + // Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging. if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) { stay } else { val entries = buffer.toList - // Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer, else goto Healthy + + // Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state. + // Otherwise transition to Healthy on successful activations only. if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) { + // Note: The predicate is false if the ring buffer is still being primed + // (i.e., the entries.size <= bufferErrorTolerance). gotoIfNotThere(Unhealthy) } else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) { + // Note: The predicate is false if the ring buffer is still being primed + // (i.e., the entries.size <= bufferErrorTolerance). gotoIfNotThere(Unresponsive) } else { - gotoIfNotThere(Healthy) + result match { + case InvocationFinishedResult.Success => + // Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or + // during priming of the ring buffer. In case of the latter, there is at least one additional test + // action in flight which can reverse the transition later. + gotoIfNotThere(Healthy) + + case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) => + // The ring buffer is not fully primed yet, stay/goto Unhealthy + gotoIfNotThere(Unhealthy) + + case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) => + // The ring buffer is not fully primed yet, stay/goto Unresponsive + gotoIfNotThere(Unresponsive) + + case _ => + // at steady state, the state of the buffer superceded and we hold the current state + // until enough events have occured to transition to a new state + stay + } } } } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 4327ad160fb..fe0e3a2ed6c 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -267,19 +267,21 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, freePool.get(sender()).foreach { f => freePool = freePool - sender() } + // container was busy (busy indicates at full capacity), so there is capacity to accept another job request busyPool.get(sender()).foreach { _ => busyPool = busyPool - sender() } processBufferOrFeed() - //in case this was a prewarm + // in case this was a prewarm prewarmedPool.get(sender()).foreach { data => prewarmedPool = prewarmedPool - sender() } - //in case this was a starting prewarm + + // in case this was a starting prewarm prewarmStartingPool.get(sender()).foreach { _ => - logging.info(this, "failed starting prewarm removed") + logging.info(this, "failed starting prewarm, removed") prewarmStartingPool = prewarmStartingPool - sender() } diff --git a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala index 1cf73834b82..7e0d88f49a8 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -207,6 +207,54 @@ class InvokerSupervisionTests behavior of "InvokerActor" + it should "start and stay unhealthy while min threshold is not met" in { + val invoker = + TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0"))) + invoker.stateName shouldBe Unhealthy + + (1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ => + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.SystemError) + invoker.stateName shouldBe Unhealthy + } + + (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ => + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.Success) + invoker.stateName shouldBe Unhealthy + } + + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.Success) + invoker.stateName shouldBe Healthy + } + + it should "revert to unhealthy during initial startup if there is a failed test activation" in { + assume(InvokerActor.bufferErrorTolerance >= 3) + + val invoker = + TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0"))) + invoker.stateName shouldBe Unhealthy + + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.SystemError) + invoker.stateName shouldBe Unhealthy + + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.Success) + invoker.stateName shouldBe Healthy + + invoker ! InvocationFinishedMessage( + InvokerInstanceId(0, userMemory = defaultUserMemory), + InvocationFinishedResult.SystemError) + invoker.stateName shouldBe Unhealthy + } + // unHealthy -> offline // offline -> unhealthy it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in { @@ -318,7 +366,7 @@ class InvokerSupervisionTests } } - it should "start timer to send testactions when unhealthy" in { + it should "start timer to send test actions when unhealthy" in { val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0"))) invoker.stateName shouldBe Unhealthy @@ -337,7 +385,6 @@ class InvokerSupervisionTests } it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in { - val invoker0 = TestProbe() val children = mutable.Queue(invoker0.ref) val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()