Skip to content

Commit

Permalink
Fixes bug in invoker supervision on startup.
Browse files Browse the repository at this point in the history
  • Loading branch information
rabbah committed Jan 24, 2021
1 parent 4babe39 commit 3f65d78
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,12 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr

val healthyTimeout: FiniteDuration = 10.seconds

// This is done at this point to not intermingle with the state-machine
// especially their timeouts.
// This is done at this point to not intermingle with the state-machine especially their timeouts.
def customReceive: Receive = {
case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them.
case _: RecordMetadata => // Ignores the result of publishing test actions to MessageProducer.
}
override def receive: Receive = customReceive.orElse(super.receive)

/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))

/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
}
override def receive: Receive = customReceive.orElse(super.receive)

// To be used for all states that should send test actions to reverify the invoker
val healthPingingState: StateFunction = {
Expand All @@ -317,27 +309,43 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
stay
}

// To be used for all states that should send test actions to reverify the invoker
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
invokeTestAction()
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
}

/** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */
startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize)))

/** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */
when(Offline) {
case Event(_: PingMessage, _) => goto(Unhealthy)
}

/** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */
when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState)

/** An Unresponsive invoker represents an invoker that is not responding with active acks in a timely manner */
when(Unresponsive, stateTimeout = healthyTimeout)(healthPingingState)

/**
* A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed
* for 20 seconds.
* A Healthy invoker is characterized by continuously getting pings.
* It will go offline if that state is not confirmed for 20 seconds.
*/
when(Healthy, stateTimeout = healthyTimeout) {
case Event(_: PingMessage, _) => stay
case Event(StateTimeout, _) => goto(Offline)
}

/** Handle the completion of an Activation in every state. */
/** Handles the completion of an Activation in every state. */
whenUnhandled {
case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer)
}

/** Logging on Transition change */
/** Logs transition changes. */
onTransition {
case _ -> newState if !newState.isUsable =>
transid.mark(
Expand All @@ -348,14 +356,6 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}")
}

// To be used for all states that should send test actions to reverify the invoker
def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = {
case _ -> `state` =>
invokeTestAction()
setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true)
case `state` -> _ => cancelTimer(InvokerActor.timerName)
}

onTransition(healthPingingTransitionHandler(Unhealthy))
onTransition(healthPingingTransitionHandler(Unresponsive))

Expand All @@ -372,28 +372,53 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr
buffer: RingBuffer[InvocationFinishedResult]) = {
buffer.add(result)

// If the action is successful it seems like the Invoker is Healthy again. So we execute immediately
// a new test action to remove the errors out of the RingBuffer as fast as possible.
// If the action is successful, the Invoker is Healthy. We execute additional test actions
// immediately to clear the RingBuffer as fast as possible.
// The actions that arrive while the invoker is unhealthy are most likely health actions.
// It is possible they are normal user actions as well. This can happen if such actions were in the
// invoker queue or in progress while the invoker's status flipped to Unhealthy.
if (result == InvocationFinishedResult.Success && stateName == Unhealthy) {
invokeTestAction()
}

// Stay in online if the activations was successful.
// Stay in offline, if an activeAck reaches the controller.
// Stay online if the activations was successful.
// Stay offline if an activeAck is received (a stale activation) but the invoker ceased pinging.
if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) {
stay
} else {
val entries = buffer.toList
// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer, else goto Healthy

// Goto Unhealthy or Unresponsive respectively if there are more errors than accepted in buffer at steady state.
// Otherwise transition to Healthy on successful activations only.
if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unhealthy)
} else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) {
// Note: The predicate is false if the ring buffer is still being primed
// (i.e., the entries.size <= bufferErrorTolerance).
gotoIfNotThere(Unresponsive)
} else {
gotoIfNotThere(Healthy)
result match {
case InvocationFinishedResult.Success =>
// Eagerly transition to healthy, at steady state (there aren't sufficient contra-indications) or
// during priming of the ring buffer. In case of the latter, there is at least one additional test
// action in flight which can reverse the transition later.
gotoIfNotThere(Healthy)

case InvocationFinishedResult.SystemError if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unhealthy
gotoIfNotThere(Unhealthy)

case InvocationFinishedResult.Timeout if (entries.size <= InvokerActor.bufferErrorTolerance) =>
// The ring buffer is not fully primed yet, stay/goto Unresponsive
gotoIfNotThere(Unresponsive)

case _ =>
// at steady state, the state of the buffer superceded and we hold the current state
// until enough events have occured to transition to a new state
stay
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,19 +267,21 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
freePool.get(sender()).foreach { f =>
freePool = freePool - sender()
}

// container was busy (busy indicates at full capacity), so there is capacity to accept another job request
busyPool.get(sender()).foreach { _ =>
busyPool = busyPool - sender()
}
processBufferOrFeed()

//in case this was a prewarm
// in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
prewarmedPool = prewarmedPool - sender()
}
//in case this was a starting prewarm

// in case this was a starting prewarm
prewarmStartingPool.get(sender()).foreach { _ =>
logging.info(this, "failed starting prewarm removed")
logging.info(this, "failed starting prewarm, removed")
prewarmStartingPool = prewarmStartingPool - sender()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,54 @@ class InvokerSupervisionTests

behavior of "InvokerActor"

it should "start and stay unhealthy while min threshold is not met" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy

(1 to InvokerActor.bufferErrorTolerance + 1).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
}

(1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance - 1).foreach { _ =>
invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Unhealthy
}

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Healthy
}

it should "revert to unhealthy during initial startup if there is a failed test activation" in {
assume(InvokerActor.bufferErrorTolerance >= 3)

val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.Success)
invoker.stateName shouldBe Healthy

invoker ! InvocationFinishedMessage(
InvokerInstanceId(0, userMemory = defaultUserMemory),
InvocationFinishedResult.SystemError)
invoker.stateName shouldBe Unhealthy
}

// unHealthy -> offline
// offline -> unhealthy
it should "start unhealthy, go offline if the state times out and goes unhealthy on a successful ping again" in {
Expand Down Expand Up @@ -318,7 +366,7 @@ class InvokerSupervisionTests
}
}

it should "start timer to send testactions when unhealthy" in {
it should "start timer to send test actions when unhealthy" in {
val invoker =
TestFSMRef(new InvokerActor(InvokerInstanceId(0, userMemory = defaultUserMemory), ControllerInstanceId("0")))
invoker.stateName shouldBe Unhealthy
Expand All @@ -337,7 +385,6 @@ class InvokerSupervisionTests
}

it should "initially store invoker status with its full id - instance/uniqueName/displayedName" in {

val invoker0 = TestProbe()
val children = mutable.Queue(invoker0.ref)
val childFactory = (f: ActorRefFactory, instance: InvokerInstanceId) => children.dequeue()
Expand Down

0 comments on commit 3f65d78

Please sign in to comment.