diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala index 9b5c59c6be..19e3182d53 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala @@ -11,12 +11,31 @@ import akka.http.scaladsl.model.{ContentTypes, RequestEntity, _} import akka.stream.alpakka.s3.S3Settings import akka.stream.scaladsl.Source import akka.util.ByteString - import scala.collection.immutable.Seq import scala.concurrent.{ExecutionContext, Future} private[alpakka] object HttpRequests { + def listBucket( + bucket: String, + region: String, + prefix: Option[String] = None, + continuationToken: Option[String] = None + )(implicit conf: S3Settings): HttpRequest = { + + val query = Query( + Seq( + "list-type" -> Some("2"), + "prefix" -> prefix, + "continuation-token" -> continuationToken.map(_.replaceAll("=", "%3D")) + ).collect { case (k, Some(v)) => k -> v }.toMap + ) + + HttpRequest(HttpMethods.GET) + .withHeaders(Host(requestHost(bucket, region))) + .withUri(requestUri(bucket, None, region).withQuery(query)) + } + def getDownloadRequest(s3Location: S3Location, region: String)(implicit conf: S3Settings): HttpRequest = s3Request(s3Location, region: String) @@ -69,42 +88,41 @@ private[alpakka] object HttpRequests { private[this] def s3Request(s3Location: S3Location, region: String, method: HttpMethod = HttpMethods.GET, - uriFn: (Uri => Uri) = identity)(implicit conf: S3Settings): HttpRequest = { - - def requestHost(s3Location: S3Location, region: String)(implicit conf: S3Settings): Uri.Host = - conf.proxy match { - case None => - region match { - case "us-east-1" => - if (conf.pathStyleAccess) { - Uri.Host("s3.amazonaws.com") - } else { - Uri.Host(s"${s3Location.bucket}.s3.amazonaws.com") - } - case _ => - if (conf.pathStyleAccess) { - Uri.Host(s"s3-$region.amazonaws.com") - } else { - Uri.Host(s"${s3Location.bucket}.s3-$region.amazonaws.com") - } - } - case Some(proxy) => Uri.Host(proxy.host) - } + uriFn: (Uri => Uri) = identity)(implicit conf: S3Settings): HttpRequest = + HttpRequest(method) + .withHeaders(Host(requestHost(s3Location.bucket, region))) + .withUri(uriFn(requestUri(s3Location.bucket, Some(s3Location.key), region))) - def requestUri(s3Location: S3Location, region: String)(implicit conf: S3Settings): Uri = { - val uri = if (conf.pathStyleAccess) { - Uri(s"/${s3Location.bucket}/${s3Location.key}").withHost(requestHost(s3Location, region)) - } else { - Uri(s"/${s3Location.key}").withHost(requestHost(s3Location, region)) - } - conf.proxy match { - case None => uri.withScheme("https") - case Some(proxy) => uri.withPort(proxy.port).withScheme(proxy.scheme) - } + private[this] def requestHost(bucket: String, region: String)(implicit conf: S3Settings): Uri.Host = + conf.proxy match { + case None => + region match { + case "us-east-1" => + if (conf.pathStyleAccess) { + Uri.Host("s3.amazonaws.com") + } else { + Uri.Host(s"$bucket.s3.amazonaws.com") + } + case _ => + if (conf.pathStyleAccess) { + Uri.Host(s"s3-$region.amazonaws.com") + } else { + Uri.Host(s"$bucket.s3-$region.amazonaws.com") + } + } + case Some(proxy) => Uri.Host(proxy.host) } - HttpRequest(method) - .withHeaders(Host(requestHost(s3Location, region))) - .withUri(uriFn(requestUri(s3Location, region))) + private[this] def requestUri(bucket: String, key: Option[String], region: String)(implicit conf: S3Settings): Uri = { + val uri = if (conf.pathStyleAccess) { + Uri(s"/${bucket}${key.fold("")((someKey) => s"/$someKey")}") + .withHost(requestHost(bucket, region)) + } else { + Uri(s"${key.fold("")((someKey) => s"/$someKey")}").withHost(requestHost(bucket, region)) + } + conf.proxy match { + case None => uri.withScheme("https") + case Some(proxy) => uri.withPort(proxy.port).withScheme(proxy.scheme) + } } } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala index c6da99d6ef..79d0a26ffe 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala @@ -32,4 +32,20 @@ private[alpakka] object Marshalling { ) } } + + val isTruncated = "IsTruncated" + val continuationToken = "NextContinuationToken" + val key = "Key" + + implicit val listBucketResultUnmarshaller: FromEntityUnmarshaller[ListBucketResult] = { + nodeSeqUnmarshaller(MediaTypes.`application/xml` withCharset HttpCharsets.`UTF-8`).map { + case NodeSeq.Empty => throw Unmarshaller.NoContentException + case x => + ListBucketResult( + (x \ isTruncated).text == "true", + if ((x \ continuationToken).isEmpty) None else Some((x \ continuationToken).text), + (x \\ key).toSeq.map(_.text) + ) + } + } } diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala index 958d5413cf..fbe7a4606e 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala @@ -5,7 +5,6 @@ package akka.stream.alpakka.s3.impl import java.nio.file.Paths import java.time.LocalDate - import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http @@ -18,7 +17,6 @@ import akka.stream.alpakka.s3.auth.{AWSCredentials, CredentialScope, Signer, Sig import akka.stream.alpakka.s3.{DiskBufferType, MemoryBufferType, S3Exception, S3Settings} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.util.ByteString - import scala.collection.immutable.Seq import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -43,6 +41,8 @@ final case class FailedUpload(reasons: Seq[Throwable]) extends Exception final case class CompleteMultipartUploadResult(location: Uri, bucket: String, key: String, etag: String) +final case class ListBucketResult(isTruncated: Boolean, continuationToken: Option[String], keys: Seq[String]) + object S3Stream { def apply(credentials: AWSCredentials, region: String)(implicit system: ActorSystem, mat: Materializer): S3Stream = @@ -65,6 +65,32 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials, Source.fromFuture(request(s3Location, range).flatMap(entityForSuccess).map(_.dataBytes)).flatMapConcat(identity) } + def listBucket(bucket: String, prefix: Option[String] = None): Source[String, NotUsed] = { + sealed trait ListBucketState + case object Starting extends ListBucketState + case class Running(continuationToken: String) extends ListBucketState + case object Finished extends ListBucketState + + import system.dispatcher + + def listBucketCall(token: Option[String]): Future[Option[(ListBucketState, Seq[String])]] = + signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, region, prefix, token)) + .map { (res: ListBucketResult) => + Some( + res.continuationToken + .fold[(ListBucketState, Seq[String])]((Finished, res.keys))(t => (Running(t), res.keys)) + ) + } + + Source + .unfoldAsync[ListBucketState, Seq[String]](Starting) { + case Finished => Future.successful(None) + case Starting => listBucketCall(None) + case Running(token) => listBucketCall(Some(token)) + } + .mapConcat(identity) + } + def request(s3Location: S3Location, rangeOption: Option[ByteRange] = None): Future[HttpResponse] = { val downloadRequest = getDownloadRequest(s3Location, region) signAndGet(rangeOption match { diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala index a4a4507741..a29bd41829 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/javadsl/S3Client.scala @@ -42,6 +42,9 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS impl.download(S3Location(bucket, key), Some(scalaRange)).asJava } + def listBucket(bucket: String, prefix: Option[String]): Source[String, NotUsed] = + impl.listBucket(bucket, prefix).asJava + def multipartUpload(bucket: String, key: String, contentType: ContentType, diff --git a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala index a5140e28d1..02731e45a4 100644 --- a/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala +++ b/s3/src/main/scala/akka/stream/alpakka/s3/scaladsl/S3Client.scala @@ -46,6 +46,8 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste def download(bucket: String, key: String, range: ByteRange): Source[ByteString, NotUsed] = impl.download(S3Location(bucket, key), Some(range)) + def listBucket(bucket: String, prefix: Option[String]): Source[String, NotUsed] = impl.listBucket(bucket, prefix) + def multipartUpload(bucket: String, key: String, contentType: ContentType = ContentTypes.`application/octet-stream`, diff --git a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala index ee1d012116..1874563946 100644 --- a/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala +++ b/s3/src/test/scala/akka/stream/alpakka/s3/impl/HttpRequestsSpec.scala @@ -4,6 +4,7 @@ package akka.stream.alpakka.s3.impl import akka.actor.ActorSystem +import akka.http.scaladsl.model.Uri.Query import akka.http.scaladsl.model.headers.RawHeader import akka.http.scaladsl.model.{HttpEntity, MediaTypes} import akka.stream.alpakka.s3.S3Settings @@ -208,8 +209,29 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures { it should "initiate multipart upload with custom s3 headers" in { implicit val settings = S3Settings(ActorSystem()) val s3Headers = S3Headers(Map("Cache-Control" -> "no-cache")) - val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) + val req = HttpRequests + .initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers) req.headers should contain(RawHeader("Cache-Control", "no-cache")) } + + it should "properly construct the list bucket request with no prefix or continuation token passed" in { + implicit val settings = S3Settings(ConfigFactory.parseString(pathStyleAcessConfig)) + + val req = + HttpRequests.listBucket(location.bucket, "region") + + req.uri.query() shouldEqual Query("list-type" -> "2") + } + + it should "properly construct the list bucket request with a prefix and token passed" in { + implicit val settings = S3Settings(ConfigFactory.parseString(pathStyleAcessConfig)) + + val req = + HttpRequests.listBucket(location.bucket, "region", Some("random/prefix"), Some("randomToken")) + + req.uri.query() shouldEqual Query("list-type" -> "2", + "prefix" -> "random/prefix", + "continuation-token" -> "randomToken") + } }