Skip to content

Commit

Permalink
Support array result for common action and sequence action (#5290)
Browse files Browse the repository at this point in the history
* Support array result

* Make controller accept json array

* Make elasticsearch support json array

Couchdb already suports

* Make go runtime test cases due to depend on this

* Add test case for array result for nodejs runtime

* Make sequence action to support array result

* Optimize sequence action to support array result

* Fix test case for sequence action feature

* Add test case for sequence action

This test case is just for nodejs

* Add extra method runForJsArray for runtime tests

* Fix build error

* Fix review comment
  • Loading branch information
ningyougang authored Aug 1, 2022
1 parent d92485c commit 62b8a50
Show file tree
Hide file tree
Showing 28 changed files with 342 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ case class ActivationMessage(override val transid: TransactionId,
activationId: ActivationId,
rootControllerIndex: ControllerInstanceId,
blocking: Boolean,
content: Option[JsObject],
content: Option[JsValue],
initArgs: Set[String] = Set.empty,
lockedArgs: Map[String, String] = Map.empty,
cause: Option[ActivationId] = None,
Expand Down Expand Up @@ -380,9 +380,13 @@ object Activation extends DefaultJsonProtocol {

/** Get "StatusCode" from result response set by action developer * */
def userDefinedStatusCode(result: Option[JsValue]): Option[Int] = {
val statusCode = JsHelpers
.getFieldPath(result.get.asJsObject, ERROR_FIELD, "statusCode")
.orElse(JsHelpers.getFieldPath(result.get.asJsObject, "statusCode"))
val statusCode: Option[JsValue] = result match {
case Some(JsObject(fields)) =>
JsHelpers
.getFieldPath(JsObject(fields), ERROR_FIELD, "statusCode")
.orElse(JsHelpers.getFieldPath(JsObject(fields), "statusCode"))
case _ => None
}
statusCode.map {
case value => Try(value.convertTo[BigInt].intValue).toOption.getOrElse(BadRequest.intValue)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ object AkkaContainerClient {
result
}

/** A helper method to post one single request to a connection. Used for container tests. */
def postForJsArray(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)(
implicit logging: Logging,
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): (Int, Option[JsArray]) = {
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
val response = executeRequestForJsArray(connection, endPoint, content)
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
connection.close()
result
}

/** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */
def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)(
implicit logging: Logging,
Expand Down Expand Up @@ -233,4 +246,24 @@ object AkkaContainerClient {

res
}

private def executeRequestForJsArray(connection: AkkaContainerClient, endpoint: String, content: JsValue)(
implicit logging: Logging,
as: ActorSystem,
ec: ExecutionContext,
tid: TransactionId): Future[(Int, Option[JsArray])] = {

val res = connection
.post(endpoint, content, true)
.map({
case Right(r) => (r.statusCode, Try(r.entity.parseJson.convertTo[JsArray]).toOption)
case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container")
case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t: java.net.SocketTimeoutException)) =>
throw new java.util.concurrent.TimeoutException()
case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage)
})

res
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import akka.util.ByteString
import pureconfig._
import pureconfig.generic.auto._
import spray.json.DefaultJsonProtocol._
import spray.json.JsObject
import spray.json.{JsObject, JsValue}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.ConfigKeys
import org.apache.openwhisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse}
Expand Down Expand Up @@ -159,7 +159,7 @@ trait Container {
}

/** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
def run(parameters: JsObject,
def run(parameters: JsValue,
environment: JsObject,
timeout: FiniteDuration,
maxConcurrent: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,8 @@ class ElasticSearchActivationStore(
restoreAnnotations(restoreResponse(hit.sourceAsString.parseJson.asJsObject)).convertTo[WhiskActivation]
}

private def restoreAnnotations(js: JsObject): JsObject = {
val annotations = js.fields
private def restoreAnnotations(js: JsValue): JsObject = {
val annotations = js.asJsObject.fields
.get("annotations")
.map { anno =>
Try {
Expand All @@ -399,10 +399,10 @@ class ElasticSearchActivationStore(
}.getOrElse(JsArray.empty)
}
.getOrElse(JsArray.empty)
JsObject(js.fields.updated("annotations", annotations))
JsObject(js.asJsObject.fields.updated("annotations", annotations))
}

private def restoreResponse(js: JsObject): JsObject = {
private def restoreResponse(js: JsObject): JsValue = {
val response = js.fields
.get("response")
.map { res =>
Expand All @@ -412,7 +412,10 @@ class ElasticSearchActivationStore(
.get("result")
.map { r =>
val JsString(data) = r
data.parseJson.asJsObject
data.parseJson match {
case JsArray(elements) => JsArray(elements)
case _ => data.parseJson.asJsObject
}
}
.getOrElse(JsObject.empty)
JsObject(temp.updated("result", result))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
* NOTE: the code is application error (since this response could be used as a response for the sequence
* if the payload contains an error)
*/
protected[core] def payloadPlaceholder(payload: Option[JsObject]) = ActivationResponse(ApplicationError, payload)
protected[core] def payloadPlaceholder(payload: Option[JsValue]) = ActivationResponse(ApplicationError, payload)

/**
* Class of errors for invoker-container communication.
Expand Down Expand Up @@ -203,7 +203,7 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
truncated match {
case None =>
val sizeOpt = Option(str).map(_.length)
Try { str.parseJson.asJsObject } match {
Try { str.parseJson } match {
case scala.util.Success(result @ JsObject(fields)) =>
// If the response is a JSON object container an error field, accept it as the response error.
val errorOpt = fields.get(ERROR_FIELD)
Expand All @@ -222,6 +222,17 @@ protected[core] object ActivationResponse extends DefaultJsonProtocol {
developerError(errorContent, sizeOpt)
}

case scala.util.Success(result @ JsArray(_)) =>
if (res.okStatus) {
success(Some(result), sizeOpt)
} else {
// Any non-200 code is treated as a container failure. We still need to check whether
// there was a useful error message in there.
val errorContent = invalidRunResponse(str).toJson
//developerErrorWithLog(errorContent, sizeOpt, None)
developerError(errorContent, sizeOpt)
}

case scala.util.Success(notAnObj) =>
// This should affect only blackbox containers, since our own containers should already test for that.
developerError(invalidRunResponse(str), sizeOpt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ object Messages {
}

def invalidRunResponse(actualResponse: String) = {
"The action did not produce a valid JSON response" + {
"The action did not produce a valid JSON or JSON Array response" + {
Option(actualResponse) filter { _.nonEmpty } map { s =>
s": $s"
} getOrElse "."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,12 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
complete(Accepted, activationId.toJsObject)
}
case Success(Right(activation)) =>
val response = if (result) activation.resultAsJson else activation.toExtendedJson()

val response = activation.response.result match {
case Some(JsArray(elements)) =>
JsArray(elements)
case _ =>
if (result) activation.resultAsJson else activation.toExtendedJson()
}
respondWithActivationIdHeader(activation.activationId) {
if (activation.response.isSuccess) {
complete(OK, response)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected[core] trait PostActionActivation extends PrimitiveActions with Sequenc
protected[controller] def invokeAction(
user: Identity,
action: WhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {
action.toExecutableWhiskAction match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected[actions] trait PrimitiveActions {
user: Identity,
action: WhiskActionMetaData,
components: Vector[FullyQualifiedEntityName],
payload: Option[JsObject],
payload: Option[JsValue],
waitForOutermostResponse: Option[FiniteDuration],
cause: Option[ActivationId],
topmost: Boolean,
Expand Down Expand Up @@ -109,7 +109,7 @@ protected[actions] trait PrimitiveActions {
protected[actions] def invokeSingleAction(
user: Identity,
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {

Expand Down Expand Up @@ -152,12 +152,16 @@ protected[actions] trait PrimitiveActions {
private def invokeSimpleAction(
user: Identity,
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]] = {

// merge package parameters with action (action parameters supersede), then merge in payload
val args = action.parameters merge payload
val args: Option[JsValue] = payload match {
case Some(JsObject(fields)) => action.parameters merge Some(JsObject(fields))
case Some(JsArray(elements)) => Some(JsArray(elements))
case _ => Some(action.parameters.toJsObject)
}
val activationId = activationIdFactory.make()

val startActivation = transid.started(
Expand All @@ -169,6 +173,10 @@ protected[actions] trait PrimitiveActions {
val startLoadbalancer =
transid.started(this, LoggingMarkers.CONTROLLER_LOADBALANCER, s"action activation id: ${activationId}")

val keySet = payload match {
case Some(JsObject(fields)) => Some(fields.keySet)
case _ => None
}
val message = ActivationMessage(
transid,
FullyQualifiedEntityName(action.namespace, action.name, Some(action.version), action.binding),
Expand All @@ -179,7 +187,7 @@ protected[actions] trait PrimitiveActions {
waitForResponse.isDefined,
args,
action.parameters.initParameters,
action.parameters.lockedParameters(payload.map(_.fields.keySet).getOrElse(Set.empty)),
action.parameters.lockedParameters(keySet.getOrElse(Set.empty)),
cause = cause,
WhiskTracerProvider.tracer.getTraceContext(transid))

Expand Down Expand Up @@ -271,7 +279,7 @@ protected[actions] trait PrimitiveActions {
*/
private def invokeComposition(user: Identity,
action: ExecutableWhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId],
accounting: Option[CompositionAccounting] = None)(
Expand Down Expand Up @@ -319,7 +327,7 @@ protected[actions] trait PrimitiveActions {
* @param parentTid a parent transaction id
*/
private def invokeConductor(user: Identity,
payload: Option[JsObject],
payload: Option[JsValue],
session: Session,
parentTid: TransactionId): Future[ActivationResponse] = {

Expand All @@ -330,9 +338,13 @@ protected[actions] trait PrimitiveActions {
Future.successful(ActivationResponse.applicationError(compositionIsTooLong))
} else {
// inject state into payload if any
val params = session.state
.map(state => Some(JsObject(payload.getOrElse(JsObject.empty).fields ++ state.fields)))
.getOrElse(payload)
val params: Option[JsValue] = payload match {
case Some(JsObject(fields)) =>
session.state
.map(state => Some(JsObject(JsObject(fields).fields ++ state.fields)))
.getOrElse(payload)
case _ => None
}

// invoke conductor action
session.accounting.conductors += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected[actions] trait SequenceActions {
protected[actions] def invokeAction(
user: Identity,
action: WhiskActionMetaData,
payload: Option[JsObject],
payload: Option[JsValue],
waitForResponse: Option[FiniteDuration],
cause: Option[ActivationId])(implicit transid: TransactionId): Future[Either[ActivationId, WhiskActivation]]

Expand All @@ -93,7 +93,7 @@ protected[actions] trait SequenceActions {
user: Identity,
action: WhiskActionMetaData,
components: Vector[FullyQualifiedEntityName],
payload: Option[JsObject],
payload: Option[JsValue],
waitForOutermostResponse: Option[FiniteDuration],
cause: Option[ActivationId],
topmost: Boolean,
Expand Down Expand Up @@ -266,7 +266,7 @@ protected[actions] trait SequenceActions {
user: Identity,
seqAction: WhiskActionMetaData,
seqActivationId: ActivationId,
inputPayload: Option[JsObject],
inputPayload: Option[JsValue],
components: Vector[FullyQualifiedEntityName],
cause: Option[ActivationId],
atomicActionCnt: Int)(implicit transid: TransactionId): Future[SequenceAccounting] = {
Expand Down Expand Up @@ -347,7 +347,12 @@ protected[actions] trait SequenceActions {
// the accounting no longer needs to hold a reference to it once the action is
// invoked, so previousResponse.getAndSet(null) drops the reference at this point
// which prevents dragging the previous response for the lifetime of the next activation
val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject)
val previousResult = accounting.previousResponse.getAndSet(null).result
val inputPayload: Option[JsValue] = previousResult match {
case Some(JsObject(fields)) => Some(JsObject(fields))
case Some(JsArray(elements)) => Some(JsArray(elements))
case _ => None
}

// invoke the action by calling the right method depending on whether it's an atomic action or a sequence
val futureWhiskActivationTuple = action.toExecutableWhiskAction match {
Expand Down Expand Up @@ -460,9 +465,10 @@ protected[actions] case class SequenceAccounting(atomicActionCnt: Int,
// check conditions on payload that may lead to interrupting the execution of the sequence
// short-circuit the execution of the sequence iff the payload contains an error field
// and is the result of an action return, not the initial payload
val outputPayload = activation.response.result.map(_.asJsObject)
val payloadContent = outputPayload getOrElse JsObject.empty
val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
val errorField: Option[JsValue] = activation.response.result match {
case Some(JsObject(fields)) => fields.get(ActivationResponse.ERROR_FIELD)
case _ => None
}
val withinSeqLimit = newCnt <= maxSequenceCnt

if (withinSeqLimit && errorField.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,25 +1082,28 @@ object ContainerProxy {
* @param initArgs set of parameters to treat as initialization arguments
* @return A partition of the arguments into an environment variables map and the JsObject argument to the action
*/
def partitionArguments(content: Option[JsObject], initArgs: Set[String]): (Map[String, JsValue], JsObject) = {
def partitionArguments(content: Option[JsValue], initArgs: Set[String]): (Map[String, JsValue], JsValue) = {
content match {
case None => (Map.empty, JsObject.empty)
case Some(js) if initArgs.isEmpty => (Map.empty, js)
case Some(js) =>
val (env, args) = js.fields.partition(k => initArgs.contains(k._1))
case None => (Map.empty, JsObject.empty)
case Some(JsArray(elements)) => (Map.empty, JsArray(elements))
case Some(JsObject(fields)) if initArgs.isEmpty => (Map.empty, JsObject(fields))
case Some(JsObject(fields)) =>
val (env, args) = fields.partition(k => initArgs.contains(k._1))
(env, JsObject(args))
}
}

def unlockArguments(content: Option[JsObject],
def unlockArguments(content: Option[JsValue],
lockedArgs: Map[String, String],
decoder: ParameterEncryption): Option[JsObject] = {
content.map {
case JsObject(fields) =>
JsObject(fields.map {
decoder: ParameterEncryption): Option[JsValue] = {
content match {
case Some(JsObject(fields)) =>
Some(JsObject(fields.map {
case (k, v: JsString) if lockedArgs.contains(k) => (k -> decoder.encryptor(lockedArgs(k)).decrypt(v))
case p => p
})
}))
// keep the original for other type(e.g. JsArray)
case contentValue => contentValue
}
}
}
Expand Down
Loading

0 comments on commit 62b8a50

Please sign in to comment.