Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BW-1206 - Combine all Wes Endpoints & add Tests #6833

Merged
merged 9 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import cromwell.services.instrumentation.CromwellInstrumentationActor
import cromwell.webservice.SwaggerService
import cromwell.webservice.routes.CromwellApiService
import cromwell.webservice.routes.wes.WesRouteSupport
import cromwell.webservice.routes.wes.WesRunRoutes

import scala.concurrent.Future
import scala.util.{Failure, Success}
Expand All @@ -37,7 +36,6 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool
with CromwellApiService
with CromwellInstrumentationActor
with WesRouteSupport
with WesRunRoutes
with SwaggerService
with ActorLogging {
implicit val actorSystem = context.system
Expand All @@ -53,7 +51,7 @@ class CromwellServerActor(cromwellSystem: CromwellSystem, gracefulShutdown: Bool
* cromwell.yaml is broken unless the swagger index.html is patched. Copy/paste the code from rawls or cromiam if
* actual cromwell+swagger+oauth+/api support is needed.
*/
val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes, runRoutes))
val apiRoutes: Route = pathPrefix("api")(concat(workflowRoutes, womtoolRoutes, wesRoutes))
val nonApiRoutes: Route = concat(engineRoutes, swaggerUiResourceRoute)
val allRoutes: Route = concat(apiRoutes, nonApiRoutes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@ import scala.io.Source
import scala.util.{Failure, Success, Try}

trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport with WomtoolRouteSupport with WebServiceUtils with WesCromwellRouteSupport {

import CromwellApiService._

implicit def actorRefFactory: ActorRefFactory

implicit val materializer: ActorMaterializer
implicit val ec: ExecutionContext

Expand All @@ -57,9 +55,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
}
},
path("engine" / Segment / "version") { _ =>
get {
complete(versionResponse)
}
get { complete(versionResponse) }
},
path("engine" / Segment / "status") { _ =>
onComplete(serviceRegistryActor.ask(GetCurrentStatus).mapTo[StatusCheckResponse]) {
Expand All @@ -74,11 +70,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w

val workflowRoutes =
path("workflows" / Segment / "backends") { _ =>
get {
instrumentRequest {
complete(ToResponseMarshallable(backendResponse))
}
}
get { instrumentRequest { complete(ToResponseMarshallable(backendResponse)) } }
} ~
path("workflows" / Segment / "callcaching" / "diff") { _ =>
parameterSeq { parameters =>
Expand Down Expand Up @@ -144,7 +136,7 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
val response = validateWorkflowIdInMetadata(possibleWorkflowId, serviceRegistryActor) flatMap { workflowId =>
workflowStoreActor.ask(WorkflowStoreActor.WorkflowOnHoldToSubmittedCommand(workflowId)).mapTo[WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedResponse]
}
onComplete(response) {
onComplete(response){
case Success(WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedFailure(_, e: NotInOnHoldStateException)) => e.errorRequest(StatusCodes.Forbidden)
case Success(WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedFailure(_, e)) => e.errorRequest(StatusCodes.InternalServerError)
case Success(r: WorkflowStoreEngineActor.WorkflowOnHoldToSubmittedSuccess) => completeResponse(StatusCodes.OK, toResponse(r.workflowId, WorkflowSubmitted), Seq.empty)
Expand Down Expand Up @@ -180,93 +172,93 @@ trait CromwellApiService extends HttpInstrumentation with MetadataRouteSupport w
case Failure(e) => e.failRequest(StatusCodes.InternalServerError)
}
}
}

object CromwellApiService {
}

import spray.json._
object CromwellApiService {
import spray.json._

/**
* Sends a request to abort the workflow. Provides configurable success & error handlers to allow
* for different API endpoints to provide different effects in the appropriate situations, e.g. HTTP codes
* and error messages
*/
def abortWorkflow(possibleWorkflowId: String,
workflowStoreActor: ActorRef,
workflowManagerActor: ActorRef,
successHandler: PartialFunction[SuccessfulAbortResponse, Route] = standardAbortSuccessHandler,
errorHandler: PartialFunction[Throwable, Route] = standardAbortErrorHandler)
(implicit timeout: Timeout): Route = {
handleExceptions(ExceptionHandler(errorHandler)) {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(workflowId) =>
val response = workflowStoreActor.ask(WorkflowStoreActor.AbortWorkflowCommand(workflowId)).mapTo[AbortResponse]
onComplete(response) {
case Success(x: SuccessfulAbortResponse) => successHandler(x)
case Success(x: WorkflowAbortFailureResponse) => throw x.failure
case Failure(e) => throw e
}
case Failure(_) => throw InvalidWorkflowException(possibleWorkflowId)
}
/**
* Sends a request to abort the workflow. Provides configurable success & error handlers to allow
* for different API endpoints to provide different effects in the appropriate situations, e.g. HTTP codes
* and error messages
*/
def abortWorkflow(possibleWorkflowId: String,
workflowStoreActor: ActorRef,
workflowManagerActor: ActorRef,
successHandler: PartialFunction[SuccessfulAbortResponse, Route] = standardAbortSuccessHandler,
errorHandler: PartialFunction[Throwable, Route] = standardAbortErrorHandler)
(implicit timeout: Timeout): Route = {
handleExceptions(ExceptionHandler(errorHandler)) {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(workflowId) =>
val response = workflowStoreActor.ask(WorkflowStoreActor.AbortWorkflowCommand(workflowId)).mapTo[AbortResponse]
onComplete(response) {
case Success(x: SuccessfulAbortResponse) => successHandler(x)
case Success(x: WorkflowAbortFailureResponse) => throw x.failure
case Failure(e) => throw e
}
case Failure(_) => throw InvalidWorkflowException(possibleWorkflowId)
}
}
}

/**
* The abort success handler for typical cases, i.e. cromwell's API.
*/
private def standardAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = {
case WorkflowAbortedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborted.toString)))
case WorkflowAbortRequestedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborting.toString)))
}
/**
* The abort success handler for typical cases, i.e. cromwell's API.
*/
private def standardAbortSuccessHandler: PartialFunction[SuccessfulAbortResponse, Route] = {
case WorkflowAbortedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborted.toString)))
case WorkflowAbortRequestedResponse(id) => complete(ToResponseMarshallable(WorkflowAbortResponse(id.toString, WorkflowAborting.toString)))
}

/**
* The abort error handler for typical cases, i.e. cromwell's API
*/
private def standardAbortErrorHandler: PartialFunction[Throwable, Route] = {
case e: InvalidWorkflowException => e.failRequest(StatusCodes.BadRequest)
case e: WorkflowNotFoundException => e.errorRequest(StatusCodes.NotFound)
case _: AskTimeoutException if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case e: TimeoutException => e.failRequest(StatusCodes.ServiceUnavailable)
case e: Exception => e.errorRequest(StatusCodes.InternalServerError)
}
/**
* The abort error handler for typical cases, i.e. cromwell's API
*/
private def standardAbortErrorHandler: PartialFunction[Throwable, Route] = {
case e: InvalidWorkflowException => e.failRequest(StatusCodes.BadRequest)
case e: WorkflowNotFoundException => e.errorRequest(StatusCodes.NotFound)
case _: AskTimeoutException if CromwellShutdown.shutdownInProgress() => serviceShuttingDownResponse
case e: TimeoutException => e.failRequest(StatusCodes.ServiceUnavailable)
case e: Exception => e.errorRequest(StatusCodes.InternalServerError)
}

def validateWorkflowIdInMetadata(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap {
case RecognizedWorkflowId => Future.successful(w)
case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor)
case FailedToCheckWorkflowId(t) => Future.failed(t)
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
def validateWorkflowIdInMetadata(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadata(w)).mapTo[WorkflowValidationResponse] flatMap {
case RecognizedWorkflowId => Future.successful(w)
case UnrecognizedWorkflowId => validateWorkflowIdInMetadataSummaries(possibleWorkflowId, serviceRegistryActor)
case FailedToCheckWorkflowId(t) => Future.failed(t)
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
}

def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadataSummaries(w)).mapTo[WorkflowValidationResponse] map {
case RecognizedWorkflowId => w
case UnrecognizedWorkflowId => throw UnrecognizedWorkflowException(w)
case FailedToCheckWorkflowId(t) => throw t
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
def validateWorkflowIdInMetadataSummaries(possibleWorkflowId: String,
serviceRegistryActor: ActorRef)
(implicit timeout: Timeout, executor: ExecutionContext): Future[WorkflowId] = {
Try(WorkflowId.fromString(possibleWorkflowId)) match {
case Success(w) =>
serviceRegistryActor.ask(ValidateWorkflowIdInMetadataSummaries(w)).mapTo[WorkflowValidationResponse] map {
case RecognizedWorkflowId => w
case UnrecognizedWorkflowId => throw UnrecognizedWorkflowException(w)
case FailedToCheckWorkflowId(t) => throw t
}
case Failure(_) => Future.failed(InvalidWorkflowException(possibleWorkflowId))
}
}

final case class BackendResponse(supportedBackends: List[String], defaultBackend: String)
final case class BackendResponse(supportedBackends: List[String], defaultBackend: String)

final case class UnrecognizedWorkflowException(id: WorkflowId) extends Exception(s"Unrecognized workflow ID: $id")
final case class UnrecognizedWorkflowException(id: WorkflowId) extends Exception(s"Unrecognized workflow ID: $id")

final case class InvalidWorkflowException(possibleWorkflowId: String) extends Exception(s"Invalid workflow ID: '$possibleWorkflowId'.")
final case class InvalidWorkflowException(possibleWorkflowId: String) extends Exception(s"Invalid workflow ID: '$possibleWorkflowId'.")

val cromwellVersion = VersionUtil.getVersion("cromwell-engine")
val swaggerUiVersion = VersionUtil.getVersion("swagger-ui", VersionUtil.sbtDependencyVersion("swaggerUi"))
val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name)
val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson))
val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable)
}
val cromwellVersion = VersionUtil.getVersion("cromwell-engine")
val swaggerUiVersion = VersionUtil.getVersion("swagger-ui", VersionUtil.sbtDependencyVersion("swaggerUi"))
val backendResponse = BackendResponse(BackendConfiguration.AllBackendEntries.map(_.name).sorted, BackendConfiguration.DefaultBackendEntry.name)
val versionResponse = JsObject(Map("cromwell" -> cromwellVersion.toJson))
val serviceShuttingDownResponse = new Exception("Cromwell service is shutting down.").failRequest(StatusCodes.ServiceUnavailable)
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ trait WesCromwellRouteSupport extends WebServiceUtils {
implicit val timeout: Timeout = duration

implicit def actorRefFactory: ActorRefFactory

implicit val materializer: ActorMaterializer
implicit val ec: ExecutionContext

Expand Down
Loading