WX-1092 Support size engine function for public HTTP files #7128

Merged May 23, 2023 (15 commits)
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Cromwell Change Log

## 86 Release Notes

### HTTPFilesystem Improvements

* WDL `size` engine function now works for HTTP files.

## 85 Release Notes

### Migration of PKs to BIGINT
1 change: 1 addition & 0 deletions build.sbt
@@ -249,6 +249,7 @@ lazy val engine = project
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(drsFileSystem)
.dependsOn(httpFileSystem)
.dependsOn(sraFileSystem)
.dependsOn(awsS3FileSystem)
.dependsOn(azureBlobFileSystem)
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
@@ -15,6 +15,15 @@ webservice {
}

akka {

http {
client {
parsing {
illegal-header-warnings = off
Review comment from the PR author (Collaborator):

This suppresses warnings about the ETag header when reading blob files. I turned these warnings off entirely because they don't seem useful to anyone and only add log clutter. If anyone objects to the big hammer, this can be narrowed to suppress warnings for specific headers (e.g. just ETag), or it can be applied only in our Azure config.

Reply from a Contributor:

This approach makes sense to me.

}
}
}

actor.default-dispatcher.fork-join-executor {
# Number of threads = min(parallelism-factor * cpus, parallelism-max)
# Below are the default values set by Akka, uncomment to tune these
3 changes: 2 additions & 1 deletion engine/src/main/scala/cromwell/engine/io/IoActor.scala
@@ -1,7 +1,7 @@
package cromwell.engine.io

import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers}
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Timers}
import akka.dispatch.ControlMessage
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source, SourceQueueWithComplete}
@@ -40,6 +40,7 @@ final class IoActor(ioConfig: IoConfig,
applicationName: String)(implicit val materializer: ActorMaterializer)
extends Actor with ActorLogging with StreamActorHelper[IoCommandContext[_]] with IoInstrumentation with Timers {
implicit val ec: ExecutionContext = context.dispatcher
implicit val system: ActorSystem = context.system

// IntelliJ disapproves of mutable state in Actors, but this should be safe as long as access occurs only in
// the `receive` method. Alternatively IntelliJ does suggest a `become` workaround we might try in the future.
16 changes: 11 additions & 5 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
@@ -1,7 +1,8 @@
package cromwell.engine.io.nio

import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
import cats.effect.{IO, Timer}
import cats.effect._

import scala.util.Try
import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType}
@@ -15,6 +16,7 @@ import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackp
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.http.HttpPath
import cromwell.filesystems.s3.S3Path
import cromwell.util.TryWithResource._
import net.ceedubs.ficus.Ficus._
@@ -34,9 +36,11 @@ class NioFlow(parallelism: Int,
onBackpressure: Option[Double] => Unit,
numberOfAttempts: Int,
commandBackpressureStaleness: FiniteDuration
)(implicit ec: ExecutionContext) extends IoCommandStalenessBackpressuring {
)(implicit system: ActorSystem) extends IoCommandStalenessBackpressuring {

implicit private val ec: ExecutionContext = system.dispatcher
implicit private val timer: Timer[IO] = IO.timer(ec)
implicit private val contextShift: ContextShift[IO] = IO.contextShift(ec)

override def maxStaleness: FiniteDuration = commandBackpressureStaleness

@@ -161,9 +165,11 @@ class NioFlow(parallelism: Int,
fileContentIo.map(_.replaceAll("\\r\\n", "\\\n"))
}

private def size(size: IoSizeCommand) = IO {
size.file.size
}
private def size(size: IoSizeCommand) =
size.file match {
case httpPath: HttpPath => IO.fromFuture(IO(httpPath.fetchSize))
case nioPath => IO(nioPath.size)
}

private def hash(hash: IoHashCommand): IO[String] = {
// If there is no hash accessible from the file storage system,
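
Note on the `size` change in this hunk: the new match sends `HttpPath` values through the asynchronous `fetchSize` and leaves every other path on the synchronous `size` attribute read, with both branches ending in the same effect type. A minimal sketch of that shape follows. It is an illustration only: it uses `scala.concurrent.Future` instead of cats-effect `IO`, and `SketchPath`, `LocalSketchPath`, and `RemoteSketchPath` are hypothetical stand-ins, not Cromwell classes.

```scala
import java.nio.charset.StandardCharsets
import scala.concurrent.{ExecutionContext, Future}

object SizeDispatchSketch {
  // Hypothetical stand-ins for Cromwell's path types (assumption, not the real API).
  sealed trait SketchPath

  final case class LocalSketchPath(contents: String) extends SketchPath {
    // Cheap, synchronous attribute read.
    def size: Long = contents.getBytes(StandardCharsets.UTF_8).length.toLong
  }

  final case class RemoteSketchPath(url: String, remoteLength: Option[Long]) extends SketchPath {
    // Simulates a network round trip; the Future fails when no length is known.
    def fetchSize(implicit ec: ExecutionContext): Future[Long] =
      Future {
        remoteLength.getOrElse(throw new RuntimeException(s"Couldn't fetch size for $url"))
      }
  }

  // Both branches yield a Future[Long], so callers handle a single result type.
  def size(path: SketchPath)(implicit ec: ExecutionContext): Future[Long] =
    path match {
      case remote: RemoteSketchPath => remote.fetchSize
      case local: LocalSketchPath   => Future(local.size)
    }
}
```

Keeping the remote lookup behind its own method means a caller cannot trigger a network request by accident while reading what looks like a plain file attribute.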
33 changes: 31 additions & 2 deletions engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala
@@ -21,6 +21,7 @@ import org.scalatest.flatspec.AsyncFlatSpecLike
import org.scalatest.matchers.should.Matchers
import common.mock.MockSugar
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.http.HttpPathBuilder

import java.nio.file.NoSuchFileException
import java.util.UUID
@@ -42,7 +43,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
onRetryCallback = NoopOnRetry,
onBackpressure = NoopOnBackpressure,
numberOfAttempts = 3,
commandBackpressureStaleness = 5 seconds)(system.dispatcher).flow
commandBackpressureStaleness = 5 seconds)(system).flow

implicit val materializer: ActorMaterializer = ActorMaterializer()
private val replyTo = mock[ActorRef]
@@ -96,6 +97,34 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
}
}

it should "fail with an UnknownHost error when trying to get size for a bogus HTTP path" in {
val httpPath = new HttpPathBuilder().build("http://ex000mple.c0m/bogus/url/fake.html").get

val context = DefaultCommandContext(sizeCommand(httpPath).get, replyTo)
val testSource = Source.single(context)

val stream = testSource.via(flow).toMat(readSink)(Keep.right)
stream.run() map {
case (IoFailure(_, EnhancedCromwellIoException(_, receivedException)), _) =>
receivedException.getMessage should include ("UnknownHost")
case (ack, _) => fail(s"size should have failed with UnknownHost but didn't:\n$ack\n\n")
}
}

it should "fail when trying to get size for a bogus HTTP path" in {
val httpPath = new HttpPathBuilder().build("http://google.com/bogus/je8934hufe832489uihewuihf").get

val context = DefaultCommandContext(sizeCommand(httpPath).get, replyTo)
val testSource = Source.single(context)

val stream = testSource.via(flow).toMat(readSink)(Keep.right)
stream.run() map {
case (IoFailure(_, EnhancedCromwellIoException(_, receivedException)), _) =>
receivedException.getMessage should include ("Couldn't fetch size")
case (ack, _) => fail(s"size should have failed but didn't:\n$ack\n\n")
}
}

it should "get hash from a Nio Path" in {
val testPath = DefaultPathBuilder.createTempFile()
testPath.write("hello")
@@ -304,7 +333,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
onRetryCallback = NoopOnRetry,
onBackpressure = NoopOnBackpressure,
numberOfAttempts = 3,
commandBackpressureStaleness = 5 seconds)(system.dispatcher) {
commandBackpressureStaleness = 5 seconds)(system) {

private var tries = 0
override def handleSingleCommand(ioSingleCommand: IoCommand[_]): IO[IoSuccess[_]] = {
@@ -3,7 +3,7 @@ import java.nio.file.Paths

import akka.actor.{ActorContext, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.{HttpMethods, HttpRequest}
import akka.stream.scaladsl.{FileIO, Keep}
import akka.stream.{ActorAttributes, ActorMaterializer}
import cromwell.core.Dispatcher
@@ -53,4 +53,20 @@ case class HttpPath(nioPath: NioPath) extends Path {
override def pathAsString: String = nioPath.toString.replaceFirst("/", "//")

override def pathWithoutScheme: String = pathAsString.replaceFirst("http[s]?://", "")

def fetchSize(implicit executionContext: ExecutionContext, actorSystem: ActorSystem): Future[Long] = {
Review comment from the PR author (Collaborator):

I couldn't directly override `size` because it's final in `BetterFileMethods`. I ended up deciding I like this separate method better anyway: it's a totally different operation from reading a file attribute, so I think it's helpful that it must be called deliberately and that the caller must handle the `Future`.


Http().singleRequest(HttpRequest(uri = pathAsString, method = HttpMethods.HEAD)).map { response =>
response.discardEntityBytes()
val length = if (response.status.isSuccess())
response.entity.contentLengthOption
else
None
length.getOrElse(
throw new RuntimeException(
s"Couldn't fetch size for $pathAsString, missing Content-Length header or path doesn't exist (HTTP ${response.status.toString()})."
)
)
}
}

}
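
Note on `fetchSize`: once the HEAD response arrives, the method makes a purely local decision. It uses the content length only when the status is successful, and otherwise fails with a message naming the URL and status. The sketch below isolates that decision so it can be exercised without a network call. It rests on several assumptions: `HeadResponse` and `sizeOrError` are hypothetical names, the result is an `Either` rather than a thrown exception, and "success" is simplified to mean a 2xx status, which may not match Akka HTTP's `isSuccess()` exactly.

```scala
object ContentLengthSketch {
  // Hypothetical, simplified view of a HEAD response (assumption, not an Akka HTTP type).
  final case class HeadResponse(status: Int, contentLength: Option[Long])

  // Returns the size on success, or an error message mirroring the one in the diff.
  def sizeOrError(url: String, response: HeadResponse): Either[String, Long] = {
    // Simplification: only 2xx counts as success in this sketch.
    val isSuccess = response.status >= 200 && response.status < 300
    // Ignore any Content-Length on a non-successful response (e.g. a 404 error page).
    val length = if (isSuccess) response.contentLength else None
    length.toRight(
      s"Couldn't fetch size for $url, missing Content-Length header or path doesn't exist (HTTP ${response.status})."
    )
  }
}
```

A 404 with a body and a 200 without a `Content-Length` header both end up on the error side, which is what the two new `NioFlowSpec` failure cases rely on.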