Skip to content

Commit

Permalink
Fix review points
Browse files Browse the repository at this point in the history
  • Loading branch information
ningyougang committed Apr 28, 2021
1 parent 9ae9c52 commit 8374e39
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import scala.concurrent.duration._
import scala.util.{Random, Try}
import scala.collection.immutable.Queue

case class Creation(creationMessage: ContainerCreationMessage, action: WhiskAction)
case class Deletion(deletionMessage: ContainerDeletionMessage)
case class CreationContainer(creationMessage: ContainerCreationMessage, action: WhiskAction)
case class DeletionContainer(deletionMessage: ContainerDeletionMessage)
case object Remove
case class Keep(timeout: FiniteDuration)
case class PrewarmContainer(maxConcurrent: Int)
Expand Down Expand Up @@ -166,7 +166,7 @@ class FunctionPullingContainerPool(
}
}

case Creation(create: ContainerCreationMessage, action: WhiskAction) =>
case CreationContainer(create: ContainerCreationMessage, action: WhiskAction) =>
if (shuttingDown) {
val message =
s"creationId: ${create.creationId}, invoker is shutting down, reschedule ${action.fullyQualifiedName(false)}"
Expand Down Expand Up @@ -217,7 +217,7 @@ class FunctionPullingContainerPool(
}
}

case Deletion(deletionMessage: ContainerDeletionMessage) =>
case DeletionContainer(deletionMessage: ContainerDeletionMessage) =>
val oldRevision = deletionMessage.revision
val invocationNamespace = deletionMessage.invocationNamespace
val fqn = deletionMessage.action.copy(version = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ class FunctionPullingContainerPoolTests
List.empty,
sendAckToScheduler(producer))))

pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}

pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand All @@ -253,19 +253,19 @@ class FunctionPullingContainerPoolTests
sendAckToScheduler(producer))))

// Start first action
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}

// Send second action to the pool
pool ! Creation(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction) // message is too large to be processed immediately.
pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction) // message is too large to be processed immediately.
containers(1).expectNoMessage(100.milliseconds)

// First container is removed
containers(0).send(pool, ContainerRemoved(true)) // pool is empty again.

pool ! Creation(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
pool ! CreationContainer(creationMessageLarge.copy(revision = bigDoc.rev), bigWhiskAction)
// Second container should run now
containers(1).expectMsgPF() {
case Initialize(invocationNamespace, bigExecuteAction, schedulerHost, rpcPort, _) => true
Expand All @@ -288,7 +288,7 @@ class FunctionPullingContainerPoolTests
sendAckToScheduler(consumer.getProducer()))))

pool ! GracefulShutdown
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken

containers(0).expectNoMessage()

Expand All @@ -306,7 +306,7 @@ class FunctionPullingContainerPoolTests

// pool should be back to work after enabled again
pool ! Enable
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction) // 1 * stdMemory taken
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -412,7 +412,7 @@ class FunctionPullingContainerPoolTests
containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggerMemory)))

// the prewarm container with matched memory should be chose
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand All @@ -423,7 +423,7 @@ class FunctionPullingContainerPoolTests
}

// the prewarm container with bigger memory should not be chose
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(3).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -452,7 +452,7 @@ class FunctionPullingContainerPoolTests
containers(1).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = biggestMemory)))

// the prewarm container with smallest memory should be chose
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand All @@ -463,7 +463,7 @@ class FunctionPullingContainerPoolTests
}

// the prewarm container with bigger memory should be chose
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand All @@ -475,7 +475,7 @@ class FunctionPullingContainerPoolTests

// now free memory is (6 - 3 - 1) * stdMemory, and required 2 * stdMemory, so both two prewarmed containers are not suitable
// a new container should be created
pool ! Creation(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
pool ! CreationContainer(creationMessageLarge.copy(revision = doc.rev), bigWhiskAction)
containers(4).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand All @@ -502,7 +502,7 @@ class FunctionPullingContainerPoolTests
containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed
containers(0).send(pool, ReadyToWork(prewarmedData.copy(kind = alternativeExec.kind))) // container0 was started

pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand All @@ -526,7 +526,7 @@ class FunctionPullingContainerPoolTests
containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed
containers(0).send(pool, ReadyToWork(prewarmedData.copy(memoryLimit = alternativeLimit))) // container0 was started

pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(1).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -560,24 +560,24 @@ class FunctionPullingContainerPoolTests
container.ref)

// the revision doesn't match, create 1 container
pool ! Creation(creationMessage, whiskAction)
pool ! CreationContainer(creationMessage, whiskAction)
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}

// the invocation namespace doesn't match, create 1 container
pool ! Creation(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
pool ! CreationContainer(creationMessage.copy(invocationNamespace = "otherNamespace"), whiskAction)
containers(1).expectMsgPF() {
case Initialize("otherNamespace", executeAction, schedulerHost, rpcPort, _) => true
}

pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
container.expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}

// warmed container is occupied, create 1 more container
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(2).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -612,7 +612,7 @@ class FunctionPullingContainerPoolTests
container.ref)

// choose the warmed container
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
container.expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -687,7 +687,7 @@ class FunctionPullingContainerPoolTests
container3.ref)

// now the pool has no free memory, and new job needs 2*stdMemory, so it needs to remove two warmed containers
pool ! Creation(creationMessage, bigWhiskAction)
pool ! CreationContainer(creationMessage, bigWhiskAction)
container1.expectMsg(Remove)
container2.expectMsg(Remove)
container3.expectNoMessage()
Expand Down Expand Up @@ -736,7 +736,7 @@ class FunctionPullingContainerPoolTests
val actualCreationMessage = creationMessage.copy(revision = doc.rev)
val ackMessage = createAckMsg(actualCreationMessage, None, None)

pool ! Creation(actualCreationMessage, whiskAction)
pool ! CreationContainer(actualCreationMessage, whiskAction)
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -784,7 +784,7 @@ class FunctionPullingContainerPoolTests
container.ref)

// choose the warmed container
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
container.expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -830,7 +830,7 @@ class FunctionPullingContainerPoolTests
val ackMessage =
createAckMsg(actualCreationMessageLarge, Some(ResourceNotEnoughError), Some(error))

pool ! Creation(actualCreationMessageLarge, bigWhiskAction)
pool ! CreationContainer(actualCreationMessageLarge, bigWhiskAction)

utilRetry({
val buffer = consumer.peek(50.millisecond)
Expand All @@ -842,7 +842,7 @@ class FunctionPullingContainerPoolTests
val actualCreationMessage = creationMessage.copy(revision = doc2.rev)
val rescheduleAckMsg = createAckMsg(actualCreationMessage, Some(UnknownError), Some("ContainerProxy init failed."))

pool ! Creation(actualCreationMessage, whiskAction)
pool ! CreationContainer(actualCreationMessage, whiskAction)
containers(0).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -883,9 +883,9 @@ class FunctionPullingContainerPoolTests
sendAckToScheduler(producer))))
containers(0).expectMsg(Start(exec, memoryLimit))

pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)

awaitAssert {
count shouldBe 3
Expand Down Expand Up @@ -1011,11 +1011,11 @@ class FunctionPullingContainerPoolTests
stream.toString should not include (s"removed ${initialCount} expired prewarmed container")

// 2 cold start happened
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(2).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(3).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down Expand Up @@ -1049,23 +1049,23 @@ class FunctionPullingContainerPoolTests
stream.reset()

// 5 code start happened(5 > maxCount)
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(6).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(7).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(8).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(9).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
pool ! Creation(creationMessage.copy(revision = doc.rev), whiskAction)
pool ! CreationContainer(creationMessage.copy(revision = doc.rev), whiskAction)
containers(10).expectMsgPF() {
case Initialize(invocationNamespace, executeAction, schedulerHost, rpcPort, _) => true
}
Expand Down

0 comments on commit 8374e39

Please sign in to comment.