Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

to address #3918, reuse a container on applicationError #3941

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ trait Container {
Future.failed(
InitializationError(
result.interval,
ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true))))
ActivationResponse.developerError(Messages.timedoutActivation(timeout, true))))
} else {
Future.failed(
InitializationError(
Expand Down Expand Up @@ -142,7 +142,7 @@ trait Container {
}
.map { result =>
val response = if (result.interval.duration >= timeout) {
ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false))
ActivationResponse.developerError(Messages.timedoutActivation(timeout, false))
} else {
ActivationResponse.processRunResponseContent(result.response, logging)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected[core] case class ActivationResponse private (val statusCode: Int, val

def isSuccess = statusCode == ActivationResponse.Success
def isApplicationError = statusCode == ActivationResponse.ApplicationError
def isContainerError = statusCode == ActivationResponse.ContainerError
def isContainerError = statusCode == ActivationResponse.DeveloperError
def isWhiskError = statusCode == ActivationResponse.WhiskError
def withoutResult = ActivationResponse(statusCode, None)

Expand All @@ -57,7 +57,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {

val Success = 0 // action ran successfully and produced a result
val ApplicationError = 1 // action ran but there was an error and it was handled
val ContainerError = 2 // action ran but failed to handle an error, or action did not run and failed to initialize
val DeveloperError = 2 // action ran but failed to handle an error, or action did not run and failed to initialize
val WhiskError = 3 // internal system error

protected[core] def messageForCode(code: Int) = {
Expand All @@ -71,16 +71,16 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
}

private def error(code: Int, errorValue: JsValue) = {
require(code == ApplicationError || code == ContainerError || code == WhiskError)
require(code == ApplicationError || code == DeveloperError || code == WhiskError)
ActivationResponse(code, Some(JsObject(ERROR_FIELD -> errorValue)))
}

protected[core] def success(result: Option[JsValue] = None) = ActivationResponse(Success, result)

protected[core] def applicationError(errorValue: JsValue) = error(ApplicationError, errorValue)
protected[core] def applicationError(errorMsg: String) = error(ApplicationError, JsString(errorMsg))
protected[core] def containerError(errorValue: JsValue) = error(ContainerError, errorValue)
protected[core] def containerError(errorMsg: String) = error(ContainerError, JsString(errorMsg))
protected[core] def developerError(errorValue: JsValue) = error(DeveloperError, errorValue)
protected[core] def developerError(errorMsg: String) = error(DeveloperError, JsString(errorMsg))
protected[core] def whiskError(errorValue: JsValue) = error(WhiskError, errorValue)
protected[core] def whiskError(errorMsg: String) = error(WhiskError, JsString(errorMsg))

Expand Down Expand Up @@ -148,21 +148,21 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
// If the response is a JSON object container an error field, accept it as the response error.
val errorOpt = fields.get(ERROR_FIELD)
val errorContent = errorOpt getOrElse invalidInitResponse(str).toJson
containerError(errorContent)
developerError(errorContent)
case _ =>
containerError(invalidInitResponse(str))
developerError(invalidInitResponse(str))
}

case Some((length, maxlength)) =>
containerError(truncatedResponse(str, length, maxlength))
developerError(truncatedResponse(str, length, maxlength))
}

case Left(_: MemoryExhausted) =>
containerError(memoryExhausted)
developerError(memoryExhausted)

case Left(e) =>
// This indicates a terminal failure in the container (it exited prematurely).
containerError(abnormalInitialization)
developerError(abnormalInitialization)
}
}

Expand Down Expand Up @@ -194,29 +194,29 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
// Any non-200 code is treated as a container failure. We still need to check whether
// there was a useful error message in there.
val errorContent = errorOpt getOrElse invalidRunResponse(str).toJson
containerError(errorContent)
developerError(errorContent)
}

case scala.util.Success(notAnObj) =>
// This should affect only blackbox containers, since our own containers should already test for that.
containerError(invalidRunResponse(str))
developerError(invalidRunResponse(str))

case scala.util.Failure(t) =>
// This should affect only blackbox containers, since our own containers should already test for that.
logger.warn(this, s"response did not json parse: '$str' led to $t")
containerError(invalidRunResponse(str))
developerError(invalidRunResponse(str))
}

case Some((length, maxlength)) =>
containerError(truncatedResponse(str, length, maxlength))
developerError(truncatedResponse(str, length, maxlength))
}

case Left(_: MemoryExhausted) =>
containerError(memoryExhausted)
developerError(memoryExhausted)

case Left(e) =>
// This indicates a terminal failure in the container (it exited prematurely).
containerError(abnormalRun)
developerError(abnormalRun)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ class ContainerProxy(
// the failure is either the system fault, or for docker actions, the application/developer fault
val response = t match {
case WhiskContainerStartupError(msg) => ActivationResponse.whiskError(msg)
case BlackboxStartupError(msg) => ActivationResponse.applicationError(msg)
case BlackboxStartupError(msg) => ActivationResponse.developerError(msg)
case _ => ActivationResponse.whiskError(Messages.resourceProvisionError)
}
val context = UserContext(job.msg.user)
Expand Down Expand Up @@ -423,9 +423,10 @@ class ContainerProxy(

// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs.flatMap {
case Right(act) if !act.response.isSuccess => Future.failed(ActivationUnsuccessfulError(act))
case Left(error) => Future.failed(error)
case Right(act) => Future.successful(act)
case Right(act) if !act.response.isSuccess && !act.response.isApplicationError =>
Future.failed(ActivationUnsuccessfulError(act))
case Left(error) => Future.failed(error)
case Right(act) => Future.successful(act)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct
withActivation(wsk.activation, wsk.action.invoke(name)) { activation =>
val response = activation.response
response.result.get.fields("error") shouldBe Messages.abnormalInitialization.toJson
response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError)
response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
}
}

Expand All @@ -199,7 +199,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct
withActivation(wsk.activation, wsk.action.invoke(name)) { activation =>
val response = activation.response
response.result.get.fields("error") shouldBe Messages.timedoutActivation(3 seconds, true).toJson
response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ApplicationError)
response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
}
}

Expand All @@ -213,7 +213,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct
withActivation(wsk.activation, wsk.action.invoke(name)) { activation =>
val response = activation.response
response.result.get.fields("error") shouldBe Messages.abnormalRun.toJson
response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ContainerError)
response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
}
}

Expand Down Expand Up @@ -285,7 +285,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct

val run = wsk.action.invoke(name)
withActivation(wsk.activation, run) { activation =>
activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ApplicationError)
activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
activation.response.result.get
.fields("error") shouldBe s"Failed to pull container image '$containerName'.".toJson
activation.annotations shouldBe defined
Expand Down Expand Up @@ -375,7 +375,7 @@ class WskRestBasicUsageTests extends TestHelpers with WskTestHelpers with WskAct
val hungRun = wsk.action.invoke(name, Map("forceHang" -> true.toJson))
withActivation(wsk.activation, hungRun) { activation =>
// the first action must fail with a timeout error
activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.ApplicationError)
activation.response.status shouldBe ActivationResponse.messageForCode(ActivationResponse.DeveloperError)
activation.response.result shouldBe Some(
JsObject("error" -> Messages.timedoutActivation(3 seconds, false).toJson))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ class DockerContainerTests

val error = the[InitializationError] thrownBy await(init, initTimeout)
error.interval shouldBe interval
error.response.statusCode shouldBe ActivationResponse.ApplicationError
error.response.statusCode shouldBe ActivationResponse.DeveloperError

// assert the finish log is there
val end = LogMarker.parse(logLines.last)
Expand Down Expand Up @@ -474,7 +474,7 @@ class DockerContainerTests
}

val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout)
await(runResult) shouldBe (interval, ActivationResponse.applicationError(
await(runResult) shouldBe (interval, ActivationResponse.developerError(
Messages.timedoutActivation(runTimeout, false)))

// assert the finish log is there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class KubernetesContainerTests

val error = the[InitializationError] thrownBy await(init, initTimeout)
error.interval shouldBe interval
error.response.statusCode shouldBe ActivationResponse.ApplicationError
error.response.statusCode shouldBe ActivationResponse.DeveloperError

// assert the finish log is there
val end = LogMarker.parse(logLines.last)
Expand Down Expand Up @@ -288,7 +288,7 @@ class KubernetesContainerTests
}

val runResult = container.run(JsObject.empty, JsObject.empty, runTimeout)
await(runResult) shouldBe (interval, ActivationResponse.applicationError(
await(runResult) shouldBe (interval, ActivationResponse.developerError(
Messages.timedoutActivation(runTimeout, false)))

// assert the finish log is there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ class ContainerProxyTests
Interval(now, now.plusMillis(200))
}

val errorInterval = {
val now = initInterval.end.plusMillis(75) // delay between init and run
Interval(now, now.plusMillis(150))
}

val uuid = UUID()

val message = ActivationMessage(
Expand Down Expand Up @@ -386,6 +391,73 @@ class ContainerProxyTests
}
}

it should "complete the transaction and reuse the container on a failed run IFF failure was applicationError" in within(
timeout) {
val container = new TestContainer {
override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
runCount += 1
//every other run fails
if (runCount % 2 == 0) {
Future.successful((runInterval, ActivationResponse.success()))
} else {
Future.successful((errorInterval, ActivationResponse.applicationError(("boom"))))
}
}
}
val factory = createFactory(Future.successful(container))
val acker = createAcker()
val store = createStore
val collector = createCollector()

val machine =
childActorOf(
ContainerProxy
.props(factory, acker, store, collector, InvokerInstanceId(0), poolConfig, pauseGrace = timeout))
registerCallback(machine)
preWarm(machine)

//first one will fail
run(machine, Started)

// Note that there are no intermediate state changes
//second one will succeed
run(machine, Ready)

//With exception of the error on first run, the assertions should be the same as in
// `run an action and continue with a next run without pausing the container`
awaitAssert {
factory.calls should have size 1
container.initializeCount shouldBe 1
container.runCount shouldBe 2
collector.calls should have size 2
container.suspendCount shouldBe 0
container.destroyCount shouldBe 0
acker.calls should have size 2
store.calls should have size 2

val initErrorActivation = acker.calls(0)._2
initErrorActivation.duration shouldBe Some((initInterval.duration + errorInterval.duration).toMillis)
initErrorActivation.annotations
.get(WhiskActivation.initTimeAnnotation)
.get
.convertTo[Int] shouldBe initInterval.duration.toMillis
initErrorActivation.annotations
.get(WhiskActivation.waitTimeAnnotation)
.get
.convertTo[Int] shouldBe
Interval(message.transid.meta.start, initInterval.start).duration.toMillis

val runOnlyActivation = acker.calls(1)._2
runOnlyActivation.duration shouldBe Some(runInterval.duration.toMillis)
runOnlyActivation.annotations.get(WhiskActivation.initTimeAnnotation) shouldBe empty
runOnlyActivation.annotations.get(WhiskActivation.waitTimeAnnotation).get.convertTo[Int] shouldBe {
Interval(message.transid.meta.start, runInterval.start).duration.toMillis
}
}

}

/*
* ERROR CASES
*/
Expand Down Expand Up @@ -424,7 +496,7 @@ class ContainerProxyTests
override def initialize(initializer: JsObject,
timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = {
initializeCount += 1
Future.failed(InitializationError(initInterval, ActivationResponse.applicationError("boom")))
Future.failed(InitializationError(initInterval, ActivationResponse.developerError("boom")))
}
}
val factory = createFactory(Future.successful(container))
Expand All @@ -449,7 +521,7 @@ class ContainerProxyTests
collector.calls should have size 1
container.destroyCount shouldBe 1
val activation = acker.calls(0)._2
activation.response shouldBe ActivationResponse.applicationError("boom")
activation.response shouldBe ActivationResponse.developerError("boom")
activation.annotations
.get(WhiskActivation.initTimeAnnotation)
.get
Expand All @@ -459,12 +531,13 @@ class ContainerProxyTests
}
}

it should "complete the transaction and destroy the container on a failed run" in within(timeout) {
it should "complete the transaction and destroy the container on a failed run IFF failure was containerError" in within(
timeout) {
val container = new TestContainer {
override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)(
implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
runCount += 1
Future.successful((initInterval, ActivationResponse.applicationError("boom")))
Future.successful((initInterval, ActivationResponse.developerError(("boom"))))
}
}
val factory = createFactory(Future.successful(container))
Expand All @@ -488,7 +561,7 @@ class ContainerProxyTests
container.runCount shouldBe 1
collector.calls should have size 1
container.destroyCount shouldBe 1
acker.calls(0)._2.response shouldBe ActivationResponse.applicationError("boom")
acker.calls(0)._2.response shouldBe ActivationResponse.developerError("boom")
store.calls should have size 1
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ trait WebActionsApiBaseTests extends ControllerTestCommon with BeforeAndAfterEac
r.fields.get("application_error").map { e =>
ActivationResponse.applicationError(e)
} orElse r.fields.get("developer_error").map { e =>
ActivationResponse.containerError(e)
ActivationResponse.developerError(e)
} orElse r.fields.get("whisk_error").map { e =>
ActivationResponse.whiskError(e)
} orElse None // for clarity
Expand Down
Loading