Skip to content

Commit

Permalink
Resolve settings from Akka stream attributes akka#3253
Browse files Browse the repository at this point in the history
  • Loading branch information
sfali committed Aug 24, 2024
1 parent f1de209 commit 426b800
Showing 1 changed file with 25 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import akka.http.scaladsl.model.{
Uri
}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import akka.stream.{Attributes, Materializer}
import akka.stream.alpakka.azure.storage.impl.auth.Signer
import akka.stream.scaladsl.{Flow, RetryFlow, Source}
import akka.util.ByteString
Expand All @@ -50,9 +50,9 @@ object AzureStorageStream {
versionId: Option[String],
leaseId: Option[String]): Source[ByteString, Future[ObjectMetadata]] = {
Source
.fromMaterializer { (mat, _) =>
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = StorageSettings(system)
val settings = resolveSettings(attr, system)
val request =
createRequest(
method = HttpMethods.GET,
Expand Down Expand Up @@ -86,10 +86,10 @@ object AzureStorageStream {
versionId: Option[String],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, _) =>
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
import mat.executionContext
val settings = StorageSettings(system)
val settings = resolveSettings(attr, system)
val request =
createRequest(
method = HttpMethods.HEAD,
Expand Down Expand Up @@ -119,10 +119,10 @@ object AzureStorageStream {
versionId: Option[String],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, _) =>
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
import mat.executionContext
val settings = StorageSettings(system)
val settings = resolveSettings(attr, system)
val request =
createRequest(
method = HttpMethods.DELETE,
Expand Down Expand Up @@ -154,9 +154,9 @@ object AzureStorageStream {
payload: Source[ByteString, _],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, _) =>
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = StorageSettings(system)
val settings = resolveSettings(attr, system)
val httpEntity = HttpEntity(contentType, contentLength, payload)
val request =
createRequest(
Expand All @@ -176,9 +176,9 @@ object AzureStorageStream {
maxSize: Long,
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = {
Source
.fromMaterializer { (mat, _) =>
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = StorageSettings(system)
val settings = resolveSettings(attr, system)
val request =
createRequest(
HttpMethods.PUT,
Expand All @@ -202,9 +202,9 @@ object AzureStorageStream {
payload: Option[Source[ByteString, _]],
leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed.type] = {
Source
.fromMaterializer { (mat, _) =>
.fromMaterializer { (mat, attr) =>
implicit val system: ActorSystem = mat.system
val settings = StorageSettings(system)
val settings = resolveSettings(attr, system)
val contentLength = range.last - range.first + 1
val clearRange = payload.isEmpty
val writeType = if (clearRange) "clear" else "update"
Expand Down Expand Up @@ -365,6 +365,18 @@ object AzureStorageStream {
queryString = queryString)
}

private def resolveSettings(attr: Attributes, sys: ActorSystem) =
attr
.get[StorageSettingsValue]
.map(_.settings)
.getOrElse {
val storageExtension = StorageExt(sys)
attr
.get[StorageSettingsPath]
.map(settingsPath => storageExtension.settings(settingsPath.path))
.getOrElse(storageExtension.settings)
}

// `Content-Type` header is by design not accessible as header. So need to have a custom
// header implementation to expose that
private case class CustomContentTypeHeader(contentType: ContentType) extends CustomHeader {
Expand Down

0 comments on commit 426b800

Please sign in to comment.