Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-728 Add configurable WSM client to Cromwell #6948

Merged
merged 21 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These Centaur files do not have an associated .test file yet so they are not actually running, but I did think it was time to promote them from Slack/local disk to source control.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love having references to specific Azure resources checked in. Even if we did have good integration testing set up, I think we'd get them from Vault or whatever rather than putting them in source control. I agree that the team needs better access to this stuff, though. Maybe throw it all into a wiki page?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a bunch of places we do this with GCP, specifically the buckets in broad-dsde-cromwell-dev.


We don't have to go down the same route for Azure, but I'd prefer to defer that discussion to when we set up integration testing. For now, I genericized it since I think we truly do have no idea what account we'll be in then.

"fileChecksum.inputFile": "https://coaexternalstorage.blob.core.windows.net/cromwell/user-inputs/inputFile.txt"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
task md5 {
File inputFile
command {
echo "`date`: Running checksum on ${inputFile}..."
md5sum ${inputFile} > md5sum.txt
echo "`date`: Checksum is complete."
}
output {
File result = "md5sum.txt"
}
runtime {
docker: 'ubuntu:18.04'
preemptible: true
}
}

workflow fileChecksum {
File inputFile
call md5 { input: inputFile=inputFile}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import scala.util.{Failure, Success, Try}
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.typesafe.scalalogging.LazyLogging

import java.util.UUID

case class FileSystemAPI() {
def getFileSystem(uri: URI): Try[FileSystem] = Try(FileSystems.getFileSystem(uri))
def newFileSystem(uri: URI, config: Map[String, Object]): FileSystem = FileSystems.newFileSystem(uri, config.asJava)
Expand Down Expand Up @@ -82,31 +84,32 @@ case class BlobFileSystemManager(
sealed trait BlobTokenGenerator {def generateAccessToken: Try[AzureSasCredential]}
object BlobTokenGenerator {
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId]): BlobTokenGenerator = {
createBlobTokenGenerator(container, endpoint, None, None, subscription)
NativeBlobTokenGenerator(container, endpoint, subscription)
}
def createBlobTokenGenerator(container: BlobContainerName,
endpoint: EndpointURL,
workspaceId: Option[WorkspaceId],
workspaceManagerURL: Option[WorkspaceManagerURL],
subscription: Option[SubscriptionId]
): BlobTokenGenerator = {
(container: BlobContainerName, endpoint: EndpointURL, workspaceId, workspaceManagerURL) match {
case (container, endpoint, None, None) =>
NativeBlobTokenGenerator(container, endpoint, subscription)
case (container, endpoint, Some(workspaceId), Some(workspaceManagerURL)) =>
WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL)
case _ =>
throw new Exception("Arguments provided do not match any available BlobTokenGenerator implementation.")
}
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: WorkspaceId, workspaceManagerClient: WorkspaceManagerApiClientProvider): BlobTokenGenerator = {
WSMBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerClient)
}
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL): BlobTokenGenerator = createBlobTokenGenerator(container, endpoint, None)
def createBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: Option[WorkspaceId], workspaceManagerURL: Option[WorkspaceManagerURL]): BlobTokenGenerator =
createBlobTokenGenerator(container, endpoint, workspaceId, workspaceManagerURL, None)
Comment on lines 86 to -104
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pared down this helper class to exactly two functions, one for each of NativeBlobTokenGenerator and WSMBlobTokenGenerator. The conditional logic promoted to where we interpret the config, it seemed better to get it out of the way as early as possible and not pass around Option. Can always tweak/revert.


}

case class WSMBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, workspaceId: WorkspaceId, workspaceManagerURL: WorkspaceManagerURL) extends BlobTokenGenerator {
def generateAccessToken: Try[AzureSasCredential] = Failure(new NotImplementedError)
case class WSMBlobTokenGenerator(
container: BlobContainerName,
endpoint: EndpointURL,
workspaceId: WorkspaceId,
wsmClient: WorkspaceManagerApiClientProvider) extends BlobTokenGenerator {

def generateAccessToken: Try[AzureSasCredential] = Try {
val token = wsmClient.getControlledAzureResourceApi.createAzureStorageContainerSasToken(
UUID.fromString(workspaceId.value),
UUID.fromString("00001111-2222-3333-aaaa-bbbbccccdddd"),
null,
null,
null,
null
).getToken // TODO `null` items may be required, investigate in WX-696

new AzureSasCredential(token) // TODO Does `signature` actually mean token? save for WX-696
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice breadcrumbs!

}
}

case class NativeBlobTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobTokenGenerator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ final case class BlobPathBuilderFactory(globalConfig: Config, instanceConfig: Co
val workspaceId: Option[WorkspaceId] = instanceConfig.as[Option[String]]("workspace-id").map(WorkspaceId)
val expiryBufferMinutes: Long = instanceConfig.as[Option[Long]]("expiry-buffer-minutes").getOrElse(10)
val workspaceManagerURL: Option[WorkspaceManagerURL] = singletonConfig.config.as[Option[String]]("workspace-manager-url").map(WorkspaceManagerURL)
val b2cToken: Option[String] = instanceConfig.as[Option[String]]("b2cToken")

val blobTokenGenerator: BlobTokenGenerator = (workspaceManagerURL, b2cToken, workspaceId) match {
case (Some(url), Some(token), Some(workspaceId)) =>
val wsmClient: WorkspaceManagerApiClientProvider = new HttpWorkspaceManagerClientProvider(url, token)
// parameterizing client instead of URL to make injecting mock client possible
BlobTokenGenerator.createBlobTokenGenerator(container, endpoint, workspaceId, wsmClient)
case _ =>
BlobTokenGenerator.createBlobTokenGenerator(container, endpoint, subscription)
}

val blobTokenGenerator: BlobTokenGenerator = BlobTokenGenerator.createBlobTokenGenerator(
container, endpoint, workspaceId, workspaceManagerURL, subscription)
val fsm: BlobFileSystemManager = BlobFileSystemManager(container, endpoint, expiryBufferMinutes, blobTokenGenerator)

override def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[BlobPathBuilder] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cromwell.filesystems.blob

import bio.terra.workspace.api.ControlledAzureResourceApi
import bio.terra.workspace.client.ApiClient

/**
* Represents a way to get various workspace manager clients
*
* Pared down from `org.broadinstitute.dsde.rawls.dataaccess.workspacemanager.WorkspaceManagerApiClientProvider`
*
* For testing, create an anonymous subclass as in `org.broadinstitute.dsde.rawls.dataaccess.workspacemanager.HttpWorkspaceManagerDAOSpec`
*/
trait WorkspaceManagerApiClientProvider {
def getApiClient: ApiClient

def getControlledAzureResourceApi: ControlledAzureResourceApi

}

class HttpWorkspaceManagerClientProvider(baseWorkspaceManagerUrl: WorkspaceManagerURL, token: String) extends WorkspaceManagerApiClientProvider {
def getApiClient: ApiClient = {
val client: ApiClient = new ApiClient()
client.setBasePath(baseWorkspaceManagerUrl.value)
client.setAccessToken(token)

client
}

def getControlledAzureResourceApi: ControlledAzureResourceApi =
new ControlledAzureResourceApi(getApiClient)

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.mockito.Mockito._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.nio.channels.UnresolvedAddressException
import java.nio.file.{FileSystem, FileSystemNotFoundException}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
Expand All @@ -27,8 +28,12 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga
it should "parse configs for a functioning factory" in {
val endpoint = BlobPathBuilderSpec.buildEndpoint("storageAccount")
val container = BlobContainerName("storageContainer")
val workspaceId = WorkspaceId("mockWorkspaceId")
val workspaceManagerURL = WorkspaceManagerURL("https://test.ws.org")

// Use a real UUID to help along the hacky "unit" test below.
//val workspaceId = WorkspaceId("mockWorkspaceId")
val workspaceId = WorkspaceId("B0BAFE77-0000-0000-0000-000000000000")

val workspaceManagerURL = WorkspaceManagerURL("https://wsm.example.com")
val instanceConfig = ConfigFactory.parseString(
s"""
|container = "$container"
Expand All @@ -45,6 +50,22 @@ class BlobPathBuilderFactorySpec extends AnyFlatSpec with Matchers with MockSuga
factory.expiryBufferMinutes should equal(10L)
factory.workspaceId should contain(workspaceId)
factory.workspaceManagerURL should contain(workspaceManagerURL)

// Hacky "unit" test to try and exercise this branch's WSM code and dependencies.
// Should probably be in a Spec that extends TestKitSuite which provides and cleans up an ActorSystem.
import akka.actor.ActorSystem
import cromwell.core.WorkflowOptions
import scala.concurrent.{Await, ExecutionContext}

implicit val system: ActorSystem = ActorSystem("BlobPathBuilderFactorySpec")
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val pathBuilder =
Await.result(factory.withOptions(WorkflowOptions.empty), scala.concurrent.duration.Duration(10, "seconds"))
val sizeTry = pathBuilder.build(s"$endpoint/$container/inputs/test/testFile.wdl").map(_.size)
val sizeFailure = sizeTry.failed.get
sizeFailure shouldBe a[javax.ws.rs.ProcessingException]
sizeFailure.getCause.getClass shouldBe classOf[UnresolvedAddressException]
Await.result(system.terminate(), scala.concurrent.duration.Duration(10, "seconds"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we actually want to merge this, seems more like a temporary tool to reduce test cycle time.

}

it should "build an example sas token of the correct format" in {
Expand Down
Loading