diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 674fab91f9c..112c721a052 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -287,6 +287,7 @@
       "CONFIG_whisk_containerPool_prewarmExpirationCheckIntervalVariance": "{{ container_pool_prewarm_expirationCheckIntervalVariance | default('10 seconds') }}"
       "CONFIG_whisk_containerPool_prewarmPromotion": "{{ container_pool_strict | default('false') | lower }}"
       "CONFIG_whisk_containerPool_prewarmMaxRetryLimit": "{{ container_pool_prewarm_max_retry_limit | default(5) }}"
+      "CONFIG_whisk_containerPool_batchDeletionSize": "{{ container_pool_batchDeletionSize | default(10) }}"
       "CONFIG_whisk_invoker_username": "{{ invoker.username }}"
       "CONFIG_whisk_invoker_password": "{{ invoker.password }}"
 
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
index 3b5a6c4ca42..e2cba13316d 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerFactory.scala
@@ -54,12 +54,15 @@ case class ContainerPoolConfig(userMemory: ByteSize,
                                prewarmMaxRetryLimit: Int,
                                prewarmPromotion: Boolean,
                                memorySyncInterval: FiniteDuration,
+                               batchDeletionSize: Int,
                                prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) {
   require(
     concurrentPeekFactor > 0 && concurrentPeekFactor <= 1.0,
     s"concurrentPeekFactor must be > 0 and <= 1.0; was $concurrentPeekFactor")
 
   require(prewarmExpirationCheckInterval.toSeconds > 0, "prewarmExpirationCheckInterval must be > 0")
+  // a non-positive batch size would never pick any container for removal during a graceful shutdown
+  require(batchDeletionSize > 0, s"batchDeletionSize must be > 0; was $batchDeletionSize")
 
   /**
    * The shareFactor indicates the number of containers that would share a single core, on average.
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf
index 946b4717e30..61a2ec63152 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -67,6 +67,7 @@ whisk {
     prewarm-max-retry-limit: 5 # max subsequent retry limit to create prewarm containers
     prewarm-promotion: false # if true, action can take prewarm container which has bigger memory
     memory-sync-interval: 1 second # period to sync memory info to etcd
+    batch-deletion-size: 10 # number of containers removed per batch when the invoker is disabled; too large a value may overload docker/k8s
   }
 
   kubernetes {
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
index 94f7bd2f1c3..46e84b51477 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerPool.scala
@@ -94,6 +94,9 @@ class FunctionPullingContainerPool(
   private var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmData]
   private var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
 
+  // containers that were already sent GracefulShutdown but have not reported ContainerRemoved yet
+  private var disablingPool = immutable.Set.empty[ActorRef]
+
   private var shuttingDown = false
 
   private val creationMessages = TrieMap[ActorRef, ContainerCreationMessage]()
@@ -353,18 +356,12 @@ class FunctionPullingContainerPool(
 
     // Container got removed
     case ContainerRemoved(replacePrewarm) =>
-      inProgressPool.get(sender()).foreach { _ =>
-        inProgressPool = inProgressPool - sender()
-      }
-
-      warmedPool.get(sender()).foreach { _ =>
-        warmedPool = warmedPool - sender()
-      }
+      inProgressPool = inProgressPool - sender()
+      warmedPool = warmedPool - sender()
+      disablingPool -= sender()
 
       // container was busy (busy indicates at full capacity), so there is capacity to accept another job request
-      busyPool.get(sender()).foreach { _ =>
-        busyPool = busyPool - sender()
-      }
+      busyPool = busyPool - sender()
 
       //in case this was a prewarm
       prewarmedPool.get(sender()).foreach { data =>
@@ -601,11 +598,28 @@ class FunctionPullingContainerPool(
    * Make all busyPool's memoryQueue actor shutdown gracefully
    */
   private def waitForPoolToClear(): Unit = {
-    busyPool.keys.foreach(_ ! GracefulShutdown)
-    warmedPool.keys.foreach(_ ! GracefulShutdown)
-    if (inProgressPool.nonEmpty) {
+    val pool = self
+    // how many busy containers can be asked to shut down in this round
+    val slotsForBusyPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
+    (busyPool.keySet &~ disablingPool)
+      .take(slotsForBusyPool)
+      .foreach(container => {
+        disablingPool += container
+        container ! GracefulShutdown
+      })
+    // how many warm containers can be asked to shut down in this round
+    val slotsForWarmPool = math.max(poolConfig.batchDeletionSize - disablingPool.size, 0)
+    (warmedPool.keySet &~ disablingPool)
+      .take(slotsForWarmPool)
+      .foreach(container => {
+        disablingPool += container
+        container ! GracefulShutdown
+      })
+    // keep polling until every busy/warm container has been asked to shut down; comparing pool sizes with the
+    // slot counts is not enough because slots left unused by the busy pool would be counted twice
+    if (inProgressPool.nonEmpty || !(busyPool.keySet ++ warmedPool.keySet).subsetOf(disablingPool)) {
       context.system.scheduler.scheduleOnce(5.seconds) {
-        waitForPoolToClear()
+        pool ! GracefulShutdown
       }
     }
   }
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
index eff4b926ded..eccf04cd784 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerPoolTests.scala
@@ -134,7 +134,7 @@ class ContainerPoolTests
   }
 
   def poolConfig(userMemory: ByteSize) =
-    ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
+    ContainerPoolConfig(userMemory, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
 
   behavior of "ContainerPool"
 
@@ -818,7 +818,8 @@ class ContainerPoolTests
         100,
         3,
         false,
-        1.second)
+        1.second,
+        10)
     val initialCount = 2
     val pool =
       system.actorOf(
@@ -864,7 +865,8 @@ class ContainerPoolTests
         100,
         3,
         false,
-        1.second)
+        1.second,
+        10)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4
@@ -1237,7 +1239,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
   }
 
   it should "remove expired in order of expiration" in {
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 1, 3, false, 1.second, 10)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
     //use a second kind so that we know sorting is not isolated to the expired of each kind
     val exec2 = CodeExecAsString(RuntimeManifest("actionKind2", ImageName("testImage")), "testCode", None)
@@ -1261,7 +1263,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
   it should "remove only the prewarmExpirationLimit of expired prewarms" in {
     //limit prewarm removal to 2
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 2, 3, false, 1.second, 10)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
     val memoryLimit = 256.MB
     val prewarmConfig =
@@ -1287,7 +1289,7 @@ class ContainerPoolObjectTests extends FlatSpec with Matchers with MockFactory {
 
   it should "remove only the expired prewarms regardless of minCount" in {
     //limit prewarm removal to 100
-    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second)
+    val poolConfig = ContainerPoolConfig(0.MB, 0.5, false, 2.second, 10.seconds, None, 100, 3, false, 1.second, 10)
     val exec = CodeExecAsString(RuntimeManifest("actionKind", ImageName("testImage")), "testCode", None)
     val memoryLimit = 256.MB
     //minCount is 2 - should leave at least 2 prewarms when removing expired
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index 87aeaf19e7e..193f9f66ae9 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -276,7 +276,7 @@ class ContainerProxyTests
     (transid: TransactionId, activation: WhiskActivation, isBlockingActivation: Boolean, context: UserContext) =>
       Future.successful(())
   }
-  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second)
+  val poolConfig = ContainerPoolConfig(2.MB, 0.5, false, 2.second, 1.minute, None, 100, 3, false, 1.second, 10)
   def healthchecksConfig(enabled: Boolean = false) = ContainerProxyHealthCheckConfig(enabled, 100.milliseconds, 2)
   val filterEnvVar = (k: String) => Character.isUpperCase(k.charAt(0))
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
index 99fc09f928f..ca6c26061aa 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerPoolTests.scala
@@ -180,6 +180,7 @@ class FunctionPullingContainerPoolTests
                  memorySyncInterval: FiniteDuration = FiniteDuration(1, TimeUnit.SECONDS),
                  prewarmMaxRetryLimit: Int = 3,
                  prewarmPromotion: Boolean = false,
+                 batchDeletionSize: Int = 10,
                  prewarmContainerCreationConfig: Option[PrewarmContainerCreationConfig] = None) =
     ContainerPoolConfig(
       userMemory,
@@ -192,6 +193,7 @@ class FunctionPullingContainerPoolTests
       prewarmMaxRetryLimit,
       prewarmPromotion,
       memorySyncInterval,
+      batchDeletionSize,
       prewarmContainerCreationConfig)
 
   def sendAckToScheduler(producer: MessageProducer)(schedulerInstanceId: SchedulerInstanceId,
@@ -309,6 +311,118 @@ class FunctionPullingContainerPoolTests
     }
   }
 
+  it should "stop containers gradually when shut down" in within(timeout * 20) {
+    val (containers, factory) = testContainers(10)
+    val doc = put(entityStore, bigWhiskAction)
+    val topic = s"creationAck${schedulerInstanceId.asString}"
+    val consumer = new TestConnector(topic, 4, true)
+    val pool = system.actorOf(
+      Props(new FunctionPullingContainerPool(
+        factory,
+        invokerHealthService.ref,
+        poolConfig(MemoryLimit.STD_MEMORY * 20, batchDeletionSize = 3),
+        invokerInstance,
+        List.empty,
+        sendAckToScheduler(consumer.getProducer()))))
+
+    (0 to 10).foreach(_ => pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)) // 11 * stdMemory taken
+    (0 to 10).foreach(i => {
+      containers(i).expectMsgPF() {
+        case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
+      }
+      // put 5 containers into the busy pool and 6 into the warmed pool
+      if (i < 5)
+        containers(i).send(pool, Initialized(initializedData)) // container is initialized
+      else
+        containers(i).send(
+          pool,
+          ContainerIsPaused(
+            WarmData(
+              stub[DockerContainer],
+              invocationNamespace.asString,
+              whiskAction.toExecutableWhiskAction.get,
+              doc.rev,
+              Instant.now,
+              TestProbe().ref)))
+    })
+
+    // disable
+    pool ! GracefulShutdown
+    // at first, only 3 containers (taken from the busy pool) are asked to shut down; the rest are left untouched
+    var disablingContainers = Set.empty[Int]
+    (0 to 10).foreach(i => {
+      try {
+        containers(i).expectMsg(1.second, GracefulShutdown)
+        disablingContainers += i
+      } catch {
+        case _: AssertionError => // expectMsg timed out: this container was not asked to shut down yet
+      }
+    })
+    assert(disablingContainers.size == 3, "exactly 3 containers should be shutting down after the first batch")
+    disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+
+    Thread.sleep(3000)
+    var completedContainer = -1
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+          // only make one container complete shutting down
+          if (completedContainer == -1)
+            completedContainer = i
+        } catch {
+          case _: AssertionError => // expectMsg timed out: this container was not asked to shut down yet
+        }
+      })
+    assert(disablingContainers.size == 6, "exactly 6 containers should have been asked to shut down so far")
+    containers(completedContainer).send(pool, ContainerRemoved(false))
+
+    Thread.sleep(3000)
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+        } catch {
+          case _: AssertionError => // expectMsg timed out: this container was not asked to shut down yet
+        }
+      })
+    // there should be only one more container going to shut down
+    assert(disablingContainers.size == 7, "exactly 7 containers should have been asked to shut down so far")
+    disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+
+    Thread.sleep(3000)
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+        } catch {
+          case _: AssertionError => // expectMsg timed out: this container was not asked to shut down yet
+        }
+      })
+    assert(disablingContainers.size == 10, "exactly 10 containers should have been asked to shut down so far")
+    disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+
+    Thread.sleep(3000)
+    (0 to 10)
+      .filter(!disablingContainers.contains(_))
+      .foreach(i => {
+        try {
+          containers(i).expectMsg(1.second, GracefulShutdown)
+          disablingContainers += i
+        } catch {
+          case _: AssertionError => // expectMsg timed out: this container was not asked to shut down yet
+        }
+      })
+    assert(disablingContainers.size == 11, "all 11 containers should have been asked to shut down")
+    disablingContainers.foreach(i => containers(i).send(pool, ContainerRemoved(false)))
+  }
+
   it should "create prewarmed containers on startup" in within(timeout) {
     stream.reset()
     val (containers, factory) = testContainers(1)
@@ -343,6 +457,7 @@ class FunctionPullingContainerPoolTests
       3,
       false,
       FiniteDuration(10, TimeUnit.SECONDS),
+      10,
       prewarmContainerCreationConfig)
 
     val pool = system.actorOf(
@@ -906,7 +1021,8 @@ class FunctionPullingContainerPoolTests
       100,
       3,
       false,
-      1.second)
+      1.second,
+      10)
     val initialCount = 2
     val pool = system.actorOf(
       Props(
@@ -958,7 +1074,8 @@ class FunctionPullingContainerPoolTests
       100,
       3,
       false,
-      1.second)
+      1.second,
+      10)
     val minCount = 0
     val initialCount = 2
     val maxCount = 4
@@ -1105,7 +1222,8 @@ class FunctionPullingContainerPoolTests
       100,
       maxRetryLimit,
       false,
-      1.second)
+      1.second,
+      10)
     val initialCount = 1
     val pool = system.actorOf(
       Props(
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index c77932007a5..570440e1fb1 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -115,7 +115,8 @@ class FunctionPullingContainerProxyTests
       100,
       3,
       false,
-      1.second)
+      1.second,
+      10)
 
   val timeoutConfig = ContainerProxyTimeoutConfig(5.seconds, 5.seconds, 5.seconds)
 