Remove containers gradually when disabling the invoker #5253

Merged
1 change: 1 addition & 0 deletions ansible/roles/invoker/tasks/deploy.yml
@@ -287,6 +287,7 @@
"CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
"CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
"CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
"CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"

- name: extend invoker dns env
set_fact:
@@ -54,12 +54,14 @@ case class ContainerPoolConfig(userMemory: ByteSize,
prewarmMaxRetryLimit: Int,
prewarmPromotion: Boolean,
memorySyncInterval: FiniteDuration,
batchDeletionSize: Int,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
require(
concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")

require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
require(batchDeletionSize > 0, "batch deletion size must be > 0")

/**
* The shareFactor indicates the number of containers that would share a single core, on average.
1 change: 1 addition & 0 deletions core/invoker/src/main/resources/application.conf
@@ -67,6 +67,7 @@ whisk {
prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
memory-sync-interval: 1 second # period to sync memory info to etcd
batch-deletion-size: 10 # batch size for removing containers when disabling the invoker; too large a value may overload docker/k8s
}

kubernetes {
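
For context, the new key is consumed through the container pool configuration rather than read directly; the sketch below is a minimal, illustrative way to inspect the value with Typesafe Config, assuming the block above resolves under whisk.container-pool (the path the CONFIG_whisk_containerPool_* Ansible variables map to).

import com.typesafe.config.ConfigFactory

object BatchDeletionConfigSketch {
  def main(args: Array[String]): Unit = {
    // assumes the invoker's application.conf (shown above) is on the classpath
    val conf = ConfigFactory.load()
    // defaults to 10; operators override it via container_pool_batchDeletionSize in Ansible,
    // which surfaces as the CONFIG_whisk_containerPool_batchDeletionSize environment variable
    val batchDeletionSize = conf.getInt("whisk.container-pool.batch-deletion-size")
    println(s"batch deletion size: $batchDeletionSize")
  }
}

In the invoker itself the whole container pool block is deserialized into ContainerPoolConfig, so batch-deletion-size arrives as the batchDeletionSize field added above.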
@@ -94,6 +94,9 @@ class FunctionPullingContainerPool(
private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]

// containers that have already been sent GracefulShutdown while the invoker is disabling
private var disablingPool = immutable.Set.empty[ActorRef]

private var shuttingDown = false

private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
@@ -353,18 +356,12 @@

// Container got removed
case ContainerRemoved(replacePrewarm) =>
inProgressPool.get(sender()).foreach { _ =>
inProgressPool = inProgressPool - sender()
}

warmedPool.get(sender()).foreach { _ =>
warmedPool = warmedPool - sender()
}
inProgressPool = inProgressPool - sender()
warmedPool = warmedPool - sender()
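// if this container was being disabled, drop it from the disabling set so the next deletion round gets its slot back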
disablingPool -= sender()

// the container was busy (i.e. at full capacity), so removing it frees capacity to accept another job request
busyPool.get(sender()).foreach { _ =>
busyPool = busyPool - sender()
}
busyPool = busyPool - sender()

//in case this was a prewarm
prewarmedPool.get(sender()).foreach { data =>
@@ -601,11 +598,26 @@
* Gracefully shut down the containers in the busy and warmed pools, at most batchDeletionSize at a time
*/
private def waitForPoolToClear(): Unit = {
busyPool.keys.foreach(_ ! GracefulShutdown)
warmedPool.keys.foreach(_ ! GracefulShutdown)
if (inProgressPool.nonEmpty) {
val pool = self
// how many busy containers will be removed in this round
val slotsForBusyPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
(busyPool.keySet &~ disablingPool)
.take(slotsForBusyPool)
.foreach(container => {
disablingPool += container
container ! GracefulShutdown
})
// how many warm containers will be removed in this round
val slotsForWarmPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
(warmedPool.keySet &~ disablingPool)
.take(slotsForWarmPool)
.foreach(container => {
disablingPool += container
container ! GracefulShutdown
})
if (inProgressPool.nonEmpty || busyPool.size + warmedPool.size > slotsForBusyPool + slotsForWarmPool) {
context.system.scheduler.scheduleOnce(5.seconds) {
waitForPoolToClear()
pool ! GracefulShutdown
}
}
}
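
To make the new batching in waitForPoolToClear easier to follow, here is a standalone sketch of a single deletion round; plain Sets stand in for the actor pools and pickBatch is an illustrative name, not part of this PR. Each round has a budget of batchDeletionSize shutdowns, containers already disabling count against that budget and are skipped, busy containers are drained first, and the warmed pool gets whatever budget remains.

object BatchDeletionRoundSketch {
  // pick at most `batchDeletionSize` containers to shut down in one round,
  // counting containers that are already disabling against the budget
  def pickBatch[A](busy: Set[A], warm: Set[A], disabling: Set[A], batchDeletionSize: Int): Set[A] = {
    val busySlots = math.max(batchDeletionSize - disabling.size, 0)
    val fromBusy = (busy -- disabling).take(busySlots)
    // the warmed pool only gets the budget the busy picks left over
    val warmSlots = math.max(batchDeletionSize - disabling.size - fromBusy.size, 0)
    val fromWarm = (warm -- disabling).take(warmSlots)
    fromBusy ++ fromWarm
  }

  def main(args: Array[String]): Unit = {
    // with a budget of 3, the first round touches only busy containers
    val round1 = pickBatch(Set("b1", "b2", "b3", "b4", "b5"), Set("w1", "w2"), Set.empty[String], 3)
    println(round1) // three of b1..b5
  }
}

In the actor itself, the pool keeps iterating by scheduling another GracefulShutdown to itself after 5 seconds whenever inProgressPool is non-empty or more busy and warmed containers remain than this round could cover.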
@@ -134,7 +134,7 @@ class ContainerPoolTests
}

def poolConfig(userMemory: ByteSize) =
ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)

behavior of "ContainerPool"

@@ -818,7 +818,8 @@ class ContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val initialCount = 2
val pool =
system.actorOf(
@@ -864,7 +865,8 @@ class ContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val minCount = 0
val initialCount = 2
val maxCount = 4
@@ -1237,7 +1239,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
}

it should "remove expired in order of expiration" in {
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
//use a second kind so that we know sorting is not isolated to the expired of each kind
val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
@@ -1261,7 +1263,7 @@

it should "remove only the prewarmExpirationLimit of expired prewarms" in {
//limit prewarm removal to 2
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
val prewarmConfig =
@@ -1287,7 +1289,7 @@

it should "remove only the expired prewarms regardless of minCount" in {
//limit prewarm removal to 100
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second, 10)
val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
val memoryLimit = 256.MB
//minCount is 2 - should leave at least 2 prewarms when removing expired
@@ -276,7 +276,7 @@ class ContainerProxyTests
(transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
Future.successful(())
}
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))

@@ -180,6 +180,7 @@ class FunctionPullingContainerPoolTests
memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
prewarmMaxRetryLimit: Int = 3,
prewarmPromotion: Boolean = false,
batchDeletionSize: Int = 10,
prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
ContainerPoolConfig(
userMemory,
@@ -192,6 +193,7 @@
prewarmMaxRetryLimit,
prewarmPromotion,
memorySyncInterval,
batchDeletionSize,
prewarmContainerCreationConfig)

def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
@@ -309,6 +311,118 @@ class FunctionPullingContainerPoolTests
}
}

it should "stop containers gradually when shut down" in within(timeout * 20) {
val (containers, factory) = testContainers(10)
val doc = put(entityStore, bigWhiskAction)
val topic = s"creationAck${schedulerInstanceId.asString}"
val consumer = new TestConnector(topic, 4, true)
val pool = system.actorOf(
Props(new FunctionPullingContainerPool(
factory,
invokerHealthService.ref,
poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
invokerInstance,
List.empty,
sendAckToScheduler(consumer.getProducer()))))

(0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken
(0 to 10).foreach(i => {
containers(i).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
// create 5 containers in the busy pool and 6 in the warmed pool
if (i < 5)
containers(i).send(pool, Initialized(initializedData)) // container is initialized
else
containers(i).send(
pool,
ContainerIsPaused(
WarmData(
stub[DockerContainer],
invocationNamespace.asString,
whiskAction.toExecutableWhiskAction.get,
doc.rev,
Instant.now,
TestProbe().ref)))
})

// disable
pool ! GracefulShutdown
// at first, 3 containers will be removed from the busy pool; the remaining containers will not be touched yet
var disablingContainers = Set.empty[Int]
(0 to 10).foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 3, "expected exactly 3 containers to have been asked to shut down")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))

Thread.sleep(3000)
var completedContainer = -1
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
// let only one container complete its shutdown
if (completedContainer == -1)
completedContainer = i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 6, "expected 6 containers to have been asked to shut down so far")
containers(completedContainer).send(pool, ContainerRemoved(false))

Thread.sleep(3000)
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
// only one container completed its shutdown, so only one more should start shutting down
assert(disablingContainers.size == 7, "expected 7 containers to have been asked to shut down so far")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))

Thread.sleep(3000)
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 10, "expected 10 containers to have been asked to shut down so far")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))

Thread.sleep(3000)
(0 to 10)
.filter(!disablingContainers.contains(_))
.foreach(i => {
try {
containers(i).expectMsg(1.second, GracefulShutdown)
disablingContainers += i
} catch {
case _: Throwable =>
}
})
assert(disablingContainers.size == 11, "expected all 11 containers to have been asked to shut down")
disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
}

it should "create prewarmed containers on startup" in within(timeout) {
stream.reset()
val (containers, factory) = testContainers(1)
@@ -343,6 +457,7 @@
3,
false,
FiniteDuration(10, TimeUnit.SECONDS),
10,
prewarmContainerCreationConfig)

val pool = system.actorOf(
@@ -906,7 +1021,8 @@ class FunctionPullingContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val initialCount = 2
val pool = system.actorOf(
Props(
@@ -958,7 +1074,8 @@ class FunctionPullingContainerPoolTests
100,
3,
false,
1.second)
1.second,
10)
val minCount = 0
val initialCount = 2
val maxCount = 4
@@ -1105,7 +1222,8 @@ class FunctionPullingContainerPoolTests
100,
maxRetryLimit,
false,
1.second)
1.second,
10)
val initialCount = 1
val pool = system.actorOf(
Props(
@@ -115,7 +115,8 @@ class FunctionPullingContainerProxyTests
100,
3,
false,
1.second)
1.second,
10)

val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 5.seconds)
