S3AttachmentStore #3779

Merged: 14 commits, Jul 27, 2018
7 changes: 7 additions & 0 deletions common/scala/build.gradle
@@ -75,6 +75,13 @@ dependencies {
compile 'io.reactivex:rxscala_2.11:0.26.5'
compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
compile 'com.microsoft.azure:azure-cosmosdb:2.0.0'

compile ('com.lightbend.akka:akka-stream-alpakka-s3_2.11:0.19') {
exclude group: 'commons-logging'
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses akka-http
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'com.fasterxml.jackson.dataformat'
}
scoverage gradle.scoverage.deps
}

2 changes: 2 additions & 0 deletions common/scala/src/main/resources/reference.conf
@@ -1,6 +1,8 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.

include "s3-reference.conf"

whisk.spi {
ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider
ActivationStoreProvider = whisk.core.entity.ArtifactActivationStoreProvider
78 changes: 78 additions & 0 deletions common/scala/src/main/resources/s3-reference.conf
@@ -0,0 +1,78 @@
# Licensed to the Apache Software Foundation (ASF) under one or more contributor
# license agreements; and to You under the Apache License, Version 2.0.

whisk {
s3 {
# See https://developer.lightbend.com/docs/alpakka/current/s3.html#usage
alpakka {
# whether to buffer request chunks (up to 5MB each) in "memory" or on "disk"
buffer = "memory"

# location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
disk-buffer-path = ""

proxy {
# hostname of the proxy. If undefined (""), the proxy is not enabled.
host = ""
port = 8000

# if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
secure = true
}

# default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
# these values will be used.
aws {
# If this section is absent, the fallback behavior is to use the
# com.amazonaws.auth.DefaultAWSCredentialsProviderChain instance to resolve credentials
credentials {
# supported providers:
# anon - anonymous requests ("no auth")
# static - static credentials,
# required params:
# access-key-id
# secret-access-key
# optional:
# token
# default: as described in com.amazonaws.auth.DefaultAWSCredentialsProviderChain docs,
# attempts to get the credentials from either:
# - environment variables
# - system properties
# - credentials file
# - EC2 credentials service
# - IAM / metadata
provider = default
}

# If this section is absent, the fallback behavior is to use the
# com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain instance to resolve region
region {
# supported providers:
# static - static region,
# required params:
# default-region
# default: as described in com.amazonaws.regions.AwsRegionProvider.DefaultAwsRegionProviderChain docs,
# attempts to get the region from either:
# - environment variables
# - system properties
# - profile file
# - EC2 metadata
provider = default
}
}

# Enable path-style access to S3, e.g. "https://s3-eu-west-1.amazonaws.com/my.bucket/myobject"
# Default is virtual-hosted style.
# When using virtual hosted-style buckets with SSL, the S3 wildcard certificate only matches buckets that do not contain periods.
# Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
path-style-access = true

# Custom endpoint url, used for alternate s3 implementations
# endpoint-url = null

# Which version of the list bucket API to use. Set to 1 to use the old-style version 1 API.
# By default the newer version 2 API is used.
list-bucket-api-version = 2
}
}
}
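To make the settings above concrete, here is a hedged sketch (not part of this change) of how a test could assemble an equivalent configuration programmatically, for example when pointing the store at an S3-compatible service such as minio. The whisk.s3.bucket key is read by the S3Config case class introduced later in this PR; the endpoint URL, credentials and bucket name below are placeholders.

import com.typesafe.config.{Config, ConfigFactory}

val s3TestConfig: Config = ConfigFactory.parseString("""
    |whisk.s3 {
    |  bucket = "test-ow-attachments"           # placeholder, read by S3Config
    |  alpakka {
    |    aws {
    |      credentials {
    |        provider = static
    |        access-key-id = "testAccessKey"      # placeholder
    |        secret-access-key = "testSecretKey"  # placeholder
    |      }
    |      region {
    |        provider = static
    |        default-region = "us-east-1"
    |      }
    |    }
    |    path-style-access = true
    |    endpoint-url = "http://localhost:9000"   # e.g. a local minio endpoint
    |  }
    |}""".stripMargin).withFallback(ConfigFactory.load())

// This Config could then be passed to the makeStore(config) overload of
// S3AttachmentStoreProvider shown further down in this PR.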
2 changes: 2 additions & 0 deletions common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -245,4 +245,6 @@ object ConfigKeys {
val containerProxy = "whisk.container-proxy"
val containerProxyTimeouts = s"$containerProxy.timeouts"

val s3 = "whisk.s3"

}
@@ -102,10 +102,13 @@ trait AttachmentSupport[DocumentAbstraction <: DocumentSerializer] extends Defau
protected[database] def uriOf(bytesOrSource: Either[ByteString, Source[ByteString, _]], path: => String): Uri = {
bytesOrSource match {
case Left(bytes) => Uri.from(scheme = MemScheme, path = encode(bytes))
case Right(_) => Uri.from(scheme = attachmentScheme, path = path)
case Right(_) => uriFrom(scheme = attachmentScheme, path = path)
}
}

//Not using Uri.from due to https://github.com/akka/akka-http/issues/2080
protected[database] def uriFrom(scheme: String, path: String): Uri = Uri(s"$scheme:$path")

/**
* Constructs a source from inlined attachment contents
*/
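As a side note on the uriFrom workaround in the hunk above, a minimal sketch of what it produces; the parse results in the comments are the editor's assumption about akka-http's Uri parser, not something asserted by this PR.

import akka.http.scaladsl.model.Uri

// uriFrom("s3", "action/mydoc/code") interpolates the scheme and path directly:
val attachmentUri: Uri = Uri("s3:action/mydoc/code")
// Assumed result: attachmentUri.scheme == "s3", attachmentUri.path.toString == "action/mydoc/code".
// Uri.from(scheme = "s3", path = "action/mydoc/code") is avoided because of the
// akka-http issue linked in the comment above.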
@@ -374,7 +374,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {

if (maxInlineSize.toBytes == 0) {
val uri = Uri.from(scheme = attachmentScheme, path = UUID().asString)
val uri = uriFrom(scheme = attachmentScheme, path = UUID().asString)
for {
attached <- Future.successful(Attached(uri.toString, contentType))
i1 <- put(update(doc, attached))
@@ -549,7 +549,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
*/
private def getAttachmentName(name: String): String = {
Try(java.util.UUID.fromString(name))
.map(_ => Uri.from(scheme = attachmentScheme, path = name).toString)
.map(_ => uriFrom(scheme = attachmentScheme, path = name).toString)
.getOrElse(name)
}

@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package whisk.core.database.s3

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentType
import akka.stream.ActorMaterializer
import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.alpakka.s3.{S3Exception, S3Settings}
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import com.typesafe.config.Config
import pureconfig.loadConfigOrThrow
import whisk.common.LoggingMarkers.{DATABASE_ATTS_DELETE, DATABASE_ATT_DELETE, DATABASE_ATT_GET, DATABASE_ATT_SAVE}
import whisk.common.{Logging, TransactionId}
import whisk.core.ConfigKeys
import whisk.core.database.StoreUtils._
import whisk.core.database._
import whisk.core.entity.DocId

import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.ClassTag

object S3AttachmentStoreProvider extends AttachmentStoreProvider {
val alpakkaConfigKey = s"${ConfigKeys.s3}.alpakka"
case class S3Config(bucket: String) {
def prefixFor[D](implicit tag: ClassTag[D]): String = {
tag.runtimeClass.getSimpleName.toLowerCase
}
}

override def makeStore[D <: DocumentSerializer: ClassTag]()(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val client = new S3Client(S3Settings(alpakkaConfigKey))
val config = loadConfigOrThrow[S3Config](ConfigKeys.s3)
new S3AttachmentStore(client, config.bucket, config.prefixFor[D])
}

def makeStore[D <: DocumentSerializer: ClassTag](config: Config)(implicit actorSystem: ActorSystem,
logging: Logging,
materializer: ActorMaterializer): AttachmentStore = {
val client = new S3Client(S3Settings(config, alpakkaConfigKey))
val s3config = loadConfigOrThrow[S3Config](config, ConfigKeys.s3)
new S3AttachmentStore(client, s3config.bucket, s3config.prefixFor[D])
}

}
class S3AttachmentStore(client: S3Client, bucket: String, prefix: String)(implicit system: ActorSystem,
logging: Logging,
materializer: ActorMaterializer)
extends AttachmentStore {
override val scheme = "s3"

override protected[core] implicit val executionContext: ExecutionContext = system.dispatcher

override protected[core] def attach(
docId: DocId,
name: String,
contentType: ContentType,
docStream: Source[ByteString, _])(implicit transid: TransactionId): Future[AttachResult] = {
require(name != null, "name undefined")
val start =
transid.started(this, DATABASE_ATT_SAVE, s"[ATT_PUT] uploading attachment '$name' of document 'id: $docId'")

//A possible optimization for small attachments (< 5MB) would be to use putObject instead of multipartUpload,
//reducing the number of remote calls from 3 to 1
val f = docStream
.runWith(combinedSink(client.multipartUpload(bucket, objectKey(docId, name), contentType)))
.map(r => AttachResult(r.digest, r.length))

f.onSuccess({
case _ =>
transid
.finished(this, start, s"[ATT_PUT] '$prefix' completed uploading attachment '$name' of document 'id: $docId'")
})

reportFailure(
f,
start,
failure => s"[ATT_PUT] '$prefix' internal error, name: '$name', doc: '$docId', failure: '${failure.getMessage}'")
}

override protected[core] def readAttachment[T](docId: DocId, name: String, sink: Sink[ByteString, Future[T]])(
implicit transid: TransactionId): Future[T] = {
require(name != null, "name undefined")
val start =
transid.started(
this,
DATABASE_ATT_GET,
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
val (source, _) = client.download(bucket, objectKey(docId, name))

val f = source.runWith(sink)

val g = f.transform(
{ s =>
transid
.finished(this, start, s"[ATT_GET] '$prefix' completed: found attachment '$name' of document 'id: $docId'")
s
}, {
case s: Throwable if isMissingKeyException(s) =>
transid
.finished(
this,
start,
s"[ATT_GET] '$prefix', retrieving attachment '$name' of document 'id: $docId'; not found.")
NoDocumentException("Not found on 'readAttachment'.")
case e => e
})

reportFailure(
g,
start,
failure =>
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
}

override protected[core] def deleteAttachments(docId: DocId)(implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(this, DATABASE_ATTS_DELETE, s"[ATTS_DELETE] deleting attachments of document 'id: $docId'")

//S3 provides an API to delete multiple objects in a single call, but the alpakka client
//does not currently support it. Also, in current usage a document has at most one attachment,
//so the current approach still involves only 2 remote calls
val f = client
.listBucket(bucket, Some(objectKeyPrefix(docId)))
.mapAsync(1)(bc => client.deleteObject(bc.bucketName, bc.key))
.runWith(Sink.seq)
.map(_ => true)

f.onSuccess {
case _ =>
transid.finished(this, start, s"[ATTS_DELETE] completed: deleting attachments of document 'id: $docId'")
}

reportFailure(
f,
start,
failure => s"[ATTS_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}
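As the comment in deleteAttachments notes, the alpakka client does not expose S3's multi-object delete. Purely as an illustrative aside (not part of this PR), here is a hedged sketch of what a batch delete could look like with the AWS Java SDK that this PR adds to the test dependencies; client construction is elided.

import scala.collection.JavaConverters._

import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.DeleteObjectsRequest

// Delete all objects under a document's key prefix with a single DeleteObjects
// call (S3 accepts up to 1000 keys per request).
def deleteAttachmentsBatch(s3: AmazonS3, bucket: String, keyPrefix: String): Unit = {
  val keys = s3.listObjectsV2(bucket, keyPrefix).getObjectSummaries.asScala.map(_.getKey)
  if (keys.nonEmpty) {
    s3.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keys: _*))
  }
}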

override protected[core] def deleteAttachment(docId: DocId, name: String)(
implicit transid: TransactionId): Future[Boolean] = {
val start =
transid.started(this, DATABASE_ATT_DELETE, s"[ATT_DELETE] deleting attachment '$name' of document 'id: $docId'")

val f = client
.deleteObject(bucket, objectKey(docId, name))
.map(_ => true)

f.onSuccess {
case _ =>
transid.finished(this, start, s"[ATT_DELETE] completed: deleting attachment '$name' of document 'id: $docId'")
}

reportFailure(
f,
start,
failure => s"[ATT_DELETE] '$prefix' internal error, doc: '$docId', failure: '${failure.getMessage}'")
}

override def shutdown(): Unit = {}
Review comment (Member): nothing to shutdown?

Reply from @chetanmeh (Member, Author), Jul 27, 2018: It uses the Akka cached HTTP connection pool, which is bound to the ActorSystem lifecycle and so need not be explicitly closed. It may be possible to close the pool, but comments here indicate that it may pose problems, so that is not attempted.


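For completeness, a hedged sketch (not in this PR) of what an explicit shutdown could do: akka-http can flush its cached host connection pools, although, as the reply above explains, this is normally unnecessary and potentially disruptive.

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import akka.actor.ActorSystem
import akka.http.scaladsl.Http

// What a non-empty shutdown() body could look like; the timeout is arbitrary.
def shutdownConnectionPools()(implicit system: ActorSystem): Unit = {
  val done: Future[Unit] = Http().shutdownAllConnectionPools()
  Await.ready(done, 30.seconds)
}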
private def objectKey(id: DocId, name: String): String = s"$prefix/${id.id}/$name"

private def objectKeyPrefix(id: DocId): String = s"$prefix/${id.id}"

private def isMissingKeyException(e: Throwable): Boolean = {
//In some cases the S3Exception is a nested cause, so we need to recurse
e match {
case s: S3Exception if s.code == "NoSuchKey" => true
case t if t != null && isMissingKeyException(t.getCause) => true
case _ => false
}
}
}
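To round out the new store, a hedged end-to-end usage sketch (not part of this PR). Because attach and readAttachment are protected[core], the snippet assumes it lives in a package under whisk.core (e.g. a test); WhiskAction as the type parameter, TransactionId.testing, and the AkkaLogging wiring are assumptions for illustration.

package whisk.core.database.s3

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.http.scaladsl.model.ContentTypes
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import whisk.common.{AkkaLogging, Logging, TransactionId}
import whisk.core.entity.{DocId, WhiskAction}

object S3AttachmentStoreSketch extends App {
  implicit val system: ActorSystem = ActorSystem("s3-attachment-sketch")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val logging: Logging = new AkkaLogging(system.log)
  implicit val transid: TransactionId = TransactionId.testing // assumed test helper

  // Reads whisk.s3.* from application.conf / s3-reference.conf shown above.
  val store = S3AttachmentStoreProvider.makeStore[WhiskAction]()

  val docId = DocId("guest/hello")
  val payload = Source.single(ByteString("attachment payload"))

  val roundTrip = for {
    attached <- store.attach(docId, "codefile", ContentTypes.`application/octet-stream`, payload)
    read <- store.readAttachment(docId, "codefile", Sink.fold(ByteString.empty)(_ ++ _))
  } yield (attached.digest, read.utf8String)

  println(Await.result(roundTrip, 1.minute))
  store.shutdown()
  system.terminate()
}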
2 changes: 2 additions & 0 deletions tests/build.gradle
@@ -148,6 +148,8 @@ dependencies {
compile 'io.opentracing:opentracing-mock:0.31.0'
compile "org.apache.curator:curator-test:${gradle.curator.version}"

compile "com.amazonaws:aws-java-sdk-s3:1.11.295"

compile project(':common:scala')
compile project(':core:controller')
compile project(':core:invoker')
@@ -91,7 +91,7 @@ object ActionContainer {
}.get // This fails if the docker binary couldn't be located.
}

private lazy val dockerCmd: String = {
lazy val dockerCmd: String = {
/*
* The docker host is set to a provided property 'docker.host' if it's
* available; otherwise we check with WhiskProperties to see whether we are