Skip to content

Commit

Permalink
updated implementation to get headers from dsl akka#3253
Browse files Browse the repository at this point in the history
1. DSL to create headers based on function and pass it to implementation
2. Remove "bobType" from "putBlob", separate functions will be introduced for page and append blocks
  • Loading branch information
sfali committed Aug 28, 2024
1 parent 49deaae commit cf48d55
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,17 @@ import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes.{Accepted, Created, NotFound, OK}
import akka.http.scaladsl.model.headers.{
ByteRange,
CustomHeader,
RawHeader,
`Content-Length`,
`Content-Type`,
Range => RangeHeader
}
import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`, CustomHeader}
import akka.http.scaladsl.model.{
ContentType,
HttpEntity,
HttpHeader,
HttpMethod,
HttpMethods,
HttpRequest,
HttpResponse,
ResponseEntity,
StatusCode,
UniversalEntity,
Uri
}
import akka.http.scaladsl.unmarshalling.Unmarshal
Expand All @@ -46,9 +39,8 @@ object AzureStorageStream {

private[storage] def getObject(storageType: String,
objectPath: String,
range: Option[ByteRange],
versionId: Option[String],
leaseId: Option[String]): Source[ByteString, Future[ObjectMetadata]] = {
headers: Seq[HttpHeader]): Source[ByteString, Future[ObjectMetadata]] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
Expand All @@ -60,7 +52,7 @@ object AzureStorageStream {
storageType = storageType,
objectPath = objectPath,
queryString = createQueryString(settings, versionId.map(value => s"versionId=$value"))),
headers = populateCommonHeaders(HttpEntity.Empty, range = range, leaseId = leaseId)
headers = headers
)
val objectMetadataMat = Promise[ObjectMetadata]()
signAndRequest(request, settings)(mat.system)
Expand All @@ -84,7 +76,7 @@ object AzureStorageStream {
private[storage] def getObjectProperties(storageType: String,
objectPath: String,
versionId: Option[String],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
Expand All @@ -97,7 +89,7 @@ object AzureStorageStream {
storageType = storageType,
objectPath = objectPath,
queryString = createQueryString(settings, versionId.map(value => s"versionId=$value"))),
headers = populateCommonHeaders(HttpEntity.Empty, leaseId = leaseId)
headers = headers
)

signAndRequest(request, settings)
Expand All @@ -117,7 +109,7 @@ object AzureStorageStream {
private[storage] def deleteObject(storageType: String,
objectPath: String,
versionId: Option[String],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
Expand All @@ -130,7 +122,7 @@ object AzureStorageStream {
storageType = storageType,
objectPath = objectPath,
queryString = createQueryString(settings, versionId.map(value => s"versionId=$value"))),
headers = populateCommonHeaders(HttpEntity.Empty, leaseId = leaseId)
headers = headers
)

signAndRequest(request, settings)
Expand All @@ -147,35 +139,29 @@ object AzureStorageStream {
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def putBlob(blobType: String,
objectPath: String,
contentType: ContentType,
contentLength: Long,
payload: Source[ByteString, _],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
private[storage] def putBlob(objectPath: String,
httpEntity: UniversalEntity,
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = resolveSettings(attr, system)
val httpEntity = HttpEntity(contentType, contentLength, payload)
val request =
createRequest(
method = HttpMethods.PUT,
uri = createUri(settings = settings,
storageType = BlobType,
objectPath = objectPath,
queryString = createQueryString(settings)),
headers = populateCommonHeaders(httpEntity, blobType = Some(blobType), leaseId = leaseId)
headers = headers
).withEntity(httpEntity)
handlePutRequest(request, settings)
}
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def createFile(objectPath: String,
contentType: ContentType,
maxSize: Long,
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
Expand All @@ -187,50 +173,54 @@ object AzureStorageStream {
storageType = FileType,
objectPath = objectPath,
queryString = createQueryString(settings)),
headers = Seq(
CustomContentTypeHeader(contentType),
RawHeader(XMsContentLengthHeaderKey, maxSize.toString),
RawHeader(FileTypeHeaderKey, "file")
) ++ leaseId.map(value => RawHeader(LeaseIdHeaderKey, value))
headers = headers
)
handlePutRequest(request, settings)
}
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def updateOrClearRange(objectPath: String,
contentType: ContentType,
range: ByteRange.Slice,
payload: Option[Source[ByteString, _]],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
private[storage] def updateRange(objectPath: String,
httpEntity: UniversalEntity,
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = resolveSettings(attr, system)
val contentLength = range.last - range.first + 1
val clearRange = payload.isEmpty
val writeType = if (clearRange) "clear" else "update"
val overrideContentLength = if (clearRange) Some(0L) else None
val httpEntity =
if (clearRange) HttpEntity.empty(contentType) else HttpEntity(contentType, contentLength, payload.get)
val request =
createRequest(
method = HttpMethods.PUT,
uri = createUri(settings = settings,
storageType = FileType,
objectPath = objectPath,
queryString = createQueryString(settings, Some("comp=range"))),
headers = populateCommonHeaders(httpEntity,
overrideContentLength,
range = Some(range),
leaseId = leaseId,
writeType = Some(writeType))
headers = headers
).withEntity(httpEntity)
handlePutRequest(request, settings)
}
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def clearRange(objectPath: String,
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = resolveSettings(attr, system)
val request =
createRequest(
method = HttpMethods.PUT,
uri = createUri(settings = settings,
storageType = FileType,
objectPath = objectPath,
queryString = createQueryString(settings, Some("comp=range"))),
headers = headers
)
handlePutRequest(request, settings)
}
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def createContainer(objectPath: String): Source[Option[ObjectMetadata], NotUsed] =
Source
.fromMaterializer { (mat, attr) =>
Expand All @@ -243,7 +233,7 @@ object AzureStorageStream {
storageType = BlobType,
objectPath = objectPath,
queryString = createQueryString(settings, Some("restype=container"))),
headers = populateCommonHeaders(HttpEntity.Empty)
headers = StorageHeaders().headers
)

handlePutRequest(request, settings)
Expand Down Expand Up @@ -345,32 +335,6 @@ object AzureStorageStream {
}
}

private def populateCommonHeaders(entity: HttpEntity,
overrideContentLength: Option[Long] = None,
range: Option[ByteRange] = None,
blobType: Option[String] = None,
leaseId: Option[String] = None,
writeType: Option[String] = None) = {
// Azure required to have these two headers (Content-Length & Content-Type) in the request
// in some cases Content-Length header must be set as 0
val contentLength = overrideContentLength.orElse(entity.contentLengthOption).getOrElse(0L)
val maybeContentLengthHeader =
if (overrideContentLength.isEmpty && contentLength == 0L) None else Some(CustomContentLengthHeader(contentLength))

val maybeContentTypeHeader =
emptyStringToOption(entity.contentType.toString()) match {
case Some(value) if value != "none/none" => Some(CustomContentTypeHeader(entity.contentType))
case _ => None
}

val maybeRangeHeader = range.map(RangeHeader(_))
val maybeBlobTypeHeader = blobType.map(value => RawHeader(BlobTypeHeaderKey, value))
val maybeLeaseIdHeader = leaseId.map(value => RawHeader(LeaseIdHeaderKey, value))
val maybeWriteTypeHeader = writeType.map(value => RawHeader(FileWriteTypeHeaderKey, value))

(maybeContentLengthHeader ++ maybeContentTypeHeader ++ maybeRangeHeader ++ maybeBlobTypeHeader ++ maybeLeaseIdHeader ++ maybeWriteTypeHeader).toSeq
}

private def createRequest(method: HttpMethod, uri: Uri, headers: Seq[HttpHeader]) =
HttpRequest(method = method, uri = uri, headers = headers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import akka.NotUsed
import akka.http.javadsl.model._
import akka.http.javadsl.model.headers.ByteRange
import akka.http.scaladsl.model.headers.{ByteRange => ScalaByteRange}
import akka.http.scaladsl.model.{ContentType => ScalaContentType}
import akka.http.scaladsl.model.{HttpEntity, ContentType => ScalaContentType}
import akka.stream.alpakka.azure.storage.headers.BlobTypeHeader
import akka.stream.alpakka.azure.storage.impl.AzureStorageStream
import akka.stream.javadsl.Source
import akka.stream.scaladsl.SourceToCompletionStage
Expand All @@ -38,14 +39,21 @@ object BlobService {
def getBlob(objectPath: String,
range: ByteRange,
versionId: Optional[String],
leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] = {
val scalaRange = range.asInstanceOf[ScalaByteRange]
leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] =
new Source(
AzureStorageStream
.getObject(BlobType, objectPath, Some(scalaRange), Option(versionId.orElse(null)), Option(leaseId.orElse(null)))
.getObject(
BlobType,
objectPath,
Option(versionId.orElse(null)),
StorageHeaders
.create()
.withRangeHeader(range.asInstanceOf[ScalaByteRange])
.withLeaseIdHeader(Option(leaseId.orElse(null)))
.headers
)
.toCompletionStage()
)
}

/**
* Gets blob representing `objectPath` with specified range (if applicable).
Expand All @@ -61,7 +69,15 @@ object BlobService {
leaseId: Optional[String]): Source[ByteString, CompletionStage[ObjectMetadata]] =
new Source(
AzureStorageStream
.getObject(BlobType, objectPath, None, Option(versionId.orElse(null)), Option(leaseId.orElse(null)))
.getObject(
BlobType,
objectPath,
Option(versionId.orElse(null)),
StorageHeaders
.create()
.withLeaseIdHeader(Option(leaseId.orElse(null)))
.headers
)
.toCompletionStage()
)

Expand All @@ -78,7 +94,10 @@ object BlobService {
versionId: Optional[String],
leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] =
AzureStorageStream
.getObjectProperties(BlobType, objectPath, Option(versionId.orElse(null)), Option(leaseId.orElse(null)))
.getObjectProperties(BlobType,
objectPath,
Option(versionId.orElse(null)),
StorageHeaders.create().withLeaseIdHeader(Option(leaseId.orElse(null))).headers)
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

Expand All @@ -95,7 +114,10 @@ object BlobService {
versionId: Optional[String],
leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] =
AzureStorageStream
.deleteObject(BlobType, objectPath, Option(versionId.orElse(null)), Option(leaseId.orElse(null)))
.deleteObject(BlobType,
objectPath,
Option(versionId.orElse(null)),
StorageHeaders.create().withLeaseIdHeader(Option(leaseId.orElse(null))).headers)
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

Expand All @@ -106,23 +128,26 @@ object BlobService {
* @param contentType content type of the blob
* @param contentLength length of the blob
* @param payload actual payload, a [[akka.stream.javadsl.Source Source]] of [[akka.util.ByteString ByteString]]
* @param blobType type of the blob, ''Must be one of:'' __'''BlockBlob, PageBlob, or AppendBlob'''__
* @return A [[akka.stream.javadsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def putBlob(objectPath: String,
contentType: ContentType,
contentLength: Long,
payload: Source[ByteString, _],
blobType: String = "BlockBlob",
leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed] =
AzureStorageStream
.putBlob(blobType,
objectPath,
contentType.asInstanceOf[ScalaContentType],
contentLength,
payload.asScala,
Option(leaseId.orElse(null)))
.putBlob(
objectPath,
HttpEntity(contentType.asInstanceOf[ScalaContentType], contentLength, payload.asScala),
StorageHeaders
.create()
.withContentLengthHeader(contentLength)
.withContentTypeHeader(contentType.asInstanceOf[ScalaContentType])
.withLeaseIdHeader(Option(leaseId.orElse(null)))
.withBlobTypeHeader(BlobTypeHeader.BlockBlobHeader)
.headers
)
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

Expand Down
Loading

0 comments on commit cf48d55

Please sign in to comment.