Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added list bucket method to S3 library. #253

Merged
merged 12 commits into from
May 10, 2017
Merged
88 changes: 53 additions & 35 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/HttpRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,31 @@ import akka.http.scaladsl.model.{ContentTypes, RequestEntity, _}
import akka.stream.alpakka.s3.S3Settings
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future}

private[alpakka] object HttpRequests {

def listBucket(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it wouldn't make sense to use separate methods for the initial call and the continuation. It seems that you either would want to add a prefix or a token both never both?

bucket: String,
region: String,
prefix: Option[String] = None,
continuationToken: Option[String] = None
)(implicit conf: S3Settings): HttpRequest = {

val query = Query(
Seq(
"list-type" -> Some("2"),
"prefix" -> prefix,
"continuation-token" -> continuationToken.map(_.replaceAll("=", "%3D"))
).collect { case (k, Some(v)) => k -> v }.toMap
)

HttpRequest(HttpMethods.GET)
.withHeaders(Host(requestHost(bucket, region)))
.withUri(requestUri(bucket, None, region).withQuery(query))
}

def getDownloadRequest(s3Location: S3Location, region: String)(implicit conf: S3Settings): HttpRequest =
s3Request(s3Location, region: String)

Expand Down Expand Up @@ -69,42 +88,41 @@ private[alpakka] object HttpRequests {
private[this] def s3Request(s3Location: S3Location,
region: String,
method: HttpMethod = HttpMethods.GET,
uriFn: (Uri => Uri) = identity)(implicit conf: S3Settings): HttpRequest = {

def requestHost(s3Location: S3Location, region: String)(implicit conf: S3Settings): Uri.Host =
conf.proxy match {
case None =>
region match {
case "us-east-1" =>
if (conf.pathStyleAccess) {
Uri.Host("s3.amazonaws.com")
} else {
Uri.Host(s"${s3Location.bucket}.s3.amazonaws.com")
}
case _ =>
if (conf.pathStyleAccess) {
Uri.Host(s"s3-$region.amazonaws.com")
} else {
Uri.Host(s"${s3Location.bucket}.s3-$region.amazonaws.com")
}
}
case Some(proxy) => Uri.Host(proxy.host)
}
uriFn: (Uri => Uri) = identity)(implicit conf: S3Settings): HttpRequest =
HttpRequest(method)
.withHeaders(Host(requestHost(s3Location.bucket, region)))
.withUri(uriFn(requestUri(s3Location.bucket, Some(s3Location.key), region)))

def requestUri(s3Location: S3Location, region: String)(implicit conf: S3Settings): Uri = {
val uri = if (conf.pathStyleAccess) {
Uri(s"/${s3Location.bucket}/${s3Location.key}").withHost(requestHost(s3Location, region))
} else {
Uri(s"/${s3Location.key}").withHost(requestHost(s3Location, region))
}
conf.proxy match {
case None => uri.withScheme("https")
case Some(proxy) => uri.withPort(proxy.port).withScheme(proxy.scheme)
}
private[this] def requestHost(bucket: String, region: String)(implicit conf: S3Settings): Uri.Host =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, to put extract that method.

conf.proxy match {
case None =>
region match {
case "us-east-1" =>
if (conf.pathStyleAccess) {
Uri.Host("s3.amazonaws.com")
} else {
Uri.Host(s"$bucket.s3.amazonaws.com")
}
case _ =>
if (conf.pathStyleAccess) {
Uri.Host(s"s3-$region.amazonaws.com")
} else {
Uri.Host(s"$bucket.s3-$region.amazonaws.com")
}
}
case Some(proxy) => Uri.Host(proxy.host)
}

HttpRequest(method)
.withHeaders(Host(requestHost(s3Location, region)))
.withUri(uriFn(requestUri(s3Location, region)))
private[this] def requestUri(bucket: String, key: Option[String], region: String)(implicit conf: S3Settings): Uri = {
val uri = if (conf.pathStyleAccess) {
Uri(s"/${bucket}${key.fold("")((someKey) => s"/$someKey")}")
.withHost(requestHost(bucket, region))
} else {
Uri(s"${key.fold("")((someKey) => s"/$someKey")}").withHost(requestHost(bucket, region))
}
conf.proxy match {
case None => uri.withScheme("https")
case Some(proxy) => uri.withPort(proxy.port).withScheme(proxy.scheme)
}
}
}
16 changes: 16 additions & 0 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/Marshalling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,20 @@ private[alpakka] object Marshalling {
)
}
}

val isTruncated = "IsTruncated"
val continuationToken = "NextContinuationToken"
val key = "Key"

implicit val listBucketResultUnmarshaller: FromEntityUnmarshaller[ListBucketResult] = {
nodeSeqUnmarshaller(MediaTypes.`application/xml` withCharset HttpCharsets.`UTF-8`).map {
case NodeSeq.Empty => throw Unmarshaller.NoContentException
case x =>
ListBucketResult(
(x \ isTruncated).text == "true",
if ((x \ continuationToken).isEmpty) None else Some((x \ continuationToken).text),
(x \\ key).toSeq.map(_.text)
)
}
}
}
30 changes: 28 additions & 2 deletions s3/src/main/scala/akka/stream/alpakka/s3/impl/S3Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package akka.stream.alpakka.s3.impl

import java.nio.file.Paths
import java.time.LocalDate

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
Expand All @@ -18,7 +17,6 @@ import akka.stream.alpakka.s3.auth.{AWSCredentials, CredentialScope, Signer, Sig
import akka.stream.alpakka.s3.{DiskBufferType, MemoryBufferType, S3Exception, S3Settings}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
Expand All @@ -43,6 +41,8 @@ final case class FailedUpload(reasons: Seq[Throwable]) extends Exception

final case class CompleteMultipartUploadResult(location: Uri, bucket: String, key: String, etag: String)

final case class ListBucketResult(isTruncated: Boolean, continuationToken: Option[String], keys: Seq[String])

object S3Stream {

def apply(credentials: AWSCredentials, region: String)(implicit system: ActorSystem, mat: Materializer): S3Stream =
Expand All @@ -65,6 +65,32 @@ private[alpakka] final class S3Stream(credentials: AWSCredentials,
Source.fromFuture(request(s3Location, range).flatMap(entityForSuccess).map(_.dataBytes)).flatMapConcat(identity)
}

def listBucket(bucket: String, prefix: Option[String] = None): Source[String, NotUsed] = {
sealed trait ListBucketState
case object Starting extends ListBucketState
case class Running(continuationToken: String) extends ListBucketState
case object Finished extends ListBucketState

import system.dispatcher

def listBucketCall(token: Option[String]): Future[Option[(ListBucketState, Seq[String])]] =
signAndGetAs[ListBucketResult](HttpRequests.listBucket(bucket, region, prefix, token))
.map { (res: ListBucketResult) =>
Some(
res.continuationToken
.fold[(ListBucketState, Seq[String])]((Finished, res.keys))(t => (Running(t), res.keys))
)
}

Source
.unfoldAsync[ListBucketState, Seq[String]](Starting) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is great now.

case Finished => Future.successful(None)
case Starting => listBucketCall(None)
case Running(token) => listBucketCall(Some(token))
}
.mapConcat(identity)
}

def request(s3Location: S3Location, rangeOption: Option[ByteRange] = None): Future[HttpResponse] = {
val downloadRequest = getDownloadRequest(s3Location, region)
signAndGet(rangeOption match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ final class S3Client(credentials: AWSCredentials, region: String, system: ActorS
impl.download(S3Location(bucket, key), Some(scalaRange)).asJava
}

def listBucket(bucket: String, prefix: Option[String]): Source[String, NotUsed] =
impl.listBucket(bucket, prefix).asJava

def multipartUpload(bucket: String,
key: String,
contentType: ContentType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ final class S3Client(credentials: AWSCredentials, region: String)(implicit syste
def download(bucket: String, key: String, range: ByteRange): Source[ByteString, NotUsed] =
impl.download(S3Location(bucket, key), Some(range))

def listBucket(bucket: String, prefix: Option[String]): Source[String, NotUsed] = impl.listBucket(bucket, prefix)

def multipartUpload(bucket: String,
key: String,
contentType: ContentType = ContentTypes.`application/octet-stream`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package akka.stream.alpakka.s3.impl

import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri.Query
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.{HttpEntity, MediaTypes}
import akka.stream.alpakka.s3.S3Settings
Expand Down Expand Up @@ -208,8 +209,29 @@ class HttpRequestsSpec extends FlatSpec with Matchers with ScalaFutures {
it should "initiate multipart upload with custom s3 headers" in {
implicit val settings = S3Settings(ActorSystem())
val s3Headers = S3Headers(Map("Cache-Control" -> "no-cache"))
val req = HttpRequests.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers)
val req = HttpRequests
.initiateMultipartUploadRequest(location, contentType, "us-east-2", s3Headers)

req.headers should contain(RawHeader("Cache-Control", "no-cache"))
}

it should "properly construct the list bucket request with no prefix or continuation token passed" in {
implicit val settings = S3Settings(ConfigFactory.parseString(pathStyleAcessConfig))

val req =
HttpRequests.listBucket(location.bucket, "region")

req.uri.query() shouldEqual Query("list-type" -> "2")
}

it should "properly construct the list bucket request with a prefix and token passed" in {
implicit val settings = S3Settings(ConfigFactory.parseString(pathStyleAcessConfig))

val req =
HttpRequests.listBucket(location.bucket, "region", Some("random/prefix"), Some("randomToken"))

req.uri.query() shouldEqual Query("list-type" -> "2",
"prefix" -> "random/prefix",
"continuation-token" -> "randomToken")
}
}