Fix Orphaned Container Edge Case In Paused State of Container Proxy (#5326)

* fix orphaned container edge case in proxy paused state

* enhance test

* feedback

Co-authored-by: Brendan Doyle <brendand@qualtrics.com>
bdoyle0182 and Brendan Doyle authored Sep 23, 2022
1 parent a1639f0 commit 625c5f2
Showing 3 changed files with 218 additions and 20 deletions.
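
Context for the diff below: in the Paused state, the old StateTimeout handler piped its Keep/Remove decision future straight back to the proxy, so if getLiveContainerCount or getWarmedContainerLimit failed, no message ever arrived and the paused container was orphaned. The new DetermineKeepContainer(attempt) message retries the decision (up to five attempts, re-scheduled on a 500 ms single timer) and finally falls back to Remove. The block below is a minimal, standalone sketch of that bounded-retry-then-remove pattern; decide, fetchLiveCount, fetchKeepLimit and the immediate recursion are hypothetical stand-ins, not the proxy's real FSM API.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical decision type mirroring the proxy's Keep/Remove messages.
sealed trait PauseDecision
final case class Keep(timeout: FiniteDuration) extends PauseDecision
case object Remove extends PauseDecision

object PauseDecisionSketch {

  /**
   * Decide whether a paused container should be kept warm or removed.
   *
   * fetchLiveCount and fetchKeepLimit stand in for the proxy's
   * getLiveContainerCount / getWarmedContainerLimit calls; either may fail.
   * On failure the decision is retried up to maxAttempts times and then
   * falls back to Remove, so a failing lookup can never orphan the container.
   * (The real handler re-schedules itself via a 500 ms single timer instead
   * of recursing immediately.)
   */
  def decide(fetchLiveCount: () => Future[Long],
             fetchKeepLimit: () => Future[(Int, FiniteDuration)],
             attempt: Int = 0,
             maxAttempts: Int = 5)(implicit ec: ExecutionContext): Future[PauseDecision] = {
    val decision = for {
      count <- fetchLiveCount()
      limit <- fetchKeepLimit() // (keep count, keep timeout), as in getWarmedContainerLimit
    } yield if (count <= limit._1) Keep(limit._2) else Remove

    decision.recoverWith {
      case _ if attempt < maxAttempts =>
        decide(fetchLiveCount, fetchKeepLimit, attempt + 1, maxAttempts)
      case _ =>
        Future.successful(Remove) // give up: removing is always safe, orphaning never is
    }
  }
}
```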
@@ -158,7 +158,7 @@ class ShardingContainerPoolBalancer(
AkkaManagement(actorSystem).start()
ClusterBootstrap(actorSystem).start()
Some(Cluster(actorSystem))
} else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
} else if (loadConfigOrThrow[Seq[String]]("akka.cluster.seed-nodes").nonEmpty) {
Some(Cluster(actorSystem))
} else {
None
@@ -87,6 +87,7 @@ case class Initialized(data: InitializedData)
case class Resumed(data: WarmData)
case class ResumeFailed(data: WarmData)
case class RecreateClient(action: ExecutableWhiskAction)
case class DetermineKeepContainer(attempt: Int)

// States
sealed trait ProxyState
@@ -661,6 +662,7 @@ class FunctionPullingContainerProxy(
val parent = context.parent
cancelTimer(IdleTimeoutName)
cancelTimer(KeepingTimeoutName)
cancelTimer(DetermineKeepContainer.toString)
data.container
.resume()
.map { _ =>
@@ -693,32 +695,43 @@
instance,
data.container.containerId))
goto(Running)

case Event(StateTimeout, data: WarmData) =>
(for {
count <- getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
(warmedContainerKeepingCount, warmedContainerKeepingTimeout) <- getWarmedContainerLimit(
data.invocationNamespace)
} yield {
logging.info(
this,
s"Live container count: ${count}, warmed container keeping count configuration: ${warmedContainerKeepingCount} in namespace: ${data.invocationNamespace}")
if (count <= warmedContainerKeepingCount) {
Keep(warmedContainerKeepingTimeout)
} else {
Remove
}
}).pipeTo(self)
case Event(StateTimeout, _: WarmData) =>
self ! DetermineKeepContainer(0)
stay
case Event(DetermineKeepContainer(attempt), data: WarmData) =>
getLiveContainerCount(data.invocationNamespace, data.action.fullyQualifiedName(false), data.revision)
.flatMap(count => {
getWarmedContainerLimit(data.invocationNamespace).map(warmedContainerInfo => {
logging.info(
this,
s"Live container count: $count, warmed container keeping count configuration: ${warmedContainerInfo._1} in namespace: ${data.invocationNamespace}")
if (count <= warmedContainerInfo._1) {
self ! Keep(warmedContainerInfo._2)
} else {
self ! Remove
}
})
})
.recover({
case t: Throwable =>
logging.error(
this,
s"Failed to determine whether to keep or remove container on pause timeout for ${data.container.containerId}, retrying. Caused by: $t")
if (attempt < 5) {
startSingleTimer(DetermineKeepContainer.toString, DetermineKeepContainer(attempt + 1), 500.milli)
} else {
self ! Remove
}
})
stay

case Event(Keep(warmedContainerKeepingTimeout), data: WarmData) =>
logging.info(
this,
s"This is the remaining container for ${data.action}. The container will stop after $warmedContainerKeepingTimeout.")
startSingleTimer(KeepingTimeoutName, Remove, warmedContainerKeepingTimeout)
stay

case Event(Remove | GracefulShutdown, data: WarmData) =>
cancelTimer(DetermineKeepContainer.toString)
dataManagementService ! UnregisterData(
ContainerKeys.warmedContainers(
data.invocationNamespace,
@@ -732,7 +745,6 @@
data.action.fullyQualifiedName(false),
data.action.rev,
Some(data.clientProxy))

case _ => delay
}

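As a usage example of the hypothetical decide sketch above, the following mirrors the fail-first-call scenario exercised by the new test below: one failed live-count lookup, then a successful one returning 2 against a keep limit of 1, so the decision resolves to Remove.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Mirrors the new getLiveContainerCountFailFirstCall test helper: the first
// live-count lookup fails, the retry succeeds, and the decision is Remove
// because the live count (2) exceeds the keep limit (1).
object PauseDecisionSketchExample extends App {
  @volatile private var failedOnce = false

  val liveCount: () => Future[Long] = () => {
    if (!failedOnce) {
      failedOnce = true
      Future.failed(new Exception("failure"))
    } else {
      Future.successful(2L)
    }
  }

  val keepLimit: () => Future[(Int, FiniteDuration)] = () =>
    Future.successful((1, 10.seconds))

  val decision = Await.result(PauseDecisionSketch.decide(liveCount, keepLimit), 5.seconds)
  assert(decision == Remove)
  println(s"decision: $decision") // prints "decision: Remove"
}
```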
@@ -291,6 +291,22 @@ class FunctionPullingContainerProxyTests
Future.successful(count)
}

def getLiveContainerCountFail(count: Long) = LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
Future.failed(new Exception("failure"))
}

def getLiveContainerCountFailFirstCall(count: Long) = {
var firstCall = true
LoggedFunction { (_: String, _: FullyQualifiedEntityName, _: DocRevision) =>
if (firstCall) {
firstCall = false
Future.failed(new Exception("failure"))
} else {
Future.successful(count)
}
}
}

def getWarmedContainerLimit(limit: Future[(Int, FiniteDuration)]) = LoggedFunction { (_: String) =>
limit
}
@@ -1036,6 +1052,176 @@ class FunctionPullingContainerProxyTests
}
}

it should "destroy container proxy when stopping due to timeout and getting live count fails once" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCountFailFirstCall(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient

val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))

registerCallback(machine, probe)
probe watch machine

machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)

probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())

probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
//register running container
dataManagementService.expectMsgType[RegisterData]

client.expectMsg(RequestActivation())
client.send(machine, message)

probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}

machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
//register paused warmed container
dataManagementService.expectMsgType[RegisterData]

machine ! StateTimeout
client.expectMsg(StopClientProxy)
dataManagementService.expectMsgType[UnregisterData]
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
client.send(machine, ClientClosed)

probe expectTerminated machine

awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}

it should "destroy container proxy when stopping due to timeout and getting live count fails permanently" in within(timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
val namespaceBlacklist: NamespaceBlacklist = new NamespaceBlacklist(authStore)
val get = getWhiskAction(Future(action.toWhiskAction))
val dataManagementService = TestProbe()
val container = new TestContainer
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()
val counter = getLiveContainerCountFail(2)
val limit = getWarmedContainerLimit(Future.successful((1, 10.seconds)))
val (client, clientFactory) = testClient

val probe = TestProbe()
val machine =
probe.childActorOf(
FunctionPullingContainerProxy
.props(
factory,
entityStore,
namespaceBlacklist,
get,
dataManagementService.ref,
clientFactory,
acker,
store,
collector,
counter,
limit,
InvokerInstanceId(0, userMemory = defaultUserMemory),
invokerHealthManager.ref,
poolConfig,
timeoutConfig))

registerCallback(machine, probe)
probe watch machine

machine ! Initialize(invocationNamespace.asString, fqn, action, schedulerHost, rpcPort, messageTransId)

probe.expectMsg(Transition(machine, Uninitialized, CreatingClient))
client.expectMsg(StartClient)
client.send(machine, ClientCreationCompleted())

probe.expectMsg(Transition(machine, CreatingClient, ClientCreated))
expectInitialized(probe)
//register running container
dataManagementService.expectMsgType[RegisterData]

client.expectMsg(RequestActivation())
client.send(machine, message)

probe.expectMsg(Transition(machine, ClientCreated, Running))
client.expectMsg(ContainerWarmed)
client.expectMsgPF() {
case RequestActivation(Some(_), None) => true
}

machine ! StateTimeout
client.send(machine, RetryRequestActivation)
probe.expectMsg(Transition(machine, Running, Pausing))
probe.expectMsgType[ContainerIsPaused]
probe.expectMsg(Transition(machine, Pausing, Paused))
//register paused warmed container
dataManagementService.expectMsgType[RegisterData]

machine ! StateTimeout
client.expectMsg(StopClientProxy)
dataManagementService.expectMsgType[UnregisterData]
probe.expectMsgAllOf(ContainerRemoved(true), Transition(machine, Paused, Removing))
client.send(machine, ClientClosed)

probe expectTerminated machine

awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 1
collector.calls.length shouldBe 1
container.destroyCount shouldBe 1
acker.calls.length shouldBe 1
store.calls.length shouldBe 1
}
}

it should "destroy container proxy even if there is no message from the client when stopping due to timeout" in within(
timeout) {
val authStore = mock[ArtifactWhiskAuthStore]
