diff --git a/CHANGELOG.md b/CHANGELOG.md index 0084c05171b..b1190ef951a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Cromwell Change Log +## 86 Release Notes + +### HTTPFilesystem Improvements + + * WDL `size` engine function now works for HTTP files. + ## 85 Release Notes ### Migration of PKs to BIGINT diff --git a/build.sbt b/build.sbt index edb6d2811e4..3754a92305d 100644 --- a/build.sbt +++ b/build.sbt @@ -249,6 +249,7 @@ lazy val engine = project .dependsOn(backend) .dependsOn(gcsFileSystem) .dependsOn(drsFileSystem) + .dependsOn(httpFileSystem) .dependsOn(sraFileSystem) .dependsOn(awsS3FileSystem) .dependsOn(azureBlobFileSystem) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 4ecdf1ce233..b341682f1db 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -15,6 +15,15 @@ webservice { } akka { + + http { + client { + parsing { + illegal-header-warnings = off + } + } + } + actor.default-dispatcher.fork-join-executor { # Number of threads = min(parallelism-factor * cpus, parallelism-max) # Below are the default values set by Akka, uncomment to tune these diff --git a/engine/src/main/scala/cromwell/engine/io/IoActor.scala b/engine/src/main/scala/cromwell/engine/io/IoActor.scala index 40a6f81c4aa..b4b3a0b191f 100644 --- a/engine/src/main/scala/cromwell/engine/io/IoActor.scala +++ b/engine/src/main/scala/cromwell/engine/io/IoActor.scala @@ -1,7 +1,7 @@ package cromwell.engine.io import akka.NotUsed -import akka.actor.{Actor, ActorLogging, ActorRef, Props, Timers} +import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Timers} import akka.dispatch.ControlMessage import akka.stream._ import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source, SourceQueueWithComplete} @@ -40,6 +40,7 @@ final class IoActor(ioConfig: IoConfig, applicationName: String)(implicit val materializer: ActorMaterializer) extends Actor with ActorLogging with StreamActorHelper[IoCommandContext[_]] with IoInstrumentation with Timers { implicit val ec: ExecutionContext = context.dispatcher + implicit val system: ActorSystem = context.system // IntelliJ disapproves of mutable state in Actors, but this should be safe as long as access occurs only in // the `receive` method. Alternatively IntelliJ does suggest a `become` workaround we might try in the future. diff --git a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala index 012e771b5a0..b6ce3ee7cc2 100644 --- a/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala +++ b/engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala @@ -1,7 +1,8 @@ package cromwell.engine.io.nio +import akka.actor.ActorSystem import akka.stream.scaladsl.Flow -import cats.effect.{IO, Timer} +import cats.effect._ import scala.util.Try import cloud.nio.spi.{ChecksumFailure, ChecksumResult, ChecksumSkipped, ChecksumSuccess, FileHash, HashType} @@ -15,6 +16,7 @@ import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackp import cromwell.filesystems.blob.BlobPath import cromwell.filesystems.drs.DrsPath import cromwell.filesystems.gcs.GcsPath +import cromwell.filesystems.http.HttpPath import cromwell.filesystems.s3.S3Path import cromwell.util.TryWithResource._ import net.ceedubs.ficus.Ficus._ @@ -34,9 +36,11 @@ class NioFlow(parallelism: Int, onBackpressure: Option[Double] => Unit, numberOfAttempts: Int, commandBackpressureStaleness: FiniteDuration - )(implicit ec: ExecutionContext) extends IoCommandStalenessBackpressuring { + )(implicit system: ActorSystem) extends IoCommandStalenessBackpressuring { + implicit private val ec: ExecutionContext = system.dispatcher implicit private val timer: Timer[IO] = IO.timer(ec) + implicit private val contextShift: ContextShift[IO] = IO.contextShift(ec) override def maxStaleness: FiniteDuration = commandBackpressureStaleness @@ -161,9 +165,11 @@ class NioFlow(parallelism: Int, fileContentIo.map(_.replaceAll("\\r\\n", "\\\n")) } - private def size(size: IoSizeCommand) = IO { - size.file.size - } + private def size(size: IoSizeCommand) = + size.file match { + case httpPath: HttpPath => IO.fromFuture(IO(httpPath.fetchSize)) + case nioPath => IO(nioPath.size) + } private def hash(hash: IoHashCommand): IO[String] = { // If there is no hash accessible from the file storage system, diff --git a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala index b01d52eece0..41c121da0cc 100644 --- a/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala @@ -21,6 +21,7 @@ import org.scalatest.flatspec.AsyncFlatSpecLike import org.scalatest.matchers.should.Matchers import common.mock.MockSugar import cromwell.filesystems.blob.BlobPath +import cromwell.filesystems.http.HttpPathBuilder import java.nio.file.NoSuchFileException import java.util.UUID @@ -42,7 +43,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with onRetryCallback = NoopOnRetry, onBackpressure = NoopOnBackpressure, numberOfAttempts = 3, - commandBackpressureStaleness = 5 seconds)(system.dispatcher).flow + commandBackpressureStaleness = 5 seconds)(system).flow implicit val materializer: ActorMaterializer = ActorMaterializer() private val replyTo = mock[ActorRef] @@ -96,6 +97,34 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with } } + it should "fail with an UnknownHost error when trying to get size for a bogus HTTP path" in { + val httpPath = new HttpPathBuilder().build("http://ex000mple.c0m/bogus/url/fake.html").get + + val context = DefaultCommandContext(sizeCommand(httpPath).get, replyTo) + val testSource = Source.single(context) + + val stream = testSource.via(flow).toMat(readSink)(Keep.right) + stream.run() map { + case (IoFailure(_, EnhancedCromwellIoException(_, receivedException)), _) => + receivedException.getMessage should include ("UnknownHost") + case (ack, _) => fail(s"size should have failed with UnknownHost but didn't:\n$ack\n\n") + } + } + + it should "fail when trying to get size for a bogus HTTP path" in { + val httpPath = new HttpPathBuilder().build("http://google.com/bogus/je8934hufe832489uihewuihf").get + + val context = DefaultCommandContext(sizeCommand(httpPath).get, replyTo) + val testSource = Source.single(context) + + val stream = testSource.via(flow).toMat(readSink)(Keep.right) + stream.run() map { + case (IoFailure(_, EnhancedCromwellIoException(_, receivedException)), _) => + receivedException.getMessage should include ("Couldn't fetch size") + case (ack, _) => fail(s"size should have failed but didn't:\n$ack\n\n") + } + } + it should "get hash from a Nio Path" in { val testPath = DefaultPathBuilder.createTempFile() testPath.write("hello") @@ -304,7 +333,7 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with onRetryCallback = NoopOnRetry, onBackpressure = NoopOnBackpressure, numberOfAttempts = 3, - commandBackpressureStaleness = 5 seconds)(system.dispatcher) { + commandBackpressureStaleness = 5 seconds)(system) { private var tries = 0 override def handleSingleCommand(ioSingleCommand: IoCommand[_]): IO[IoSuccess[_]] = { diff --git a/filesystems/http/src/main/scala/cromwell/filesystems/http/HttpPathBuilder.scala b/filesystems/http/src/main/scala/cromwell/filesystems/http/HttpPathBuilder.scala index d1c3f19ad61..89bb22021df 100644 --- a/filesystems/http/src/main/scala/cromwell/filesystems/http/HttpPathBuilder.scala +++ b/filesystems/http/src/main/scala/cromwell/filesystems/http/HttpPathBuilder.scala @@ -3,7 +3,7 @@ import java.nio.file.Paths import akka.actor.{ActorContext, ActorSystem} import akka.http.scaladsl.Http -import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.{HttpMethods, HttpRequest} import akka.stream.scaladsl.{FileIO, Keep} import akka.stream.{ActorAttributes, ActorMaterializer} import cromwell.core.Dispatcher @@ -53,4 +53,20 @@ case class HttpPath(nioPath: NioPath) extends Path { override def pathAsString: String = nioPath.toString.replaceFirst("/", "//") override def pathWithoutScheme: String = pathAsString.replaceFirst("http[s]?://", "") + + def fetchSize(implicit executionContext: ExecutionContext, actorSystem: ActorSystem): Future[Long] = { + Http().singleRequest(HttpRequest(uri = pathAsString, method = HttpMethods.HEAD)).map { response => + response.discardEntityBytes() + val length = if (response.status.isSuccess()) + response.entity.contentLengthOption + else + None + length.getOrElse( + throw new RuntimeException( + s"Couldn't fetch size for $pathAsString, missing Content-Length header or path doesn't exist (HTTP ${response.status.toString()})." + ) + ) + } + } + }