Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-743 Enable TES task creation with BlobPaths #6921

Merged
merged 7 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,15 @@ case class BlobFileSystemManager(

sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]}
object BlobTokenGenerator {
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[String]): BlobTokenGenerator = {
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId]): BlobTokenGenerator = {
createBlobTokenGenerator(container, endpoint, None, None, subscription)
}
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL], subscription: Option[String]): BlobTokenGenerator = {
def createBlobTokenGenerator(container: BlobContainerName,
endpoint: EndpointURL,
workspaceId: Option[WorkspaceId],
workspaceManagerURL: Option[WorkspaceManagerURL],
subscription: Option[SubscriptionId]
): BlobTokenGenerator = {
(container: BlobContainerName, endpoint: EndpointURL, workspaceId, workspaceManagerURL) match {
case (container, endpoint, None, None) =>
NativeBlobTokenGenerator(container, endpoint, subscription)
Expand All @@ -100,13 +105,13 @@ case class WSMBlobTokenGenerator(container: BlobContainerName, endpoint: Endpoin
def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError)
}

case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[String] = None) extends BlobTokenGenerator {
case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobTokenGenerator {

private val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
private def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
.build
private def authenticateWithSubscription(sub: String) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub)
private def authenticateWithSubscription(sub: SubscriptionId) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub.value)
private def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription()
private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription)

Expand All @@ -123,6 +128,7 @@ case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: Endp
.setReadPermission(true)
.setCreatePermission(true)
.setListPermission(true)
.setWritePermission(true)


def generateAccessToken: Try[AzureSasCredential] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,40 @@ class BlobPathBuilder(container: BlobContainerName, endpoint: EndpointURL)(priva
override def name: String = "Azure Blob Storage"
}

object BlobPath {
  /**
    * Converts an Azure NIO path into the path string that [[BlobPath]] works with.
    *
    * The Azure NIO library renders the root of a path as `{containerName}:`, which gets in
    * the way of converting back and forth to the blob URL format. Everything up to and
    * including the first colon is therefore dropped; a string with no colon is returned
    * unchanged. Cutting at the first colon is safe because the NIO library permits a colon
    * only as the marker for the root container name.
    *
    * @param nioPath the NIO path whose string form should be stripped of its root
    * @return the path string with the `{containerName}:` root removed
    */
  private def nioPathString(nioPath: NioPath): String = {
    val rendered = nioPath.toString
    // indexOf yields -1 when there is no colon, so this becomes 0 and nothing is dropped.
    val afterRoot = rendered.indexOf(':') + 1
    rendered.drop(afterRoot)
  }

  /**
    * Builds a [[BlobPath]] from an NIO path, removing the NIO root prefix first.
    *
    * @param nioPath   the NIO path to convert
    * @param endpoint  the endpoint the resulting path belongs to
    * @param container the container the resulting path belongs to
    * @param fsm       the file system manager handed to the resulting path
    * @return a [[BlobPath]] whose path string no longer carries the `{containerName}:` root
    */
  def apply(nioPath: NioPath,
            endpoint: EndpointURL,
            container: BlobContainerName,
            fsm: BlobFileSystemManager): BlobPath = {
    val strippedPath = nioPathString(nioPath)
    BlobPath(strippedPath, endpoint, container)(fsm)
  }
}

case class BlobPath private[blob](pathString: String, endpoint: EndpointURL, container: BlobContainerName)(private val fsm: BlobFileSystemManager) extends Path {
override def nioPath: NioPath = findNioPath(pathString)

override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath.toString, endpoint, container)(fsm)
override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container, fsm)

override def pathAsString: String = List(endpoint, container, nioPath.toString).mkString("/")
override def pathAsString: String = List(endpoint, container, pathString.stripPrefix("/")).mkString("/")

//This is purposefully an unprotected get because if the endpoint cannot be parsed this should fail loudly rather than quietly
override def pathWithoutScheme: String = parseURI(endpoint.value).map(_.getHost + "/" + container + "/" + nioPath.toString).get
override def pathWithoutScheme: String = parseURI(endpoint.value).map(u => List(u.getHost, container, pathString).mkString("/")).get

private def findNioPath(path: String): NioPath = (for {
fileSystem <- fsm.retrieveFilesystem()
nioPath = fileSystem.getPath(path)
// The Azure NIO library uses `{container}:` to represent the root of the path
nioPath = fileSystem.getPath(s"${container.value}:", path)
// This is purposefully an unprotected get because the NIO API needs an unwrapped path object.
// If an error occurs, the API expects a thrown exception.
} yield nioPath).get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import scala.concurrent.{ExecutionContext, Future}

final case class BlobFileSystemConfig(config: Config)

/**
  * Typed wrapper around a subscription ID string.
  *
  * `toString` returns the wrapped value itself rather than the default case-class rendering.
  *
  * @param value the raw subscription ID
  */
final case class SubscriptionId(value: String) {
  override def toString: String = value
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaning this up while I was in here.

/**
  * Typed wrapper around a blob container name string; `toString` returns the raw value.
  *
  * @param value the raw container name
  */
final case class BlobContainerName(value: String) {
  override def toString: String = value
}

/**
  * Typed wrapper around a storage account name string; `toString` returns the raw value.
  *
  * @param value the raw storage account name
  */
final case class StorageAccountName(value: String) {
  override def toString: String = value
}

/**
  * Typed wrapper around an endpoint URL string; `toString` returns the raw value.
  *
  * @param value the raw endpoint URL
  */
final case class EndpointURL(value: String) {
  override def toString: String = value
}

/**
  * Typed wrapper around a workspace ID string; `toString` returns the raw value.
  *
  * @param value the raw workspace ID
  */
final case class WorkspaceId(value: String) {
  override def toString: String = value
}

/**
  * Typed wrapper around a workspace manager URL string; `toString` returns the raw value.
  *
  * @param value the raw workspace manager URL
  */
final case class WorkspaceManagerURL(value: String) {
  override def toString: String = value
}
final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Config, singletonConfig: BlobFileSystemConfig) extends PathBuilderFactory {
val subscription: Option[String] = instanceConfig.as[Option[String]]("subscription")
val subscription: Option[SubscriptionId] = instanceConfig.as[Option[String]]("subscription").map(SubscriptionId)
val container: BlobContainerName = BlobContainerName(instanceConfig.as[String]("container"))
val endpoint: EndpointURL = EndpointURL(instanceConfig.as[String]("endpoint"))
val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId(_))
val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId)
val expiryBufferMinutes: Long = instanceConfig.as[Option[Long]]("expiry-buffer-minutes").getOrElse(10)
val workspaceManagerURL: Option[WorkspaceManagerURL] = singletonConfig.config.as[Option[String]]("workspace-manager-url").map(WorkspaceManagerURL(_))
val workspaceManagerURL: Option[WorkspaceManagerURL] = singletonConfig.config.as[Option[String]]("workspace-manager-url").map(WorkspaceManagerURL)

val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(
container, endpoint, workspaceId, workspaceManagerURL, subscription)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,17 @@ class BlobPathBuilderSpec extends AnyFlatSpec with Matchers with MockSugar {
val fileText = (is.readAllBytes.map(_.toChar)).mkString
fileText should include ("This is my test file!!!! Did it work?")
}

ignore should "resolve a path without duplicating container name" in {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore?
Also, based on comments in BlobPathBuilder.scala, I think I can see what you're checking here, but it could be confusing to a future reader to read this statement about what code shouldn't do instead of what it should do.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ignored because it depends on the local environment being logged in to Azure, so it's useful for dev testing, not for Travis testing. I waffled on whether to include it at all; we still need to work out our approach for testing the "thin ring" of logic that depends on library internals. I ended up leaving it in since there are other tests in this file that follow the same pattern. I never went back and renamed it to something clearer, though; I will do that.

val endpoint = BlobPathBuilderSpec.buildEndpoint("coaexternalstorage")
val store = BlobContainerName("inputs")
val blobTokenGenerator = NativeBlobTokenGenerator(store, endpoint)
val fsm: BlobFileSystemManager = BlobFileSystemManager(store, endpoint, 10, blobTokenGenerator)

val rootString = s"${endpoint.value}/${store.value}/cromwell-execution"
val blobRoot: BlobPath = new BlobPathBuilder(store, endpoint)(fsm) build rootString getOrElse fail()
blobRoot.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution")
val otherFile = blobRoot.resolve("test/inputFile.txt")
otherFile.toAbsolutePath.pathAsString should equal ("https://coaexternalstorage.blob.core.windows.net/inputs/cromwell-execution/test/inputFile.txt")
}
}