Skip to content

Commit

Permalink
functions to put (create) page and append block akka#3253
Browse files Browse the repository at this point in the history
  • Loading branch information
sfali committed Aug 28, 2024
1 parent c10b3d1 commit 96601a8
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ private[storage] class StorageHeaders private (val contentLengthHeader: Option[H
private[storage] def withPageBlobContentLengthHeader(contentLength: Long): StorageHeaders =
copy(pageBlobContentLengthHeader = Some(RawHeader(PageBlobContentLengthHeaderKey, contentLength.toString)))

private[storage] def withPageBlobSequenceNumberHeader(sequenceNumber: Int): StorageHeaders =
copy(pageBlobSequenceNumberHeader = Some(RawHeader(PageBlobSequenceNumberHeaderKey, sequenceNumber.toString)))
private[storage] def withPageBlobSequenceNumberHeader(sequenceNumber: Option[Int]): StorageHeaders =
copy(
pageBlobSequenceNumberHeader =
sequenceNumber.map(value => RawHeader(PageBlobSequenceNumberHeaderKey, value.toString))
)

private def copy(contentLengthHeader: Option[HttpHeader] = contentLengthHeader,
contentTypeHeader: Option[HttpHeader] = contentTypeHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ object AzureStorageStream {
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def putPageOrAppendBlock(objectPath: String, headers: Seq[HttpHeader]) = {
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = resolveSettings(attr, system)
val request =
createRequest(
method = HttpMethods.PUT,
uri = createUri(settings = settings,
storageType = BlobType,
objectPath = objectPath,
queryString = createQueryString(settings)),
headers = headers
)
handlePutRequest(request, settings)
}
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def createFile(objectPath: String,
headers: Seq[HttpHeader]): Source[Option[ObjectMetadata], NotUsed] = {
Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,64 @@ object BlobService {
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

/**
* Put (Create) Page Blob.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @param contentType content type of the blob
* @param maxBlockSize maximum block size
* @param blobSequenceNumber optional block sequence number
* @param leaseId lease ID of an active lease (if applicable)
* @return A [[akka.stream.javadsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def putPageBlock(objectPath: String,
contentType: ContentType,
maxBlockSize: Long,
blobSequenceNumber: Option[Int],
leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed.type] =
AzureStorageStream
.putPageOrAppendBlock(
objectPath,
StorageHeaders
.create()
.withContentLengthHeader(0L)
.withContentTypeHeader(contentType.asInstanceOf[ScalaContentType])
.withBlobTypeHeader(BlobTypeHeader.PageBlobHeader)
.withPageBlobContentLengthHeader(maxBlockSize)
.withPageBlobSequenceNumberHeader(blobSequenceNumber)
.withLeaseIdHeader(Option(leaseId.orElse(null)))
.headers
)
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

/**
* Put (Create) Append Blob.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @param contentType content type of the blob
* @param leaseId lease ID of an active lease (if applicable)
* @return A [[akka.stream.javadsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def putAppendBlock(objectPath: String,
contentType: ContentType,
leaseId: Optional[String]): Source[Optional[ObjectMetadata], NotUsed.type] =
AzureStorageStream
.putPageOrAppendBlock(
objectPath,
StorageHeaders
.create()
.withContentLengthHeader(0L)
.withContentTypeHeader(contentType.asInstanceOf[ScalaContentType])
.withBlobTypeHeader(BlobTypeHeader.AppendBlobHeader)
.withLeaseIdHeader(Option(leaseId.orElse(null)))
.headers
)
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

/**
* Create container.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ object BlobService {
* @param contentType content type of the blob
* @param contentLength length of the blob
* @param payload actual payload, a [[akka.stream.scaladsl.Source Source]] of [[akka.util.ByteString ByteString]]
* @param leaseId lease ID of an active lease (if applicable)
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
Expand All @@ -101,6 +102,56 @@ object BlobService {
.headers
)

/**
* Put (Create) Page Blob.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @param contentType content type of the blob
* @param maxBlockSize maximum block size
* @param blobSequenceNumber optional block sequence number
* @param leaseId lease ID of an active lease (if applicable)
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def putPageBlock(objectPath: String,
contentType: ContentType = ContentTypes.`application/octet-stream`,
maxBlockSize: Long,
blobSequenceNumber: Option[Int] = None,
leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.putPageOrAppendBlock(
objectPath,
StorageHeaders()
.withContentLengthHeader(0L)
.withContentTypeHeader(contentType)
.withBlobTypeHeader(BlobTypeHeader.PageBlobHeader)
.withPageBlobContentLengthHeader(maxBlockSize)
.withPageBlobSequenceNumberHeader(blobSequenceNumber)
.withLeaseIdHeader(leaseId)
.headers
)

/**
* Put (Create) Append Blob.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @param contentType content type of the blob
* @param leaseId lease ID of an active lease (if applicable)
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def putAppendBlock(objectPath: String,
contentType: ContentType = ContentTypes.`application/octet-stream`,
leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.putPageOrAppendBlock(
objectPath,
StorageHeaders()
.withContentLengthHeader(0L)
.withContentTypeHeader(contentType)
.withBlobTypeHeader(BlobTypeHeader.AppendBlobHeader)
.withLeaseIdHeader(leaseId)
.headers
)

/**
* Create container.
*
Expand Down

0 comments on commit 96601a8

Please sign in to comment.