Skip to content

Commit

Permalink
support for create container operation akka#3253
Browse files Browse the repository at this point in the history
  • Loading branch information
sfali committed Aug 25, 2024
1 parent 07e64a5 commit 44c2f84
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,24 @@ object AzureStorageStream {
.mapMaterializedValue(_ => NotUsed)
}

private[storage] def createContainer(objectPath: String): Source[Option[ObjectMetadata], NotUsed] =
Source
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = resolveSettings(attr, system)
val request =
createRequest(
method = HttpMethods.PUT,
accountName = settings.azureNameKeyCredential.accountName,
storageType = BlobType,
objectPath = objectPath,
queryString = createQueryString(settings, Some("restype=container")),
headers = populateCommonHeaders(HttpEntity.Empty)
)
handlePutRequest(request, settings)
}
.mapMaterializedValue(_ => NotUsed)

private def handlePutRequest(request: HttpRequest, settings: StorageSettings)(implicit system: ActorSystem) = {
import system.dispatcher
signAndRequest(request, settings).flatMapConcat {
Expand Down Expand Up @@ -330,7 +348,7 @@ object AzureStorageStream {
overrideContentLength: Option[Long] = None,
range: Option[ByteRange] = None,
blobType: Option[String] = None,
leaseId: Option[String],
leaseId: Option[String] = None,
writeType: Option[String] = None) = {
// Azure required to have these two headers (Content-Length & Content-Type) in the request
// in some cases Content-Length header must be set as 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ object BlobService {
.asJava

/**
* Put blob.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @param contentType content type of the blob
Expand All @@ -124,4 +125,14 @@ object BlobService {
Option(leaseId.orElse(null)))
.map(opt => Optional.ofNullable(opt.orNull))
.asJava

/**
* Create container.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def createContainer(objectPath: String): Source[Optional[ObjectMetadata], NotUsed] =
AzureStorageStream.createContainer(objectPath).map(opt => Optional.ofNullable(opt.orNull)).asJava
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ object BlobService {
AzureStorageStream.deleteObject(BlobType, objectPath, versionId, leaseId)

/**
* Put blob.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @param contentType content type of the blob
Expand All @@ -82,4 +83,14 @@ object BlobService {
blobType: String = "BlockBlob",
leaseId: Option[String] = None): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.putBlob(blobType, objectPath, contentType, contentLength, payload, leaseId)

/**
* Create container.
*
* @param objectPath path of the object, should start with "/" and separated by `/`, e.g. `/container/blob`
* @return A [[akka.stream.scaladsl.Source Source]] containing an [[scala.Option]] of
* [[akka.stream.alpakka.azure.storage.ObjectMetadata]], will be [[scala.None]] in case the object does not exist
*/
def createContainer(objectPath: String): Source[Option[ObjectMetadata], NotUsed] =
AzureStorageStream.createContainer(objectPath)
}

0 comments on commit 44c2f84

Please sign in to comment.