From 90be7d3e27bd6041bd8dbc567d9885b5a79dbbc6 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Mon, 15 Aug 2022 14:09:48 -0400 Subject: [PATCH 01/16] BT-711 Refresh SAS token for filesystem on expiry --- .../filesystems/blob/BlobPathBuilder.scala | 52 +++++++++++-------- .../blob/BlobPathBuilderFactory.scala | 14 ++--- .../blob/BlobPathBuilderSpec.scala | 13 +++-- 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 69a21c90eda..279b78f0306 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -7,7 +7,7 @@ import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.blob.BlobPathBuilder._ import java.net.{MalformedURLException, URI} -import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} +import java.nio.file._ import scala.jdk.CollectionConverters._ import scala.language.postfixOps import scala.util.{Failure, Try} @@ -57,35 +57,43 @@ object BlobPathBuilder { class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder { - val credential: AzureSasCredential = new AzureSasCredential(blobTokenGenerator.getAccessToken) - val fileSystemConfig: Map[String, Object] = Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), - (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), - (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) - - def retrieveFilesystem(uri: URI): Try[FileSystem] = { - Try(FileSystems.getFileSystem(uri)) recover { - // If no filesystem already exists, this will create a new connection, with the provided configs - case _: FileSystemNotFoundException => FileSystems.newFileSystem(uri, fileSystemConfig.asJava) - } - } - def build(string: String): Try[BlobPath] = { validateBlobPath(string, container, endpoint) match { - case ValidBlobPath(path) => for { - fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint)) - nioPath <- Try(fileSystem.getPath(path)) - blobPath = BlobPath(nioPath, endpoint, container) - } yield blobPath + case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, blobTokenGenerator)) case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage) } } - override def name: String = "Azure Blob Storage" } -// Add args for container, storage account name -case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container: String) extends Path { - override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container) +object BlobPath { + def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = { + Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), + (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), + (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) + } + + def findNioPath(path: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator): NioPath = (for { + fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint), container, blobTokenGenerator) + nioPath <- Try(fileSystem.getPath(path)) + } yield nioPath).get // Ideally we would unwrap this to a NioPath on success and on a access failure try to recover + + def retrieveFilesystem(uri: URI, container: String, blobTokenGenerator: BlobTokenGenerator): Try[FileSystem] = { + Try(FileSystems.getFileSystem(uri)) recover { + // If no filesystem already exists, this will create a new connection, with the provided configs + case _: FileSystemNotFoundException => { + val fileSystemConfig = buildConfigMap(blobTokenGenerator.getAccessToken, container) + FileSystems.newFileSystem(uri, fileSystemConfig.asJava) + } + } + } +} +case class BlobPath private[blob](pathString: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator) extends Path { + //var token = blobTokenGenerator.getAccessToken + //var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=","")) + override def nioPath: NioPath = BlobPath.findNioPath(path = pathString, endpoint, container, blobTokenGenerator) + + override protected def newPath(nioPath: NioPath): Path = BlobPath(pathString, endpoint, container, blobTokenGenerator) override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/") diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index cea7269522a..32187a3a280 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -1,6 +1,7 @@ package cromwell.filesystems.blob import akka.actor.ActorSystem +import com.azure.core.credential.AzureSasCredential import com.azure.core.management.AzureEnvironment import com.azure.core.management.profile.AzureProfile import com.azure.identity.DefaultAzureCredentialBuilder @@ -14,8 +15,7 @@ import cromwell.core.path.PathBuilderFactory import net.ceedubs.ficus.Ficus._ import java.time.OffsetDateTime -import scala.concurrent.ExecutionContext -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters._ final case class BlobFileSystemConfig(config: Config) @@ -37,7 +37,7 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co } sealed trait BlobTokenGenerator { - def getAccessToken: String + def getAccessToken: AzureSasCredential } object BlobTokenGenerator { @@ -57,13 +57,13 @@ object BlobTokenGenerator { } case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator { - def getAccessToken: String = { + def getAccessToken: AzureSasCredential = { throw new NotImplementedError } } case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator { - def getAccessToken: String = { + def getAccessToken: AzureSasCredential = { val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match { case Some(storageAccountName) => storageAccountName case _ => throw new Exception("Storage account could not be parsed from endpoint") @@ -73,7 +73,7 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends val azureCredential = new DefaultAzureCredentialBuilder() .authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint) .build - val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription + val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription() val storageAccounts = azure.storageAccounts() val storageAccount = storageAccounts @@ -110,6 +110,6 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends blobContainerSasPermission ) - blobContainerClient.generateSas(blobServiceSasSignatureValues) + new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues)) } } diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index 69cec235aff..e3e4bc1c2bb 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -1,7 +1,7 @@ package cromwell.filesystems.blob import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import java.nio.file.Files + object BlobPathBuilderSpec { def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net" @@ -42,7 +42,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ } } - ignore should "build a blob path from a test string and read a file" in { + it should "build a blob path from a test string and read a file" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost val store = "inputs" @@ -55,20 +55,23 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ blobPath.endpoint should equal(endpoint) blobPath.pathAsString should equal(testString) blobPath.pathWithoutScheme should equal(endpointHost + "/" + store + evalPath) - - val is = Files.newInputStream(blobPath.nioPath) + val is = blobPath.newInputStream() val fileText = (is.readAllBytes.map(_.toChar)).mkString fileText should include ("This is my test file!!!! Did it work?") } - ignore should "build duplicate blob paths in the same filesystem" in { + it should "build duplicate blob paths in the same filesystem" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val store = "inputs" val evalPath = "/test/inputFile.txt" val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) val testString = endpoint + "/" + store + evalPath val blobPath1: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() + blobPath1.nioPath.getFileSystem().close() val blobPath2: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() blobPath1 should equal(blobPath2) + val is = blobPath1.newInputStream() + val fileText = (is.readAllBytes.map(_.toChar)).mkString + fileText should include ("This is my test file!!!! Did it work?") } } From 77dcf196e6e50065b3c787ce3fc984c807e6e988 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Mon, 22 Aug 2022 16:11:17 -0400 Subject: [PATCH 02/16] Rough cut of token refresh using exceptions --- .../filesystems/blob/BlobPathBuilder.scala | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 279b78f0306..fdb3b6e60a9 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -1,16 +1,18 @@ package cromwell.filesystems.blob import com.azure.core.credential.AzureSasCredential +import com.azure.storage.blob.models.BlobStorageException import com.azure.storage.blob.nio.AzureFileSystem import com.google.common.net.UrlEscapers import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.blob.BlobPathBuilder._ +import java.io.IOException import java.net.{MalformedURLException, URI} import java.nio.file._ import scala.jdk.CollectionConverters._ import scala.language.postfixOps -import scala.util.{Failure, Try} +import scala.util.{Failure, Success, Try} object BlobPathBuilder { @@ -67,16 +69,34 @@ class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, } object BlobPath { + + def buildURI(endpoint: String) = new URI("azb://?endpoint=" + endpoint) + def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = { Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) } - def findNioPath(path: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator): NioPath = (for { - fileSystem <- retrieveFilesystem(new URI("azb://?endpoint=" + endpoint), container, blobTokenGenerator) - nioPath <- Try(fileSystem.getPath(path)) - } yield nioPath).get // Ideally we would unwrap this to a NioPath on success and on a access failure try to recover + def findNioPath(path: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator, attempted: Boolean = false): NioPath = (for { + fileSystem <- retrieveFilesystem(buildURI(endpoint), container, blobTokenGenerator) + nioPath <- Try(retrieveFilePath(fileSystem, path)) + } yield nioPath) match { + case Success(value) => value + case Failure(exception: IOException) => exception.getCause() match { + // Azure NIO library wraps blobStorageExceptions in IOExceptions. + case cause: BlobStorageException => { + // This exception indicated that the filesystem was opened successfully + closeFileSystem(buildURI(endpoint)) + if (!attempted) { + // Try to open the filesystem again after closing it + findNioPath(path, endpoint, container, blobTokenGenerator, true) + } else throw cause + } + case _ => throw exception + } + case Failure(exception) => throw exception + } // Ideally we would unwrap this to a NioPath on success and on a access failure try to recover def retrieveFilesystem(uri: URI, container: String, blobTokenGenerator: BlobTokenGenerator): Try[FileSystem] = { Try(FileSystems.getFileSystem(uri)) recover { @@ -87,6 +107,14 @@ object BlobPath { } } } + + def closeFileSystem(uri: URI) = Try(FileSystems.getFileSystem(uri)).map(_.close) + + def retrieveFilePath(fileSystem: FileSystem, path: String): NioPath = { + val blobPath = fileSystem.getPath(path) + fileSystem.provider().checkAccess(blobPath) + blobPath + } } case class BlobPath private[blob](pathString: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator) extends Path { //var token = blobTokenGenerator.getAccessToken From 00011bc998ef8b7423d5abf5193f49cc61f1ebac Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Wed, 24 Aug 2022 14:41:13 -0400 Subject: [PATCH 03/16] Ignore tests, and minor cleanup --- .../filesystems/blob/BlobPathBuilder.scala | 21 +++++++++++-------- .../blob/BlobPathBuilderSpec.scala | 4 ++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index fdb3b6e60a9..c222072aae2 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -47,7 +47,7 @@ object BlobPathBuilder { val uri = parseURI(string) val storageAccount = parseStorageAccount(parseURI(endpoint)) val hasContainer = uri.getPath().split("/").filter(!_.isEmpty()).headOption.contains(container) - def hasEndpoint = parseStorageAccount(uri).contains(storageAccount.get) + def hasEndpoint = storageAccount.map(parseStorageAccount(uri).contains(_)).getOrElse(false) if (hasContainer && !storageAccount.isEmpty && hasEndpoint) { ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) } else { @@ -85,18 +85,21 @@ object BlobPath { case Success(value) => value case Failure(exception: IOException) => exception.getCause() match { // Azure NIO library wraps blobStorageExceptions in IOExceptions. - case cause: BlobStorageException => { - // This exception indicated that the filesystem was opened successfully - closeFileSystem(buildURI(endpoint)) - if (!attempted) { - // Try to open the filesystem again after closing it + case cause: BlobStorageException => attempted match { + // If a restart of the filesystem was already attempted, throw the exception that the IO is wrapping + case true => throw cause + // This exception indicated that the filesystem was opened successfully, but something is wrong + // Try closing the filesystem, and opening with a fresh token + case false => { + work + closeFileSystem(buildURI(endpoint)) findNioPath(path, endpoint, container, blobTokenGenerator, true) - } else throw cause + } } case _ => throw exception } case Failure(exception) => throw exception - } // Ideally we would unwrap this to a NioPath on success and on a access failure try to recover + } def retrieveFilesystem(uri: URI, container: String, blobTokenGenerator: BlobTokenGenerator): Try[FileSystem] = { Try(FileSystems.getFileSystem(uri)) recover { @@ -121,7 +124,7 @@ case class BlobPath private[blob](pathString: String, endpoint: String, containe //var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=","")) override def nioPath: NioPath = BlobPath.findNioPath(path = pathString, endpoint, container, blobTokenGenerator) - override protected def newPath(nioPath: NioPath): Path = BlobPath(pathString, endpoint, container, blobTokenGenerator) + override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString(), endpoint, container, blobTokenGenerator) override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/") diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index e3e4bc1c2bb..41e0ec3f56d 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -42,7 +42,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ } } - it should "build a blob path from a test string and read a file" in { + ignore should "build a blob path from a test string and read a file" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost val store = "inputs" @@ -60,7 +60,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ fileText should include ("This is my test file!!!! Did it work?") } - it should "build duplicate blob paths in the same filesystem" in { + ignore should "build duplicate blob paths in the same filesystem" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val store = "inputs" val evalPath = "/test/inputFile.txt" From 1a9e556b628913f953a034d1d007d4b9203cedf7 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Thu, 25 Aug 2022 10:19:00 -0400 Subject: [PATCH 04/16] Remove stray line --- .../main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index c222072aae2..3e464319c1b 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -91,7 +91,6 @@ object BlobPath { // This exception indicated that the filesystem was opened successfully, but something is wrong // Try closing the filesystem, and opening with a fresh token case false => { - work closeFileSystem(buildURI(endpoint)) findNioPath(path, endpoint, container, blobTokenGenerator, true) } From 45b9b45d368e94b04d25368e8be9a18b538d3329 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Fri, 26 Aug 2022 16:29:21 -0400 Subject: [PATCH 05/16] Draft of manager class for handling expiring file systems --- .../filesystems/blob/BlobPathBuilder.scala | 91 ++++++------------- .../blob/BlobPathBuilderFactory.scala | 68 +++++++++++--- .../blob/BlobPathBuilderSpec.scala | 12 +-- 3 files changed, 88 insertions(+), 83 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 3e464319c1b..8235e4da4d3 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -1,18 +1,15 @@ package cromwell.filesystems.blob import com.azure.core.credential.AzureSasCredential -import com.azure.storage.blob.models.BlobStorageException -import com.azure.storage.blob.nio.AzureFileSystem import com.google.common.net.UrlEscapers import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.blob.BlobPathBuilder._ -import java.io.IOException import java.net.{MalformedURLException, URI} -import java.nio.file._ -import scala.jdk.CollectionConverters._ +import java.time.Instant +import java.time.temporal.TemporalAmount import scala.language.postfixOps -import scala.util.{Failure, Success, Try} +import scala.util.{Failure, Try} object BlobPathBuilder { @@ -22,7 +19,7 @@ object BlobPathBuilder { def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" def parseURI(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) - def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").filter(!_.isEmpty()).headOption + def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").find(!_.isEmpty()) /** * Validates a that a path from a string is a valid BlobPath of the format: @@ -46,9 +43,9 @@ object BlobPathBuilder { Try { val uri = parseURI(string) val storageAccount = parseStorageAccount(parseURI(endpoint)) - val hasContainer = uri.getPath().split("/").filter(!_.isEmpty()).headOption.contains(container) - def hasEndpoint = storageAccount.map(parseStorageAccount(uri).contains(_)).getOrElse(false) - if (hasContainer && !storageAccount.isEmpty && hasEndpoint) { + val hasContainer = uri.getPath().split("/").find(!_.isEmpty()).contains(container) + val hasEndpoint = storageAccount.exists(parseStorageAccount(uri).contains(_)) + if (hasContainer && storageAccount.isDefined && hasEndpoint) { ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) } else { UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) @@ -57,75 +54,39 @@ object BlobPathBuilder { } } -class BlobPathBuilder(blobTokenGenerator: BlobTokenGenerator, container: String, endpoint: String) extends PathBuilder { +class BlobPathBuilder(fsm: FileSystemManager, container: String, endpoint: String) extends PathBuilder { def build(string: String): Try[BlobPath] = { validateBlobPath(string, container, endpoint) match { - case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, blobTokenGenerator)) + case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, fsm)) case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage) } } override def name: String = "Azure Blob Storage" } -object BlobPath { +case class BlobPath private[blob](pathString: String, endpoint: String, container: String, fsm: FileSystemManager) extends Path { + //var token = blobTokenGenerator.getAccessToken + //var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=","")) + override def nioPath: NioPath = findNioPath(path = pathString, endpoint, container) - def buildURI(endpoint: String) = new URI("azb://?endpoint=" + endpoint) + override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container, fsm) - def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = { - Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), - (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), - (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) - } + override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/") - def findNioPath(path: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator, attempted: Boolean = false): NioPath = (for { - fileSystem <- retrieveFilesystem(buildURI(endpoint), container, blobTokenGenerator) - nioPath <- Try(retrieveFilePath(fileSystem, path)) - } yield nioPath) match { - case Success(value) => value - case Failure(exception: IOException) => exception.getCause() match { - // Azure NIO library wraps blobStorageExceptions in IOExceptions. - case cause: BlobStorageException => attempted match { - // If a restart of the filesystem was already attempted, throw the exception that the IO is wrapping - case true => throw cause - // This exception indicated that the filesystem was opened successfully, but something is wrong - // Try closing the filesystem, and opening with a fresh token - case false => { - closeFileSystem(buildURI(endpoint)) - findNioPath(path, endpoint, container, blobTokenGenerator, true) - } - } - case _ => throw exception - } - case Failure(exception) => throw exception - } + override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString - def retrieveFilesystem(uri: URI, container: String, blobTokenGenerator: BlobTokenGenerator): Try[FileSystem] = { - Try(FileSystems.getFileSystem(uri)) recover { - // If no filesystem already exists, this will create a new connection, with the provided configs - case _: FileSystemNotFoundException => { - val fileSystemConfig = buildConfigMap(blobTokenGenerator.getAccessToken, container) - FileSystems.newFileSystem(uri, fileSystemConfig.asJava) - } - } - } - - def closeFileSystem(uri: URI) = Try(FileSystems.getFileSystem(uri)).map(_.close) - - def retrieveFilePath(fileSystem: FileSystem, path: String): NioPath = { - val blobPath = fileSystem.getPath(path) - fileSystem.provider().checkAccess(blobPath) - blobPath - } + def findNioPath(path: String, endpoint: String, container: String): NioPath = (for { + fileSystem <- fsm.retrieveFilesystem() + nioPath = fileSystem.getPath(path) + } yield nioPath).get } -case class BlobPath private[blob](pathString: String, endpoint: String, container: String, blobTokenGenerator: BlobTokenGenerator) extends Path { - //var token = blobTokenGenerator.getAccessToken - //var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=","")) - override def nioPath: NioPath = BlobPath.findNioPath(path = pathString, endpoint, container, blobTokenGenerator) - - override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString(), endpoint, container, blobTokenGenerator) - override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/") +case class TokenExpiration(token: AzureSasCredential, buffer: TemporalAmount) { + val expiry = for { + expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":")) + instant = Instant.parse(expiryString) + } yield instant - override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString() + def hasTokenExpired = expiry.map(_.isAfter(Instant.now.plus(buffer))).getOrElse(false) } diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index 32187a3a280..837071dd72a 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -7,6 +7,7 @@ import com.azure.core.management.profile.AzureProfile import com.azure.identity.DefaultAzureCredentialBuilder import com.azure.resourcemanager.AzureResourceManager import com.azure.storage.blob.BlobContainerClientBuilder +import com.azure.storage.blob.nio.AzureFileSystem import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} import com.azure.storage.common.StorageSharedKeyCredential import com.typesafe.config.Config @@ -14,30 +15,75 @@ import cromwell.core.WorkflowOptions import cromwell.core.path.PathBuilderFactory import net.ceedubs.ficus.Ficus._ -import java.time.OffsetDateTime +import java.net.URI +import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} +import java.time.temporal.ChronoUnit +import java.time.{Duration, OffsetDateTime} import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Try} final case class BlobFileSystemConfig(config: Config) final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { val sasToken: String = instanceConfig.as[String]("sas-token") val container: String = instanceConfig.as[String]("store") val endpoint: String = instanceConfig.as[String]("endpoint") - val workspaceId: String = instanceConfig.as[String]("workspace-id") - val workspaceManagerURL: String = singletonConfig.config.as[String]("workspace-manager-url") + val workspaceId: Option[String] = instanceConfig.as[Option[String]]("workspace-id") + val workspaceManagerURL: Option[String] = singletonConfig.config.as[Option[String]]("workspace-manager-url") - val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( - container, endpoint, Option(workspaceId), Option(workspaceManagerURL)) + val fsm = FileSystemManager(container, endpoint, 10, workspaceId, workspaceManagerURL) override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { Future { - new BlobPathBuilder(blobTokenGenerator, container, endpoint) + new BlobPathBuilder(fsm, container, endpoint) + } + } +} + +case class FileSystemManager(container: String, + endpoint: String, + preemptionMinutes: Long, + workspaceId: Option[String] = None, + workspaceManagerURL: Option[String] = None) { + + var expiry: Option[TokenExpiration] = None + val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( + container, endpoint, workspaceId, workspaceManagerURL) + + def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = { + Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), + (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), + (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) + } + + def uri = new URI("azb://?endpoint=" + endpoint) + + def retrieveFilesystem(): Try[FileSystem] = { + synchronized { + expiry.map(_.hasTokenExpired) match { + case Some(false) => Try(FileSystems.getFileSystem(uri)) recoverWith { + // If no filesystem already exists, this will create a new connection, with the provided configs + case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) + } + // If the token has expired, OR there is no token record, try to close the FS and regenerate + case _ => { + closeFileSystem(uri) + blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) + } + } } } + + def generateFilesystem(uri: URI, container: String, token: AzureSasCredential): Try[FileSystem] = { + expiry = Some(TokenExpiration(token, Duration.of(preemptionMinutes, ChronoUnit.MINUTES))) + Try(FileSystems.newFileSystem(uri, buildConfigMap(token, container).asJava)) + } + + def closeFileSystem(uri: URI) = Try(FileSystems.getFileSystem(uri)).map(_.close) } sealed trait BlobTokenGenerator { - def getAccessToken: AzureSasCredential + def generateAccessToken: Try[AzureSasCredential] } object BlobTokenGenerator { @@ -57,13 +103,11 @@ object BlobTokenGenerator { } case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator { - def getAccessToken: AzureSasCredential = { - throw new NotImplementedError - } + def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError) } case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator { - def getAccessToken: AzureSasCredential = { + def generateAccessToken: Try[AzureSasCredential] = { val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match { case Some(storageAccountName) => storageAccountName case _ => throw new Exception("Storage account could not be parsed from endpoint") @@ -110,6 +154,6 @@ case class NativeBlobTokenGenerator(container: String, endpoint: String) extends blobContainerSasPermission ) - new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues)) + Try(new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues))) } } diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index 41e0ec3f56d..85a64488757 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -2,7 +2,6 @@ package cromwell.filesystems.blob import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers - object BlobPathBuilderSpec { def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net" } @@ -47,9 +46,9 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost val store = "inputs" val evalPath = "/test/inputFile.txt" - val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) + val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) val testString = endpoint + "/" + store + evalPath - val blobPath: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() + val blobPath: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() blobPath.container should equal(store) blobPath.endpoint should equal(endpoint) @@ -58,17 +57,18 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val is = blobPath.newInputStream() val fileText = (is.readAllBytes.map(_.toChar)).mkString fileText should include ("This is my test file!!!! Did it work?") + blobPath.nioPath.getFileSystem().close() } ignore should "build duplicate blob paths in the same filesystem" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val store = "inputs" val evalPath = "/test/inputFile.txt" - val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(store, endpoint) + val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) val testString = endpoint + "/" + store + evalPath - val blobPath1: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() + val blobPath1: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() blobPath1.nioPath.getFileSystem().close() - val blobPath2: BlobPath = new BlobPathBuilder(blobTokenGenerator, store, endpoint) build testString getOrElse fail() + val blobPath2: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() blobPath1 should equal(blobPath2) val is = blobPath1.newInputStream() val fileText = (is.readAllBytes.map(_.toChar)).mkString From 8f00cc139e39f42919bac4d894deb75f94d8ab51 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Fri, 26 Aug 2022 16:37:26 -0400 Subject: [PATCH 06/16] Style fixes --- .../scala/cromwell/filesystems/blob/BlobPathBuilder.scala | 8 ++++---- .../filesystems/blob/BlobPathBuilderFactory.scala | 2 +- .../cromwell/filesystems/blob/BlobPathBuilderSpec.scala | 7 +++---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 8235e4da4d3..ada80242f30 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -18,8 +18,8 @@ object BlobPathBuilder { case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" - def parseURI(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) - def parseStorageAccount(uri: URI) = uri.getHost().split("\\.").find(!_.isEmpty()) + def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) + def parseStorageAccount(uri: URI): Option[String] = uri.getHost.split("\\.").find(_.nonEmpty) /** * Validates a that a path from a string is a valid BlobPath of the format: @@ -43,7 +43,7 @@ object BlobPathBuilder { Try { val uri = parseURI(string) val storageAccount = parseStorageAccount(parseURI(endpoint)) - val hasContainer = uri.getPath().split("/").find(!_.isEmpty()).contains(container) + val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container) val hasEndpoint = storageAccount.exists(parseStorageAccount(uri).contains(_)) if (hasContainer && storageAccount.isDefined && hasEndpoint) { ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) @@ -88,5 +88,5 @@ case class TokenExpiration(token: AzureSasCredential, buffer: TemporalAmount) { instant = Instant.parse(expiryString) } yield instant - def hasTokenExpired = expiry.map(_.isAfter(Instant.now.plus(buffer))).getOrElse(false) + def hasTokenExpired: Boolean = expiry.exists(_.isAfter(Instant.now.plus(buffer))) } diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index 837071dd72a..b48815d2258 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -79,7 +79,7 @@ case class FileSystemManager(container: String, Try(FileSystems.newFileSystem(uri, buildConfigMap(token, container).asJava)) } - def closeFileSystem(uri: URI) = Try(FileSystems.getFileSystem(uri)).map(_.close) + def closeFileSystem(uri: URI): Try[Unit] = Try(FileSystems.getFileSystem(uri)).map(_.close) } sealed trait BlobTokenGenerator { diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index 85a64488757..1c0a756a50b 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -26,7 +26,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount") + container + evalPath BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched storage account") - case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) + case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) } } @@ -37,7 +37,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val testString = endpoint + "badContainer" + evalPath BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched container") - case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage() should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) + case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) } } @@ -57,7 +57,6 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val is = blobPath.newInputStream() val fileText = (is.readAllBytes.map(_.toChar)).mkString fileText should include ("This is my test file!!!! Did it work?") - blobPath.nioPath.getFileSystem().close() } ignore should "build duplicate blob paths in the same filesystem" in { @@ -67,7 +66,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) val testString = endpoint + "/" + store + evalPath val blobPath1: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() - blobPath1.nioPath.getFileSystem().close() + blobPath1.nioPath.getFileSystem.close() val blobPath2: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() blobPath1 should equal(blobPath2) val is = blobPath1.newInputStream() From 832e546439e08fa30aedca2052b2b243602e17ca Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Wed, 31 Aug 2022 15:58:47 -0400 Subject: [PATCH 07/16] Refactor of blobfilesystemManager and tests covering its functionality --- .../filesystems/blob/BlobPathBuilder.scala | 49 ++--- .../blob/BlobPathBuilderFactory.scala | 175 +++++++++--------- .../blob/BlobPathBuilderFactorySpec.scala | 170 ++++++++++++++++- .../blob/BlobPathBuilderSpec.scala | 47 ++--- 4 files changed, 297 insertions(+), 144 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index ada80242f30..dd863eb5425 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -1,15 +1,12 @@ package cromwell.filesystems.blob -import com.azure.core.credential.AzureSasCredential import com.google.common.net.UrlEscapers import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.blob.BlobPathBuilder._ import java.net.{MalformedURLException, URI} -import java.time.Instant -import java.time.temporal.TemporalAmount import scala.language.postfixOps -import scala.util.{Failure, Try} +import scala.util.{Failure, Success, Try} object BlobPathBuilder { @@ -17,9 +14,9 @@ object BlobPathBuilder { case class ValidBlobPath(path: String) extends BlobPathValidation case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation - def invalidBlobPathMessage(container: String, endpoint: String) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" + def invalidBlobPathMessage(container: BlobContainerName, endpoint: EndpointURL) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) - def parseStorageAccount(uri: URI): Option[String] = uri.getHost.split("\\.").find(_.nonEmpty) + def parseStorageAccount(uri: URI): Try[StorageAccountName] = uri.getHost.split("\\.").find(_.nonEmpty).map(StorageAccountName(_)).fold[Try[StorageAccountName]](Failure(new Exception("bad")))(Success(_)) /** * Validates a that a path from a string is a valid BlobPath of the format: @@ -39,13 +36,13 @@ object BlobPathBuilder { * * If the configured container and storage account do not match, the string is considered unparsable */ - def validateBlobPath(string: String, container: String, endpoint: String): BlobPathValidation = { + def validateBlobPath(string: String, container: BlobContainerName, endpoint: EndpointURL): BlobPathValidation = { Try { val uri = parseURI(string) - val storageAccount = parseStorageAccount(parseURI(endpoint)) - val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container) - val hasEndpoint = storageAccount.exists(parseStorageAccount(uri).contains(_)) - if (hasContainer && storageAccount.isDefined && hasEndpoint) { + val storageAccount = parseStorageAccount(parseURI(endpoint.value)) + val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container.value) + val hasEndpoint = storageAccount.toOption.exists(parseStorageAccount(uri).toOption.contains(_)) + if (hasContainer && storageAccount.isSuccess && hasEndpoint) { ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) } else { UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) @@ -54,39 +51,31 @@ object BlobPathBuilder { } } -class BlobPathBuilder(fsm: FileSystemManager, container: String, endpoint: String) extends PathBuilder { +class BlobPathBuilder(container: BlobContainerName, endpoint: EndpointURL)(private val fsm: BlobFileSystemManager) extends PathBuilder { def build(string: String): Try[BlobPath] = { validateBlobPath(string, container, endpoint) match { - case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container, fsm)) + case ValidBlobPath(path) => Try(BlobPath(path, endpoint, container)(fsm)) case UnparsableBlobPath(errorMessage: Throwable) => Failure(errorMessage) } } override def name: String = "Azure Blob Storage" } -case class BlobPath private[blob](pathString: String, endpoint: String, container: String, fsm: FileSystemManager) extends Path { - //var token = blobTokenGenerator.getAccessToken - //var expiry = token.getSignature.split("&").filter(_.startsWith("se")).headOption.map(_.replaceFirst("se=","")) - override def nioPath: NioPath = findNioPath(path = pathString, endpoint, container) +case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, container: BlobContainerName)(private val fsm: BlobFileSystemManager) extends Path { + override def nioPath: NioPath = findNioPath(pathString) - override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container, fsm) + override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container)(fsm) override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/") - override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString + override def pathWithoutScheme: String = parseURI(endpoint.value).getHost + "/" + container + "/" + nioPath.toString - def findNioPath(path: String, endpoint: String, container: String): NioPath = (for { + private def findNioPath(path: String): NioPath = (for { fileSystem <- fsm.retrieveFilesystem() nioPath = fileSystem.getPath(path) - } yield nioPath).get -} - -case class TokenExpiration(token: AzureSasCredential, buffer: TemporalAmount) { - val expiry = for { - expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":")) - instant = Instant.parse(expiryString) - } yield instant - - def hasTokenExpired: Boolean = expiry.exists(_.isAfter(Instant.now.plus(buffer))) + } yield nioPath) match { + case Success(value) => value + case Failure(exception) => throw exception + } } diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index b48815d2258..cbdce60afa8 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -6,9 +6,10 @@ import com.azure.core.management.AzureEnvironment import com.azure.core.management.profile.AzureProfile import com.azure.identity.DefaultAzureCredentialBuilder import com.azure.resourcemanager.AzureResourceManager -import com.azure.storage.blob.BlobContainerClientBuilder +import com.azure.resourcemanager.storage.models.{StorageAccount, StorageAccountKey} import com.azure.storage.blob.nio.AzureFileSystem import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} +import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder} import com.azure.storage.common.StorageSharedKeyCredential import com.typesafe.config.Config import cromwell.core.WorkflowOptions @@ -18,80 +19,97 @@ import net.ceedubs.ficus.Ficus._ import java.net.URI import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} import java.time.temporal.ChronoUnit -import java.time.{Duration, OffsetDateTime} +import java.time.{Duration, Instant, OffsetDateTime} import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Try} +import scala.util.{Failure, Success, Try} final case class BlobFileSystemConfig(config: Config) final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { - val sasToken: String = instanceConfig.as[String]("sas-token") - val container: String = instanceConfig.as[String]("store") - val endpoint: String = instanceConfig.as[String]("endpoint") - val workspaceId: Option[String] = instanceConfig.as[Option[String]]("workspace-id") - val workspaceManagerURL: Option[String] = singletonConfig.config.as[Option[String]]("workspace-manager-url") + val container: BlobContainerName = BlobContainerName(instanceConfig.as[String]("store")) + val endpoint: EndpointURL = EndpointURL(instanceConfig.as[String]("endpoint")) + val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId(_)) + val expiryBufferMinutes: Long = instanceConfig.as[Option[Long]]("expiry-buffer-minutes").getOrElse(10) + val workspaceManagerURL: Option[WorkspaceManagerURL] = singletonConfig.config.as[Option[String]]("workspace-manager-url").map(WorkspaceManagerURL(_)) - val fsm = FileSystemManager(container, endpoint, 10, workspaceId, workspaceManagerURL) + val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( + container, endpoint, workspaceId, workspaceManagerURL) + val fsm: BlobFileSystemManager = BlobFileSystemManager(container, endpoint, expiryBufferMinutes, blobTokenGenerator) override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { Future { - new BlobPathBuilder(fsm, container, endpoint) + new BlobPathBuilder(container, endpoint)(fsm) } } } -case class FileSystemManager(container: String, - endpoint: String, - preemptionMinutes: Long, - workspaceId: Option[String] = None, - workspaceManagerURL: Option[String] = None) { +final case class BlobContainerName(value: String) {override def toString: String = value} +final case class StorageAccountName(value: String) {override def toString: String = value} +final case class EndpointURL(value: String) {override def toString: String = value} +final case class WorkspaceId(value: String) {override def toString: String = value} +final case class WorkspaceManagerURL(value: String) {override def toString: String = value} - var expiry: Option[TokenExpiration] = None - val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( - container, endpoint, workspaceId, workspaceManagerURL) +object BlobFileSystemManager { + def parseTokenExpiry(token: AzureSasCredential): Option[Instant] = for { + expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":")) + instant = Instant.parse(expiryString) + } yield instant - def buildConfigMap(credential: AzureSasCredential, container: String): Map[String, Object] = { + def buildConfigMap(credential: AzureSasCredential, container: BlobContainerName): Map[String, Object] = { Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), - (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container), + (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container.value), (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) } - - def uri = new URI("azb://?endpoint=" + endpoint) - + def hasTokenExpired(tokenExpiry: Instant, buffer: Duration): Boolean = Instant.now.plus(buffer).isAfter(tokenExpiry) + def uri(endpoint: EndpointURL) = new URI("azb://?endpoint=" + endpoint) +} +case class BlobFileSystemManager(container: BlobContainerName, + endpoint: EndpointURL, + expiryBufferMinutes: Long, + blobTokenGenerator: BlobTokenGenerator, + fileSystemAPI: FileSystemAPI = FileSystemAPI(), + initialExpiration: Option[Instant] = None) { + private var expiry: Option[Instant] = initialExpiration + val buffer: Duration = Duration.of(expiryBufferMinutes, ChronoUnit.MINUTES) + + def getExpiry: Option[Instant] = expiry + def uri: URI = BlobFileSystemManager.uri(endpoint) + def hasTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer)) def retrieveFilesystem(): Try[FileSystem] = { synchronized { - expiry.map(_.hasTokenExpired) match { - case Some(false) => Try(FileSystems.getFileSystem(uri)) recoverWith { + (hasTokenExpired, expiry) match { + case (false, Some(_)) => fileSystemAPI.getFileSystem(uri) recoverWith { // If no filesystem already exists, this will create a new connection, with the provided configs case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) } // If the token has expired, OR there is no token record, try to close the FS and regenerate - case _ => { - closeFileSystem(uri) - blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) - } + case _ => + closeFileSystem(uri) + blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) } } } - def generateFilesystem(uri: URI, container: String, token: AzureSasCredential): Try[FileSystem] = { - expiry = Some(TokenExpiration(token, Duration.of(preemptionMinutes, ChronoUnit.MINUTES))) - Try(FileSystems.newFileSystem(uri, buildConfigMap(token, container).asJava)) + private def generateFilesystem(uri: URI, container: BlobContainerName, token: AzureSasCredential): Try[FileSystem] = { + expiry = BlobFileSystemManager.parseTokenExpiry(token) + Try(fileSystemAPI.newFileSystem(uri, BlobFileSystemManager.buildConfigMap(token, container))) } - def closeFileSystem(uri: URI): Try[Unit] = Try(FileSystems.getFileSystem(uri)).map(_.close) + private def closeFileSystem(uri: URI): Try[Unit] = fileSystemAPI.getFileSystem(uri).map(_.close) } -sealed trait BlobTokenGenerator { - def generateAccessToken: Try[AzureSasCredential] +case class FileSystemAPI() { + def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri)) + def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava) } +sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]} object BlobTokenGenerator { - def createBlobTokenGenerator(container: String, endpoint: String): BlobTokenGenerator = { + def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL): BlobTokenGenerator = { createBlobTokenGenerator(container, endpoint, None, None) } - def createBlobTokenGenerator(container: String, endpoint: String, workspaceId: Option[String], workspaceManagerURL: Option[String]): BlobTokenGenerator = { - (container: String, endpoint: String, workspaceId, workspaceManagerURL) match { + def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL]): BlobTokenGenerator = { + (container: BlobContainerName, endpoint: EndpointURL, workspaceId, workspaceManagerURL) match { case (container, endpoint, None, None) => NativeBlobTokenGenerator(container, endpoint) case (container, endpoint, Some(workspaceId), Some(workspaceManagerURL)) => @@ -102,58 +120,43 @@ object BlobTokenGenerator { } } -case class WSMBlobTokenGenerator(container: String, endpoint: String, workspaceId: String, workspaceManagerURL: String) extends BlobTokenGenerator { +case class WSMBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: WorkspaceId, workspaceManagerURL: WorkspaceManagerURL) extends BlobTokenGenerator { def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError) } -case class NativeBlobTokenGenerator(container: String, endpoint: String) extends BlobTokenGenerator { - def generateAccessToken: Try[AzureSasCredential] = { - val storageAccountName = BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint)) match { - case Some(storageAccountName) => storageAccountName - case _ => throw new Exception("Storage account could not be parsed from endpoint") - } +case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL) extends BlobTokenGenerator { - val profile = new AzureProfile(AzureEnvironment.AZURE) - val azureCredential = new DefaultAzureCredentialBuilder() - .authorityHost(profile.getEnvironment.getActiveDirectoryEndpoint) + private val azureProfile = new AzureProfile(AzureEnvironment.AZURE) + private def azureCredentialBuilder = new DefaultAzureCredentialBuilder() + .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) .build - val azure = AzureResourceManager.authenticate(azureCredential, profile).withDefaultSubscription() - - val storageAccounts = azure.storageAccounts() - val storageAccount = storageAccounts - .list() - .asScala - .find(_.name == storageAccountName) - - val storageAccountKeys = storageAccount match { - case Some(value) => value.getKeys.asScala.map(_.value()) - case _ => throw new Exception("Storage Account not found") - } - - val storageAccountKey = storageAccountKeys.headOption match { - case Some(value) => value - case _ => throw new Exception("Storage Account has no keys") - } - - val keyCredential = new StorageSharedKeyCredential( - storageAccountName, - storageAccountKey - ) - val blobContainerClient = new BlobContainerClientBuilder() - .credential(keyCredential) - .endpoint(endpoint) - .containerName(container) - .buildClient() - - val blobContainerSasPermission = new BlobContainerSasPermission() - .setReadPermission(true) - .setCreatePermission(true) - .setListPermission(true) - val blobServiceSasSignatureValues = new BlobServiceSasSignatureValues( - OffsetDateTime.now.plusDays(1), - blobContainerSasPermission - ) - - Try(new AzureSasCredential(blobContainerClient.generateSas(blobServiceSasSignatureValues))) + private def azure = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription() + + private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value)) + .fold[Try[StorageAccount]](Failure(new Exception("Azure Storage Account not found")))(Success(_)) + private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = { + new BlobContainerClientBuilder() + .credential(credential) + .endpoint(endpoint.value) + .containerName(container.value) + .buildClient() } + private val bcsp = new BlobContainerSasPermission() + .setReadPermission(true) + .setCreatePermission(true) + .setListPermission(true) + + + def generateAccessToken: Try[AzureSasCredential] = for { + configuredAccount <- BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint.value)) + azureAccount <- findAzureStorageAccount(configuredAccount) + keys = azureAccount.getKeys.asScala + key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_)) + first = key.value + sskc = new StorageSharedKeyCredential(configuredAccount.value, first) + bcc = buildBlobContainerClient(sskc, endpoint, container) + bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp) + asc = new AzureSasCredential(bcc.generateSas(bsssv)) + } yield asc } + diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala index 8c9b2345c69..cfdd25405e7 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala @@ -1,17 +1,35 @@ package cromwell.filesystems.blob +import com.azure.core.credential.AzureSasCredential import com.typesafe.config.ConfigFactory +import common.mock.MockSugar +import org.mockito.Mockito._ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers { +import java.nio.file.{FileSystem, FileSystemNotFoundException} +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit +import java.time.{Duration, Instant, ZoneId} +import scala.util.{Failure, Try} + +object BlobPathBuilderFactorySpec { + def buildExampleSasToken(expiry: Instant): AzureSasCredential = { + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault()) + val sv = formatter.format(expiry) + val se = expiry.toString().replace(":","%3A") + new AzureSasCredential(s"sv=$sv&se=$se&sr=c&sp=rcl") + } +} +class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSugar { + def generateTokenExpiration(minutes: Long) = Instant.now.plus(minutes, ChronoUnit.MINUTES) it should "parse configs for a functioning factory" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") - val store = "inputs" + val store = BlobContainerName("inputs") val sasToken = "{SAS TOKEN HERE}" - val workspaceId = "mockWorkspaceId" - val workspaceManagerURL = "https://test.ws.org" + val workspaceId = WorkspaceId("mockWorkspaceId") + val workspaceManagerURL = WorkspaceManagerURL("https://test.ws.org") val instanceConfig = ConfigFactory.parseString( s""" |sas-token = "$sasToken" @@ -24,8 +42,146 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers { val factory = BlobPathBuilderFactory(globalConfig, instanceConfig, new BlobFileSystemConfig(singletonConfig)) factory.container should equal(store) factory.endpoint should equal(endpoint) - factory.sasToken should equal(sasToken) - factory.workspaceId should equal(workspaceId) - factory.workspaceManagerURL should equal(workspaceManagerURL) + factory.workspaceId should contain(workspaceId) + factory.workspaceManagerURL should contain(workspaceManagerURL) + } + + it should "build an example sas token of the correct format" in { + val testToken = BlobPathBuilderFactorySpec.buildExampleSasToken(Instant.ofEpochMilli(1603794041000L)) + val sourceToken = "sv=2020-10-27&se=2020-10-27T10%3A20%3A41Z&sr=c&sp=rcl" + testToken.getSignature should equal(sourceToken) + } + + it should "parse an expiration time from a sas token" in { + val expiryTime = generateTokenExpiration(20L) + val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(expiryTime) + val expiry = BlobFileSystemManager.parseTokenExpiry(sasToken) + expiry should contain(expiryTime) + } + + it should "verify an unexpired token will be processed as unexpired" in { + val expiryTime = generateTokenExpiration(11L) + val expired = BlobFileSystemManager.hasTokenExpired(expiryTime, Duration.ofMinutes(10L)) + expired shouldBe false + } + + it should "test an expired token will be processed as expired" in { + val expiryTime = generateTokenExpiration(9L) + val expired = BlobFileSystemManager.hasTokenExpired(expiryTime, Duration.ofMinutes(10L)) + expired shouldBe true + } + + it should "test retrieveFileSystem with expired filesystem" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val expiredToken = generateTokenExpiration(9L) + val refreshedToken = generateTokenExpiration(69L) + val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) + val store = BlobContainerName("inputs") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val azureUri = BlobFileSystemManager.uri(endpoint) + + val fileSystems = mock[FileSystemAPI] + val fileSystem = mock[FileSystem] + when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) + when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) + val blobTokenGenerator = mock[BlobTokenGenerator] + when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) + + val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems, Some(expiredToken)) + fsm.getExpiry() should contain(expiredToken) + fsm.hasTokenExpired shouldBe true + fsm.retrieveFilesystem() + + fsm.getExpiry().isDefined shouldBe true + fsm.getExpiry() should contain(refreshedToken) + fsm.hasTokenExpired shouldBe false + verify(fileSystems, times(1)).getFileSystem(azureUri) + verify(fileSystems, times(1)).newFileSystem(azureUri, configMap) + verify(fileSystem, times(1)).close() + } + + it should "test retrieveFileSystem with an unexpired fileSystem" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val initialToken = generateTokenExpiration(11L) + val refreshedToken = generateTokenExpiration(71L) + val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) + val store = BlobContainerName("inputs") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val azureUri = BlobFileSystemManager.uri(endpoint) + + val fileSystems = mock[FileSystemAPI] + val fileSystem = mock[FileSystem] + when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) + when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) + val blobTokenGenerator = mock[BlobTokenGenerator] + when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) + + val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems, Some(initialToken)) + fsm.getExpiry() should contain(initialToken) + fsm.hasTokenExpired shouldBe false + fsm.retrieveFilesystem() + + fsm.getExpiry().isDefined shouldBe true + fsm.getExpiry() should contain(initialToken) + fsm.hasTokenExpired shouldBe false + verify(fileSystems, times(1)).getFileSystem(azureUri) + verify(fileSystems, never()).newFileSystem(azureUri, configMap) + verify(fileSystem, never()).close() + } + + it should "test retrieveFileSystem with an uninitialized filesystem" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val refreshedToken = generateTokenExpiration(71L) + val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) + val store = BlobContainerName("inputs") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val azureUri = BlobFileSystemManager.uri(endpoint) + + val fileSystems = mock[FileSystemAPI] + val fileSystem = mock[FileSystem] + when(fileSystems.getFileSystem(azureUri)).thenReturn(Failure(new FileSystemNotFoundException)) + when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) + val blobTokenGenerator = mock[BlobTokenGenerator] + when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) + + val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems) + fsm.getExpiry().isDefined shouldBe false + fsm.hasTokenExpired shouldBe false + fsm.retrieveFilesystem() + + fsm.getExpiry().isDefined shouldBe true + fsm.getExpiry() should contain(refreshedToken) + fsm.hasTokenExpired shouldBe false + verify(fileSystems, times(1)).getFileSystem(azureUri) + verify(fileSystems, times(1)).newFileSystem(azureUri, configMap) + verify(fileSystem, never()).close() + } + + it should "test retrieveFileSystem with an unknown filesystem" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val refreshedToken = generateTokenExpiration(71L) + val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) + val store = BlobContainerName("inputs") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val azureUri = BlobFileSystemManager.uri(endpoint) + + val fileSystems = mock[FileSystemAPI] + val fileSystem = mock[FileSystem] + when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) + when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) + val blobTokenGenerator = mock[BlobTokenGenerator] + when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) + + val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems) + fsm.getExpiry().isDefined shouldBe false + fsm.hasTokenExpired shouldBe false + fsm.retrieveFilesystem() + + fsm.getExpiry().isDefined shouldBe true + fsm.getExpiry() should contain(refreshedToken) + fsm.hasTokenExpired shouldBe false + verify(fileSystems, times(1)).getFileSystem(azureUri) + verify(fileSystems, times(1)).newFileSystem(azureUri, configMap) + verify(fileSystem, times(1)).close() } } diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index 1c0a756a50b..b2ac58d8ec4 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -1,18 +1,19 @@ package cromwell.filesystems.blob import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import common.mock.MockSugar object BlobPathBuilderSpec { - def buildEndpoint(storageAccount: String) = s"https://$storageAccount.blob.core.windows.net" + def buildEndpoint(storageAccount: String) = EndpointURL(s"https://$storageAccount.blob.core.windows.net") } -class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ - +class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar { + // ValidateBlobPath it should "parse a URI into a path" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") - val container = "container" + val container = BlobContainerName("container") val evalPath = "/path/to/file" - val testString = endpoint + "/" + container + evalPath + val testString = endpoint.value + "/" + container + evalPath BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { case BlobPathBuilder.ValidBlobPath(path) => path should equal(evalPath) case BlobPathBuilder.UnparsableBlobPath(errorMessage) => fail(errorMessage) @@ -21,9 +22,9 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ it should "bad storage account fails causes URI to fail parse into a path" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") - val container = "container" + val container = BlobContainerName("container") val evalPath = "/path/to/file" - val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount") + container + evalPath + val testString = BlobPathBuilderSpec.buildEndpoint("badStorageAccount").value + container.value + evalPath BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched storage account") case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) @@ -32,23 +33,26 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ it should "bad container fails causes URI to fail parse into a path" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") - val container = "container" + val container = BlobContainerName("container") val evalPath = "/path/to/file" - val testString = endpoint + "badContainer" + evalPath + val testString = endpoint.value + "badContainer" + evalPath BlobPathBuilder.validateBlobPath(testString, container, endpoint) match { case BlobPathBuilder.ValidBlobPath(path) => fail(s"Valid path: $path found when verifying mismatched container") case BlobPathBuilder.UnparsableBlobPath(errorMessage) => errorMessage.getMessage should equal(BlobPathBuilder.invalidBlobPathMessage(container, endpoint)) } } - ignore should "build a blob path from a test string and read a file" in { + // BlobPath methods + + it should "build a blob path from a test string and read a file" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") - val endpointHost = BlobPathBuilder.parseURI(endpoint).getHost - val store = "inputs" + val endpointHost = BlobPathBuilder.parseURI(endpoint.value).getHost + val store = BlobContainerName("inputs") val evalPath = "/test/inputFile.txt" - val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) - val testString = endpoint + "/" + store + evalPath - val blobPath: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() + val blobTokenGenerator = mock[BlobTokenGenerator] + val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator) + val testString = endpoint.value + "/" + store + evalPath + val blobPath: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail() blobPath.container should equal(store) blobPath.endpoint should equal(endpoint) @@ -59,15 +63,16 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers{ fileText should include ("This is my test file!!!! Did it work?") } - ignore should "build duplicate blob paths in the same filesystem" in { + it should "build duplicate blob paths in the same filesystem" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") - val store = "inputs" + val store = BlobContainerName("inputs") val evalPath = "/test/inputFile.txt" - val fsm: FileSystemManager = FileSystemManager(store, endpoint, 10) - val testString = endpoint + "/" + store + evalPath - val blobPath1: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() + val blobTokenGenerator = mock[BlobTokenGenerator] + val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator) + val testString = endpoint.value + "/" + store + evalPath + val blobPath1: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail() blobPath1.nioPath.getFileSystem.close() - val blobPath2: BlobPath = new BlobPathBuilder(fsm, store, endpoint) build testString getOrElse fail() + val blobPath2: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail() blobPath1 should equal(blobPath2) val is = blobPath1.newInputStream() val fileText = (is.readAllBytes.map(_.toChar)).mkString From c907917e8f46c910e5b1c9814e38905b242cbb6c Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Thu, 1 Sep 2022 20:48:55 -0400 Subject: [PATCH 08/16] Refined tests to validate close filesystem as separate unit --- .../blob/BlobPathBuilderFactory.scala | 12 +- .../blob/BlobPathBuilderFactorySpec.scala | 119 +++++++++--------- 2 files changed, 66 insertions(+), 65 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index cbdce60afa8..1c97c074fa3 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -26,7 +26,7 @@ import scala.util.{Failure, Success, Try} final case class BlobFileSystemConfig(config: Config) final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { - val container: BlobContainerName = BlobContainerName(instanceConfig.as[String]("store")) + val container: BlobContainerName = BlobContainerName(instanceConfig.as[String]("container")) val endpoint: EndpointURL = EndpointURL(instanceConfig.as[String]("endpoint")) val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId(_)) val expiryBufferMinutes: Long = instanceConfig.as[Option[Long]]("expiry-buffer-minutes").getOrElse(10) @@ -74,17 +74,17 @@ case class BlobFileSystemManager(container: BlobContainerName, def getExpiry: Option[Instant] = expiry def uri: URI = BlobFileSystemManager.uri(endpoint) - def hasTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer)) + def isTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer)) def retrieveFilesystem(): Try[FileSystem] = { synchronized { - (hasTokenExpired, expiry) match { - case (false, Some(_)) => fileSystemAPI.getFileSystem(uri) recoverWith { + (isTokenExpired, expiry) match { + case (false, Some(_)) => fileSystemAPI.getFileSystem(uri).recoverWith { // If no filesystem already exists, this will create a new connection, with the provided configs case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) } // If the token has expired, OR there is no token record, try to close the FS and regenerate case _ => - closeFileSystem(uri) + fileSystemAPI.closeFileSystem(uri) blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) } } @@ -95,12 +95,12 @@ case class BlobFileSystemManager(container: BlobContainerName, Try(fileSystemAPI.newFileSystem(uri, BlobFileSystemManager.buildConfigMap(token, container))) } - private def closeFileSystem(uri: URI): Try[Unit] = fileSystemAPI.getFileSystem(uri).map(_.close) } case class FileSystemAPI() { def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri)) def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava) + def closeFileSystem(uri: URI): Option[Unit] = getFileSystem(uri).toOption.map(_.close) } sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]} diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala index cfdd25405e7..6fc357b38ef 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala @@ -25,23 +25,23 @@ object BlobPathBuilderFactorySpec { class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSugar { def generateTokenExpiration(minutes: Long) = Instant.now.plus(minutes, ChronoUnit.MINUTES) it should "parse configs for a functioning factory" in { - val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") - val store = BlobContainerName("inputs") - val sasToken = "{SAS TOKEN HERE}" + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") + val container = BlobContainerName("storageContainer") val workspaceId = WorkspaceId("mockWorkspaceId") val workspaceManagerURL = WorkspaceManagerURL("https://test.ws.org") val instanceConfig = ConfigFactory.parseString( s""" - |sas-token = "$sasToken" - |store = "$store" + |container = "$container" |endpoint = "$endpoint" + |expiry-buffer-minutes = "10" |workspace-id = "$workspaceId" """.stripMargin) val singletonConfig = ConfigFactory.parseString(s"""workspace-manager-url = "$workspaceManagerURL" """) val globalConfig = ConfigFactory.parseString("""""") val factory = BlobPathBuilderFactory(globalConfig, instanceConfig, new BlobFileSystemConfig(singletonConfig)) - factory.container should equal(store) + factory.container should equal(container) factory.endpoint should equal(endpoint) + factory.expiryBufferMinutes should equal(10L) factory.workspaceId should contain(workspaceId) factory.workspaceManagerURL should contain(workspaceManagerURL) } @@ -71,117 +71,118 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga expired shouldBe true } + it should "test that a filesystem gets closed correctly" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") + val azureUri = BlobFileSystemManager.uri(endpoint) + val fileSystems = mock[FileSystemAPI] + val fileSystem = mock[FileSystem] + when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) + when(fileSystems.closeFileSystem(azureUri)).thenCallRealMethod() + + fileSystems.closeFileSystem(azureUri) + verify(fileSystem, times(1)).close() + } + it should "test retrieveFileSystem with expired filesystem" in { - val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") val expiredToken = generateTokenExpiration(9L) val refreshedToken = generateTokenExpiration(69L) val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) - val store = BlobContainerName("inputs") - val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val container = BlobContainerName("storageContainer") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, container) val azureUri = BlobFileSystemManager.uri(endpoint) val fileSystems = mock[FileSystemAPI] - val fileSystem = mock[FileSystem] - when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) - when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) val blobTokenGenerator = mock[BlobTokenGenerator] when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) - val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems, Some(expiredToken)) - fsm.getExpiry() should contain(expiredToken) - fsm.hasTokenExpired shouldBe true + val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(expiredToken)) + fsm.getExpiry should contain(expiredToken) + fsm.isTokenExpired shouldBe true fsm.retrieveFilesystem() - fsm.getExpiry().isDefined shouldBe true - fsm.getExpiry() should contain(refreshedToken) - fsm.hasTokenExpired shouldBe false - verify(fileSystems, times(1)).getFileSystem(azureUri) + fsm.getExpiry should contain(refreshedToken) + fsm.isTokenExpired shouldBe false + verify(fileSystems, never()).getFileSystem(azureUri) verify(fileSystems, times(1)).newFileSystem(azureUri, configMap) - verify(fileSystem, times(1)).close() + verify(fileSystems, times(1)).closeFileSystem(azureUri) } it should "test retrieveFileSystem with an unexpired fileSystem" in { - val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") val initialToken = generateTokenExpiration(11L) val refreshedToken = generateTokenExpiration(71L) val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) - val store = BlobContainerName("inputs") - val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val container = BlobContainerName("storageContainer") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, container) val azureUri = BlobFileSystemManager.uri(endpoint) + // Need a fake filesystem to supply the getFileSystem simulated try + val dummyFileSystem = null val fileSystems = mock[FileSystemAPI] - val fileSystem = mock[FileSystem] - when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) - when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) + when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(dummyFileSystem)) + val blobTokenGenerator = mock[BlobTokenGenerator] when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) - val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems, Some(initialToken)) - fsm.getExpiry() should contain(initialToken) - fsm.hasTokenExpired shouldBe false + val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(initialToken)) + fsm.getExpiry should contain(initialToken) + fsm.isTokenExpired shouldBe false fsm.retrieveFilesystem() - fsm.getExpiry().isDefined shouldBe true - fsm.getExpiry() should contain(initialToken) - fsm.hasTokenExpired shouldBe false + fsm.getExpiry should contain(initialToken) + fsm.isTokenExpired shouldBe false verify(fileSystems, times(1)).getFileSystem(azureUri) verify(fileSystems, never()).newFileSystem(azureUri, configMap) - verify(fileSystem, never()).close() + verify(fileSystems, never()).closeFileSystem(azureUri) } it should "test retrieveFileSystem with an uninitialized filesystem" in { - val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") val refreshedToken = generateTokenExpiration(71L) val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) - val store = BlobContainerName("inputs") - val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val container = BlobContainerName("storageContainer") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, container) val azureUri = BlobFileSystemManager.uri(endpoint) val fileSystems = mock[FileSystemAPI] - val fileSystem = mock[FileSystem] when(fileSystems.getFileSystem(azureUri)).thenReturn(Failure(new FileSystemNotFoundException)) - when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) val blobTokenGenerator = mock[BlobTokenGenerator] when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) - val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems) - fsm.getExpiry().isDefined shouldBe false - fsm.hasTokenExpired shouldBe false + val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems, Some(refreshedToken)) + fsm.getExpiry.isDefined shouldBe true + fsm.isTokenExpired shouldBe false fsm.retrieveFilesystem() - fsm.getExpiry().isDefined shouldBe true - fsm.getExpiry() should contain(refreshedToken) - fsm.hasTokenExpired shouldBe false + fsm.getExpiry should contain(refreshedToken) + fsm.isTokenExpired shouldBe false verify(fileSystems, times(1)).getFileSystem(azureUri) verify(fileSystems, times(1)).newFileSystem(azureUri, configMap) - verify(fileSystem, never()).close() + verify(fileSystems, never()).closeFileSystem(azureUri) } it should "test retrieveFileSystem with an unknown filesystem" in { - val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") val refreshedToken = generateTokenExpiration(71L) val sasToken = BlobPathBuilderFactorySpec.buildExampleSasToken(refreshedToken) - val store = BlobContainerName("inputs") - val configMap = BlobFileSystemManager.buildConfigMap(sasToken, store) + val container = BlobContainerName("storageContainer") + val configMap = BlobFileSystemManager.buildConfigMap(sasToken, container) val azureUri = BlobFileSystemManager.uri(endpoint) val fileSystems = mock[FileSystemAPI] - val fileSystem = mock[FileSystem] - when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(fileSystem)) - when(fileSystems.newFileSystem(azureUri, configMap)).thenReturn(fileSystem) val blobTokenGenerator = mock[BlobTokenGenerator] when(blobTokenGenerator.generateAccessToken).thenReturn(Try(sasToken)) - val fsm = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator, fileSystems) - fsm.getExpiry().isDefined shouldBe false - fsm.hasTokenExpired shouldBe false + val fsm = BlobFileSystemManager(container, endpoint, 10L, blobTokenGenerator, fileSystems) + fsm.getExpiry.isDefined shouldBe false + fsm.isTokenExpired shouldBe false fsm.retrieveFilesystem() - fsm.getExpiry().isDefined shouldBe true - fsm.getExpiry() should contain(refreshedToken) - fsm.hasTokenExpired shouldBe false - verify(fileSystems, times(1)).getFileSystem(azureUri) + fsm.getExpiry should contain(refreshedToken) + fsm.isTokenExpired shouldBe false + verify(fileSystems, never()).getFileSystem(azureUri) verify(fileSystems, times(1)).newFileSystem(azureUri, configMap) - verify(fileSystem, times(1)).close() + verify(fileSystems, times(1)).closeFileSystem(azureUri) } } From cd02eb26381897ffa8a761b97170c3f67bdeda59 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Tue, 6 Sep 2022 15:59:42 -0400 Subject: [PATCH 09/16] Ignore connected tests --- .../cromwell/filesystems/blob/BlobPathBuilder.scala | 2 +- .../filesystems/blob/BlobPathBuilderFactory.scala | 2 +- .../filesystems/blob/BlobPathBuilderSpec.scala | 12 +++++------- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index dd863eb5425..4f15714baac 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -16,7 +16,7 @@ object BlobPathBuilder { def invalidBlobPathMessage(container: BlobContainerName, endpoint: EndpointURL) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) - def parseStorageAccount(uri: URI): Try[StorageAccountName] = uri.getHost.split("\\.").find(_.nonEmpty).map(StorageAccountName(_)).fold[Try[StorageAccountName]](Failure(new Exception("bad")))(Success(_)) + def parseStorageAccount(uri: URI): Try[StorageAccountName] = uri.getHost.split("\\.").find(_.nonEmpty).map(StorageAccountName(_)).fold[Try[StorageAccountName]](Failure(new Exception("Could not parse storage account")))(Success(_)) /** * Validates a that a path from a string is a valid BlobPath of the format: diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index 1c97c074fa3..0e9a3089f92 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -130,7 +130,7 @@ case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: Endp private def azureCredentialBuilder = new DefaultAzureCredentialBuilder() .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) .build - private def azure = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription() + private def azure = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription("62b22893-6bc1-46d9-8a90-806bb3cce3c9") private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value)) .fold[Try[StorageAccount]](Failure(new Exception("Azure Storage Account not found")))(Success(_)) diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index b2ac58d8ec4..3ace7d31e38 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -1,7 +1,7 @@ package cromwell.filesystems.blob +import common.mock.MockSugar import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import common.mock.MockSugar object BlobPathBuilderSpec { def buildEndpoint(storageAccount: String) = EndpointURL(s"https://$storageAccount.blob.core.windows.net") @@ -42,14 +42,12 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar { } } - // BlobPath methods - - it should "build a blob path from a test string and read a file" in { + ignore should "build a blob path from a test string and read a file" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val endpointHost = BlobPathBuilder.parseURI(endpoint.value).getHost val store = BlobContainerName("inputs") val evalPath = "/test/inputFile.txt" - val blobTokenGenerator = mock[BlobTokenGenerator] + val blobTokenGenerator = NativeBlobTokenGenerator(store, endpoint) val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10L, blobTokenGenerator) val testString = endpoint.value + "/" + store + evalPath val blobPath: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail() @@ -63,11 +61,11 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar { fileText should include ("This is my test file!!!! Did it work?") } - it should "build duplicate blob paths in the same filesystem" in { + ignore should "build duplicate blob paths in the same filesystem" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val store = BlobContainerName("inputs") val evalPath = "/test/inputFile.txt" - val blobTokenGenerator = mock[BlobTokenGenerator] + val blobTokenGenerator = NativeBlobTokenGenerator(store, endpoint) val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator) val testString = endpoint.value + "/" + store + evalPath val blobPath1: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build testString getOrElse fail() From e113bb183f9116f2706804ea5b7013281897cb91 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Wed, 7 Sep 2022 13:26:50 -0400 Subject: [PATCH 10/16] Clean up of some things --- .../cromwell/filesystems/blob/BlobPathBuilder.scala | 7 +++---- .../filesystems/blob/BlobPathBuilderFactory.scala | 10 ++++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 4f15714baac..362dafc584f 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -74,8 +74,7 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con private def findNioPath(path: String): NioPath = (for { fileSystem <- fsm.retrieveFilesystem() nioPath = fileSystem.getPath(path) - } yield nioPath) match { - case Success(value) => value - case Failure(exception) => throw exception - } + // This is purposefully an unprotected get because the NIO API needing an unwrapped path object. + // If an error occurs the api expects a thrown exception + } yield nioPath).get } diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index 0e9a3089f92..a58a8b65438 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -68,22 +68,23 @@ case class BlobFileSystemManager(container: BlobContainerName, expiryBufferMinutes: Long, blobTokenGenerator: BlobTokenGenerator, fileSystemAPI: FileSystemAPI = FileSystemAPI(), - initialExpiration: Option[Instant] = None) { + private val initialExpiration: Option[Instant] = None) { private var expiry: Option[Instant] = initialExpiration val buffer: Duration = Duration.of(expiryBufferMinutes, ChronoUnit.MINUTES) def getExpiry: Option[Instant] = expiry def uri: URI = BlobFileSystemManager.uri(endpoint) def isTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer)) + def shouldReopenFilesystem: Boolean = isTokenExpired || expiry.isEmpty def retrieveFilesystem(): Try[FileSystem] = { synchronized { - (isTokenExpired, expiry) match { - case (false, Some(_)) => fileSystemAPI.getFileSystem(uri).recoverWith { + shouldReopenFilesystem match { + case false => fileSystemAPI.getFileSystem(uri).recoverWith { // If no filesystem already exists, this will create a new connection, with the provided configs case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) } // If the token has expired, OR there is no token record, try to close the FS and regenerate - case _ => + case true => fileSystemAPI.closeFileSystem(uri) blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) } @@ -92,6 +93,7 @@ case class BlobFileSystemManager(container: BlobContainerName, private def generateFilesystem(uri: URI, container: BlobContainerName, token: AzureSasCredential): Try[FileSystem] = { expiry = BlobFileSystemManager.parseTokenExpiry(token) + if (expiry.isEmpty) return Failure(new Exception("Could not reopen filesystem, no expiration found")) Try(fileSystemAPI.newFileSystem(uri, BlobFileSystemManager.buildConfigMap(token, container))) } From 313b9fcbf50593cb3e6b4acae19f73fd258c09ea Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Wed, 7 Sep 2022 15:08:59 -0400 Subject: [PATCH 11/16] Refactor BlobFileSystemManager to separate file, and some other cleanup --- .../blob/BlobFileSystemManager.scala | 139 +++++++++++++++++ .../blob/BlobPathBuilderFactory.scala | 145 +----------------- .../blob/BlobPathBuilderSpec.scala | 15 ++ 3 files changed, 162 insertions(+), 137 deletions(-) create mode 100644 filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala new file mode 100644 index 00000000000..7a0311341d6 --- /dev/null +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala @@ -0,0 +1,139 @@ +package cromwell.filesystems.blob + +import com.azure.core.credential.AzureSasCredential +import com.azure.core.management.AzureEnvironment +import com.azure.core.management.profile.AzureProfile +import com.azure.identity.DefaultAzureCredentialBuilder +import com.azure.resourcemanager.AzureResourceManager +import com.azure.resourcemanager.storage.models.{StorageAccount, StorageAccountKey} +import com.azure.storage.blob.nio.AzureFileSystem +import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} +import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder} +import com.azure.storage.common.StorageSharedKeyCredential + +import java.net.URI +import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} +import java.time.temporal.ChronoUnit +import java.time.{Duration, Instant, OffsetDateTime} +import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success, Try} + +case class FileSystemAPI() { + def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri)) + def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava) + def closeFileSystem(uri: URI): Option[Unit] = getFileSystem(uri).toOption.map(_.close) +} + +object BlobFileSystemManager { + def parseTokenExpiry(token: AzureSasCredential): Option[Instant] = for { + expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":")) + instant = Instant.parse(expiryString) + } yield instant + + def buildConfigMap(credential: AzureSasCredential, container: BlobContainerName): Map[String, Object] = { + Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), + (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container.value), + (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) + } + def hasTokenExpired(tokenExpiry: Instant, buffer: Duration): Boolean = Instant.now.plus(buffer).isAfter(tokenExpiry) + def uri(endpoint: EndpointURL) = new URI("azb://?endpoint=" + endpoint) +} +case class BlobFileSystemManager( + container: BlobContainerName, + endpoint: EndpointURL, + expiryBufferMinutes: Long, + blobTokenGenerator: BlobTokenGenerator, + fileSystemAPI: FileSystemAPI = FileSystemAPI(), + private val initialExpiration: Option[Instant] = None) { + private var expiry: Option[Instant] = initialExpiration + val buffer: Duration = Duration.of(expiryBufferMinutes, ChronoUnit.MINUTES) + + def getExpiry: Option[Instant] = expiry + def uri: URI = BlobFileSystemManager.uri(endpoint) + def isTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer)) + def shouldReopenFilesystem: Boolean = isTokenExpired || expiry.isEmpty + def retrieveFilesystem(): Try[FileSystem] = { + synchronized { + shouldReopenFilesystem match { + case false => fileSystemAPI.getFileSystem(uri).recoverWith { + // If no filesystem already exists, this will create a new connection, with the provided configs + case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) + } + // If the token has expired, OR there is no token record, try to close the FS and regenerate + case true => + fileSystemAPI.closeFileSystem(uri) + blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) + } + } + } + + private def generateFilesystem(uri: URI, container: BlobContainerName, token: AzureSasCredential): Try[FileSystem] = { + expiry = BlobFileSystemManager.parseTokenExpiry(token) + if (expiry.isEmpty) return Failure(new Exception("Could not reopen filesystem, no expiration found")) + Try(fileSystemAPI.newFileSystem(uri, BlobFileSystemManager.buildConfigMap(token, container))) + } + +} + +sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]} +object BlobTokenGenerator { + def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[String]): BlobTokenGenerator = { + createBlobTokenGenerator(container, endpoint, None, None, subscription) + } + def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL], subscription: Option[String]): BlobTokenGenerator = { + (container: BlobContainerName, endpoint: EndpointURL, workspaceId, workspaceManagerURL) match { + case (container, endpoint, None, None) => + NativeBlobTokenGenerator(container, endpoint, subscription) + case (container, endpoint, Some(workspaceId), Some(workspaceManagerURL)) => + WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL) + case _ => + throw new Exception("Arguments provided do not match any available BlobTokenGenerator implementation.") + } + } + def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL): BlobTokenGenerator = createBlobTokenGenerator(container, endpoint, None) + def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL]): BlobTokenGenerator = + createBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL, None) + +} + +case class WSMBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: WorkspaceId, workspaceManagerURL: WorkspaceManagerURL) extends BlobTokenGenerator { + def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError) +} + +case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[String] = None) extends BlobTokenGenerator { + + private val azureProfile = new AzureProfile(AzureEnvironment.AZURE) + private def azureCredentialBuilder = new DefaultAzureCredentialBuilder() + .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) + .build + private def authenticateWithSubscription(sub: String) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub) + private def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription() + private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription) + + private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value)) + .fold[Try[StorageAccount]](Failure(new Exception("Azure Storage Account not found")))(Success(_)) + private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = { + new BlobContainerClientBuilder() + .credential(credential) + .endpoint(endpoint.value) + .containerName(container.value) + .buildClient() + } + private val bcsp = new BlobContainerSasPermission() + .setReadPermission(true) + .setCreatePermission(true) + .setListPermission(true) + + + def generateAccessToken: Try[AzureSasCredential] = for { + configuredAccount <- BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint.value)) + azureAccount <- findAzureStorageAccount(configuredAccount) + keys = azureAccount.getKeys.asScala + key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_)) + first = key.value + sskc = new StorageSharedKeyCredential(configuredAccount.value, first) + bcc = buildBlobContainerClient(sskc, endpoint, container) + bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp) + asc = new AzureSasCredential(bcc.generateSas(bsssv)) + } yield asc +} diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala index a58a8b65438..c93f751b706 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilderFactory.scala @@ -1,31 +1,22 @@ package cromwell.filesystems.blob import akka.actor.ActorSystem -import com.azure.core.credential.AzureSasCredential -import com.azure.core.management.AzureEnvironment -import com.azure.core.management.profile.AzureProfile -import com.azure.identity.DefaultAzureCredentialBuilder -import com.azure.resourcemanager.AzureResourceManager -import com.azure.resourcemanager.storage.models.{StorageAccount, StorageAccountKey} -import com.azure.storage.blob.nio.AzureFileSystem -import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} -import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder} -import com.azure.storage.common.StorageSharedKeyCredential import com.typesafe.config.Config import cromwell.core.WorkflowOptions import cromwell.core.path.PathBuilderFactory import net.ceedubs.ficus.Ficus._ -import java.net.URI -import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems} -import java.time.temporal.ChronoUnit -import java.time.{Duration, Instant, OffsetDateTime} import scala.concurrent.{ExecutionContext, Future} -import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Success, Try} final case class BlobFileSystemConfig(config: Config) + +final case class BlobContainerName(value: String) {override def toString: String = value} +final case class StorageAccountName(value: String) {override def toString: String = value} +final case class EndpointURL(value: String) {override def toString: String = value} +final case class WorkspaceId(value: String) {override def toString: String = value} +final case class WorkspaceManagerURL(value: String) {override def toString: String = value} final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory { + val subscription: Option[String] = instanceConfig.as[Option[String]]("subscription") val container: BlobContainerName = BlobContainerName(instanceConfig.as[String]("container")) val endpoint: EndpointURL = EndpointURL(instanceConfig.as[String]("endpoint")) val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId(_)) @@ -33,7 +24,7 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co val workspaceManagerURL: Option[WorkspaceManagerURL] = singletonConfig.config.as[Option[String]]("workspace-manager-url").map(WorkspaceManagerURL(_)) val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator( - container, endpoint, workspaceId, workspaceManagerURL) + container, endpoint, workspaceId, workspaceManagerURL, subscription) val fsm: BlobFileSystemManager = BlobFileSystemManager(container, endpoint, expiryBufferMinutes, blobTokenGenerator) override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = { @@ -42,123 +33,3 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co } } } - -final case class BlobContainerName(value: String) {override def toString: String = value} -final case class StorageAccountName(value: String) {override def toString: String = value} -final case class EndpointURL(value: String) {override def toString: String = value} -final case class WorkspaceId(value: String) {override def toString: String = value} -final case class WorkspaceManagerURL(value: String) {override def toString: String = value} - -object BlobFileSystemManager { - def parseTokenExpiry(token: AzureSasCredential): Option[Instant] = for { - expiryString <- token.getSignature.split("&").find(_.startsWith("se")).map(_.replaceFirst("se=","")).map(_.replace("%3A", ":")) - instant = Instant.parse(expiryString) - } yield instant - - def buildConfigMap(credential: AzureSasCredential, container: BlobContainerName): Map[String, Object] = { - Map((AzureFileSystem.AZURE_STORAGE_SAS_TOKEN_CREDENTIAL, credential), - (AzureFileSystem.AZURE_STORAGE_FILE_STORES, container.value), - (AzureFileSystem.AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK, java.lang.Boolean.TRUE)) - } - def hasTokenExpired(tokenExpiry: Instant, buffer: Duration): Boolean = Instant.now.plus(buffer).isAfter(tokenExpiry) - def uri(endpoint: EndpointURL) = new URI("azb://?endpoint=" + endpoint) -} -case class BlobFileSystemManager(container: BlobContainerName, - endpoint: EndpointURL, - expiryBufferMinutes: Long, - blobTokenGenerator: BlobTokenGenerator, - fileSystemAPI: FileSystemAPI = FileSystemAPI(), - private val initialExpiration: Option[Instant] = None) { - private var expiry: Option[Instant] = initialExpiration - val buffer: Duration = Duration.of(expiryBufferMinutes, ChronoUnit.MINUTES) - - def getExpiry: Option[Instant] = expiry - def uri: URI = BlobFileSystemManager.uri(endpoint) - def isTokenExpired: Boolean = expiry.exists(BlobFileSystemManager.hasTokenExpired(_, buffer)) - def shouldReopenFilesystem: Boolean = isTokenExpired || expiry.isEmpty - def retrieveFilesystem(): Try[FileSystem] = { - synchronized { - shouldReopenFilesystem match { - case false => fileSystemAPI.getFileSystem(uri).recoverWith { - // If no filesystem already exists, this will create a new connection, with the provided configs - case _: FileSystemNotFoundException => blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) - } - // If the token has expired, OR there is no token record, try to close the FS and regenerate - case true => - fileSystemAPI.closeFileSystem(uri) - blobTokenGenerator.generateAccessToken.flatMap(generateFilesystem(uri, container, _)) - } - } - } - - private def generateFilesystem(uri: URI, container: BlobContainerName, token: AzureSasCredential): Try[FileSystem] = { - expiry = BlobFileSystemManager.parseTokenExpiry(token) - if (expiry.isEmpty) return Failure(new Exception("Could not reopen filesystem, no expiration found")) - Try(fileSystemAPI.newFileSystem(uri, BlobFileSystemManager.buildConfigMap(token, container))) - } - -} - -case class FileSystemAPI() { - def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri)) - def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava) - def closeFileSystem(uri: URI): Option[Unit] = getFileSystem(uri).toOption.map(_.close) -} - -sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]} -object BlobTokenGenerator { - def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL): BlobTokenGenerator = { - createBlobTokenGenerator(container, endpoint, None, None) - } - def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL]): BlobTokenGenerator = { - (container: BlobContainerName, endpoint: EndpointURL, workspaceId, workspaceManagerURL) match { - case (container, endpoint, None, None) => - NativeBlobTokenGenerator(container, endpoint) - case (container, endpoint, Some(workspaceId), Some(workspaceManagerURL)) => - WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL) - case _ => - throw new Exception("Arguments provided do not match any available BlobTokenGenerator implementation.") - } - } -} - -case class WSMBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: WorkspaceId, workspaceManagerURL: WorkspaceManagerURL) extends BlobTokenGenerator { - def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError) -} - -case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL) extends BlobTokenGenerator { - - private val azureProfile = new AzureProfile(AzureEnvironment.AZURE) - private def azureCredentialBuilder = new DefaultAzureCredentialBuilder() - .authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint) - .build - private def azure = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription("62b22893-6bc1-46d9-8a90-806bb3cce3c9") - - private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value)) - .fold[Try[StorageAccount]](Failure(new Exception("Azure Storage Account not found")))(Success(_)) - private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = { - new BlobContainerClientBuilder() - .credential(credential) - .endpoint(endpoint.value) - .containerName(container.value) - .buildClient() - } - private val bcsp = new BlobContainerSasPermission() - .setReadPermission(true) - .setCreatePermission(true) - .setListPermission(true) - - - def generateAccessToken: Try[AzureSasCredential] = for { - configuredAccount <- BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint.value)) - azureAccount <- findAzureStorageAccount(configuredAccount) - keys = azureAccount.getKeys.asScala - key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_)) - first = key.value - sskc = new StorageSharedKeyCredential(configuredAccount.value, first) - bcc = buildBlobContainerClient(sskc, endpoint, container) - bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp) - asc = new AzureSasCredential(bcc.generateSas(bsssv)) - } yield asc -} - diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index 3ace7d31e38..a93750e030e 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -1,7 +1,10 @@ package cromwell.filesystems.blob import common.mock.MockSugar +import org.mockito.Mockito.when import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import scala.util.Try +import scala.util.Failure object BlobPathBuilderSpec { def buildEndpoint(storageAccount: String) = EndpointURL(s"https://$storageAccount.blob.core.windows.net") @@ -42,6 +45,18 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar { } } + it should "provide a readable error when getting an illegal nioPath" in { + val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount") + val container = BlobContainerName("container") + val evalPath = "/path/to/file" + val exception = new Exception("Failed to do the thing") + val fsm = mock[BlobFileSystemManager] + when(fsm.retrieveFilesystem()).thenReturn(Failure(exception)) + val path = BlobPath(evalPath, endpoint, container)(fsm) + val testException = Try(path.nioPath).failed.toOption + testException should contain(exception) + } + ignore should "build a blob path from a test string and read a file" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") val endpointHost = BlobPathBuilder.parseURI(endpoint.value).getHost From 6ac6c32eff71ad4754ff129745196cadee530e03 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Thu, 8 Sep 2022 15:05:32 -0400 Subject: [PATCH 12/16] Some additional scala-ifying --- .../blob/BlobFileSystemManager.scala | 7 +++--- .../filesystems/blob/BlobPathBuilder.scala | 25 +++++++++++-------- .../blob/BlobPathBuilderFactorySpec.scala | 2 +- .../blob/BlobPathBuilderSpec.scala | 6 ++--- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala index 7a0311341d6..3b8f5149055 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala @@ -5,7 +5,6 @@ import com.azure.core.management.AzureEnvironment import com.azure.core.management.profile.AzureProfile import com.azure.identity.DefaultAzureCredentialBuilder import com.azure.resourcemanager.AzureResourceManager -import com.azure.resourcemanager.storage.models.{StorageAccount, StorageAccountKey} import com.azure.storage.blob.nio.AzureFileSystem import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues} import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder} @@ -17,6 +16,7 @@ import java.time.temporal.ChronoUnit import java.time.{Duration, Instant, OffsetDateTime} import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} +import com.azure.resourcemanager.storage.models.StorageAccountKey case class FileSystemAPI() { def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri)) @@ -111,7 +111,7 @@ case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: Endp private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription) private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value)) - .fold[Try[StorageAccount]](Failure(new Exception("Azure Storage Account not found")))(Success(_)) + .map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found"))) private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = { new BlobContainerClientBuilder() .credential(credential) @@ -126,7 +126,8 @@ case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: Endp def generateAccessToken: Try[AzureSasCredential] = for { - configuredAccount <- BlobPathBuilder.parseStorageAccount(BlobPathBuilder.parseURI(endpoint.value)) + uri <- BlobPathBuilder.parseURI(endpoint.value) + configuredAccount <- BlobPathBuilder.parseStorageAccount(uri) azureAccount <- findAzureStorageAccount(configuredAccount) keys = azureAccount.getKeys.asScala key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_)) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 362dafc584f..31634a220f4 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -15,8 +15,9 @@ object BlobPathBuilder { case class UnparsableBlobPath(errorMessage: Throwable) extends BlobPathValidation def invalidBlobPathMessage(container: BlobContainerName, endpoint: EndpointURL) = s"Malformed Blob URL for this builder. Expecting a URL for a container $container and endpoint $endpoint" - def parseURI(string: String): URI = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) - def parseStorageAccount(uri: URI): Try[StorageAccountName] = uri.getHost.split("\\.").find(_.nonEmpty).map(StorageAccountName(_)).fold[Try[StorageAccountName]](Failure(new Exception("Could not parse storage account")))(Success(_)) + def parseURI(string: String): Try[URI] = Try(URI.create(UrlEscapers.urlFragmentEscaper().escape(string))) + def parseStorageAccount(uri: URI): Try[StorageAccountName] = uri.getHost.split("\\.").find(_.nonEmpty).map(StorageAccountName(_)) + .map(Success(_)).getOrElse(Failure(new Exception("Could not parse storage account"))) /** * Validates a that a path from a string is a valid BlobPath of the format: @@ -37,17 +38,19 @@ object BlobPathBuilder { * If the configured container and storage account do not match, the string is considered unparsable */ def validateBlobPath(string: String, container: BlobContainerName, endpoint: EndpointURL): BlobPathValidation = { - Try { - val uri = parseURI(string) - val storageAccount = parseStorageAccount(parseURI(endpoint.value)) - val hasContainer = uri.getPath.split("/").find(_.nonEmpty).contains(container.value) - val hasEndpoint = storageAccount.toOption.exists(parseStorageAccount(uri).toOption.contains(_)) - if (hasContainer && storageAccount.isSuccess && hasEndpoint) { - ValidBlobPath(uri.getPath.replaceFirst("/" + container, "")) + (for { // For comprehension? + testUri <- parseURI(string) + endpointUri <- parseURI(endpoint.value) + testStorageAccount <- parseStorageAccount(testUri) + endpointStorageAccount <- parseStorageAccount(endpointUri) + hasContainer = testUri.getPath.split("/").find(_.nonEmpty).contains(container.value) + hasEndpoint = testStorageAccount.equals(endpointStorageAccount) + blobPathValidation = if (hasContainer && hasEndpoint) { + ValidBlobPath(testUri.getPath.replaceFirst("/" + container, "")) } else { UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) } - } recover { case t => UnparsableBlobPath(t) } get + } yield blobPathValidation) recover { case t => UnparsableBlobPath(t) } get } } @@ -69,7 +72,7 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/") - override def pathWithoutScheme: String = parseURI(endpoint.value).getHost + "/" + container + "/" + nioPath.toString + override def pathWithoutScheme: String = parseURI(endpoint.value).map(_.getHost + "/" + container + "/" + nioPath.toString).get private def findNioPath(path: String): NioPath = (for { fileSystem <- fsm.retrieveFilesystem() diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala index a7b6790aacb..f5b71a06ead 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderFactorySpec.scala @@ -118,7 +118,7 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga val configMap = BlobFileSystemManager.buildConfigMap(sasToken, container) val azureUri = BlobFileSystemManager.uri(endpoint) // Need a fake filesystem to supply the getFileSystem simulated try - val dummyFileSystem = null + val dummyFileSystem = mock[FileSystem] val fileSystems = mock[FileSystemAPI] when(fileSystems.getFileSystem(azureUri)).thenReturn(Try(dummyFileSystem)) diff --git a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala index a93750e030e..9975065a3e2 100644 --- a/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala +++ b/filesystems/blob/src/test/scala/cromwell/filesystems/blob/BlobPathBuilderSpec.scala @@ -3,8 +3,8 @@ import common.mock.MockSugar import org.mockito.Mockito.when import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import scala.util.Try -import scala.util.Failure + +import scala.util.{Failure, Try} object BlobPathBuilderSpec { def buildEndpoint(storageAccount: String) = EndpointURL(s"https://$storageAccount.blob.core.windows.net") @@ -59,7 +59,7 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar { ignore should "build a blob path from a test string and read a file" in { val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage") - val endpointHost = BlobPathBuilder.parseURI(endpoint.value).getHost + val endpointHost = BlobPathBuilder.parseURI(endpoint.value).map(_.getHost).getOrElse(fail("Could not parse URI")) val store = BlobContainerName("inputs") val evalPath = "/test/inputFile.txt" val blobTokenGenerator = NativeBlobTokenGenerator(store, endpoint) From e6873600d00b6db24a85ae6435c84784ace375c1 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Thu, 8 Sep 2022 15:23:39 -0400 Subject: [PATCH 13/16] Small cleanup --- .../cromwell/filesystems/blob/BlobPathBuilder.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index 31634a220f4..99ce3083e05 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -38,19 +38,19 @@ object BlobPathBuilder { * If the configured container and storage account do not match, the string is considered unparsable */ def validateBlobPath(string: String, container: BlobContainerName, endpoint: EndpointURL): BlobPathValidation = { - (for { // For comprehension? + val blobValidation = for { testUri <- parseURI(string) endpointUri <- parseURI(endpoint.value) testStorageAccount <- parseStorageAccount(testUri) endpointStorageAccount <- parseStorageAccount(endpointUri) hasContainer = testUri.getPath.split("/").find(_.nonEmpty).contains(container.value) hasEndpoint = testStorageAccount.equals(endpointStorageAccount) - blobPathValidation = if (hasContainer && hasEndpoint) { - ValidBlobPath(testUri.getPath.replaceFirst("/" + container, "")) - } else { - UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) + blobPathValidation = (hasContainer && hasEndpoint) match { + case true => ValidBlobPath(testUri.getPath.replaceFirst("/" + container, "")) + case false => UnparsableBlobPath(new MalformedURLException(invalidBlobPathMessage(container, endpoint))) } - } yield blobPathValidation) recover { case t => UnparsableBlobPath(t) } get + } yield blobPathValidation + blobValidation recover { case t => UnparsableBlobPath(t) } get } } @@ -72,6 +72,7 @@ case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, con override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/") + //This is purposefully an unprotected get because if the endpoint cannot be parsed this should fail loudly rather than quietly override def pathWithoutScheme: String = parseURI(endpoint.value).map(_.getHost + "/" + container + "/" + nioPath.toString).get private def findNioPath(path: String): NioPath = (for { From 930449c3829ceeb007be9648ea69111e9503f879 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Fri, 9 Sep 2022 14:27:45 -0400 Subject: [PATCH 14/16] Correcting imports --- .../scala/cromwell/filesystems/blob/BlobPathBuilder.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala index bad1af4ff84..3e69ce2a7bd 100644 --- a/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala +++ b/filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala @@ -1,16 +1,14 @@ package cromwell.filesystems.blob -import com.azure.core.credential.AzureSasCredential -import com.azure.storage.blob.nio.{AzureBlobFileAttributes, AzureFileSystem} +import com.azure.storage.blob.nio.{AzureBlobFileAttributes} import com.google.common.net.UrlEscapers import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.blob.BlobPathBuilder._ import java.net.{MalformedURLException, URI} -import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems, Files} -import scala.jdk.CollectionConverters._ import scala.language.postfixOps import scala.util.{Failure, Success, Try} +import java.nio.file.Files object BlobPathBuilder { From 193e24634286ba1324f1d3b71c84fcbb368fd760 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Mon, 12 Sep 2022 10:10:20 -0400 Subject: [PATCH 15/16] trigger tests From 20c5db7c37e86592175e2d6d9ba90e1f0e265e00 Mon Sep 17 00:00:00 2001 From: Christian Freitas Date: Mon, 12 Sep 2022 13:41:51 -0400 Subject: [PATCH 16/16] trigger tests