diff --git a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala index b0fa5fafca..fc1e02bc68 100644 --- a/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala +++ b/azure-storage/src/main/scala/akka/stream/alpakka/azure/storage/impl/AzureStorageStream.scala @@ -33,7 +33,7 @@ import akka.http.scaladsl.model.{ Uri } import akka.http.scaladsl.unmarshalling.Unmarshal -import akka.stream.Materializer +import akka.stream.{Attributes, Materializer} import akka.stream.alpakka.azure.storage.impl.auth.Signer import akka.stream.scaladsl.{Flow, RetryFlow, Source} import akka.util.ByteString @@ -50,9 +50,9 @@ object AzureStorageStream { versionId: Option[String], leaseId: Option[String]): Source[ByteString, Future[ObjectMetadata]] = { Source - .fromMaterializer { (mat, _) => + .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system - val settings = StorageSettings(system) + val settings = resolveSettings(attr, system) val request = createRequest( method = HttpMethods.GET, @@ -86,10 +86,10 @@ object AzureStorageStream { versionId: Option[String], leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { Source - .fromMaterializer { (mat, _) => + .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system import mat.executionContext - val settings = StorageSettings(system) + val settings = resolveSettings(attr, system) val request = createRequest( method = HttpMethods.HEAD, @@ -119,10 +119,10 @@ object AzureStorageStream { versionId: Option[String], leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { Source - .fromMaterializer { (mat, _) => + .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system import mat.executionContext - val settings = StorageSettings(system) + val settings = resolveSettings(attr, system) val request = createRequest( method = HttpMethods.DELETE, @@ -154,9 +154,9 @@ object AzureStorageStream { payload: Source[ByteString, _], leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { Source - .fromMaterializer { (mat, _) => + .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system - val settings = StorageSettings(system) + val settings = resolveSettings(attr, system) val httpEntity = HttpEntity(contentType, contentLength, payload) val request = createRequest( @@ -176,9 +176,9 @@ object AzureStorageStream { maxSize: Long, leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed] = { Source - .fromMaterializer { (mat, _) => + .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system - val settings = StorageSettings(system) + val settings = resolveSettings(attr, system) val request = createRequest( HttpMethods.PUT, @@ -202,9 +202,9 @@ object AzureStorageStream { payload: Option[Source[ByteString, _]], leaseId: Option[String]): Source[Option[ObjectMetadata], NotUsed.type] = { Source - .fromMaterializer { (mat, _) => + .fromMaterializer { (mat, attr) => implicit val system: ActorSystem = mat.system - val settings = StorageSettings(system) + val settings = resolveSettings(attr, system) val contentLength = range.last - range.first + 1 val clearRange = payload.isEmpty val writeType = if (clearRange) "clear" else "update" @@ -365,6 +365,18 @@ object AzureStorageStream { queryString = queryString) } + private def resolveSettings(attr: Attributes, sys: ActorSystem) = + attr + .get[StorageSettingsValue] + .map(_.settings) + .getOrElse { + val storageExtension = StorageExt(sys) + attr + .get[StorageSettingsPath] + .map(settingsPath => storageExtension.settings(settingsPath.path)) + .getOrElse(storageExtension.settings) + } + // `Content-Type` header is by design not accessible as header. So need to have a custom // header implementation to expose that private case class CustomContentTypeHeader(contentType: ContentType) extends CustomHeader {