Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1260] Acquire sas token from task runner #7241

Merged
merged 38 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
67070f6
stash
THWiseman Oct 17, 2023
d33dad3
update branch
THWiseman Oct 17, 2023
34d63e5
small refactor
THWiseman Oct 17, 2023
0d279eb
progress
THWiseman Oct 17, 2023
06531d7
blobular
THWiseman Oct 19, 2023
aa1bc52
got WSM endpoint
THWiseman Oct 19, 2023
8d3a2e7
tidy
THWiseman Oct 19, 2023
678f05c
mostly there
THWiseman Oct 20, 2023
9991e5a
working
THWiseman Oct 23, 2023
8cad15b
unit tests
THWiseman Oct 23, 2023
408e008
tidy
THWiseman Oct 23, 2023
05cab44
4
THWiseman Oct 23, 2023
e04f175
fix mock
THWiseman Oct 24, 2023
05d3991
configureable
THWiseman Oct 24, 2023
045835b
better mock path
THWiseman Oct 25, 2023
f37ee7a
mocking has escalated to bullying
THWiseman Oct 25, 2023
75ca3ea
better logging
THWiseman Oct 25, 2023
5b6852c
most PR feedback
THWiseman Oct 31, 2023
f1e9ea7
improve bash script and fix test
THWiseman Oct 31, 2023
4c4b92a
Merge branch 'develop' into WX-1260
THWiseman Oct 31, 2023
9c9a293
fix stub
THWiseman Oct 31, 2023
b1699d5
stubbing
THWiseman Nov 1, 2023
96e7d7d
Merge branch 'develop' into WX-1260
THWiseman Nov 2, 2023
3153345
stubtry
THWiseman Nov 2, 2023
f958150
caching
THWiseman Nov 2, 2023
4146ac1
caching
THWiseman Nov 2, 2023
d84f847
Merge branch 'develop' into WX-1260
THWiseman Nov 3, 2023
f694f65
parens
THWiseman Nov 3, 2023
8e05ca0
some but not all Friday Feedback
THWiseman Nov 3, 2023
a62307a
PR feedback
THWiseman Nov 6, 2023
e7cd74c
missed file
THWiseman Nov 7, 2023
510aa31
more sensible mocks
THWiseman Nov 7, 2023
b5e3c37
mock
THWiseman Nov 7, 2023
7f9044e
mock
THWiseman Nov 7, 2023
e227bb1
ACTUALLY FIXED IT
THWiseman Nov 7, 2023
5293997
Update supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes…
THWiseman Nov 13, 2023
60f999b
Update filesystems/blob/src/main/scala/cromwell/filesystems/blob/Blob…
THWiseman Nov 13, 2023
c519f58
expiry
THWiseman Nov 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cromwell.backend.standard

import java.io.IOException

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.event.LoggingReceive
import cats.implicits._
Expand Down Expand Up @@ -329,7 +328,7 @@ trait StandardAsyncExecutionActor
}

/** Any custom code that should be run within commandScriptContents before the instantiated command. */
def scriptPreamble: String = ""
def scriptPreamble: ErrorOr[String] = "".valid
THWiseman marked this conversation as resolved.
Show resolved Hide resolved

def cwd: Path = commandDirectory
def rcPath: Path = cwd./(jobPaths.returnCodeFilename)
Expand Down Expand Up @@ -427,10 +426,12 @@ trait StandardAsyncExecutionActor
|find . -type d -exec sh -c '[ -z "$$(ls -A '"'"'{}'"'"')" ] && touch '"'"'{}'"'"'/.file' \\;
|)""".stripMargin)

val errorOrPreamble: ErrorOr[String] = scriptPreamble

// The `tee` trickery below is to be able to redirect to known filenames for CWL while also streaming
// stdout and stderr for PAPI to periodically upload to cloud storage.
// https://stackoverflow.com/questions/692000/how-do-i-write-stderr-to-a-file-while-using-tee-with-a-pipe
(errorOrDirectoryOutputs, errorOrGlobFiles).mapN((directoryOutputs, globFiles) =>
(errorOrDirectoryOutputs, errorOrGlobFiles, errorOrPreamble).mapN((directoryOutputs, globFiles, preamble) =>
s"""|#!$jobShell
|DOCKER_OUTPUT_DIR_LINK
|cd ${cwd.pathAsString}
Expand Down Expand Up @@ -464,7 +465,7 @@ trait StandardAsyncExecutionActor
|)
|mv $rcTmpPath $rcPath
|""".stripMargin
.replace("SCRIPT_PREAMBLE", scriptPreamble)
.replace("SCRIPT_PREAMBLE", preamble)
.replace("ENVIRONMENT_VARIABLES", environmentVariables)
.replace("INSTANTIATED_COMMAND", commandString)
.replace("SCRIPT_EPILOGUE", scriptEpilogue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.temporal.ChronoUnit
import java.time.{Duration, OffsetDateTime}
import java.util.UUID
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -160,12 +161,14 @@
*/
def createBlobTokenGenerator(workspaceManagerClient: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]): BlobSasTokenGenerator = {
WSMBlobSasTokenGenerator(workspaceManagerClient, overrideWsmAuthToken)
new WSMBlobSasTokenGenerator(workspaceManagerClient, overrideWsmAuthToken)
}

}

case class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClientProvider,
case class WSMTerraCoordinates(wsmEndpoint: String, workspaceId: UUID, containerResourceId: UUID)

class WSMBlobSasTokenGenerator(wsmClientProvider: WorkspaceManagerApiClientProvider,
overrideWsmAuthToken: Option[String]) extends BlobSasTokenGenerator {

/**
Expand All @@ -178,17 +181,14 @@
* @return an AzureSasCredential for accessing a blob container
*/
def generateBlobSasToken(endpoint: EndpointURL, container: BlobContainerName): Try[AzureSasCredential] = {
val wsmAuthToken: Try[String] = overrideWsmAuthToken match {
case Some(t) => Success(t)
case None => AzureCredentials.getAccessToken(None).toTry
}
val wsmAuthToken: Try[String] = getWsmAuth

Check warning on line 184 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L184

Added line #L184 was not covered by tests
container.workspaceId match {
// If this is a Terra workspace, request a token from WSM
case Success(workspaceId) => {
(for {
wsmAuth <- wsmAuthToken
wsmAzureResourceClient = wsmClientProvider.getControlledAzureResourceApi(wsmAuth)
resourceId <- getContainerResourceId(workspaceId, container, wsmAuth)
resourceId <- getContainerResourceId(workspaceId, container, Some(wsmAuth))

Check warning on line 191 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L191

Added line #L191 was not covered by tests
THWiseman marked this conversation as resolved.
Show resolved Hide resolved
sasToken <- wsmAzureResourceClient.createAzureStorageContainerSasToken(workspaceId, resourceId)
} yield sasToken).recoverWith {
// If the storage account was still not found in WSM, this may be a public filesystem
Expand All @@ -201,9 +201,59 @@
}
}

def getContainerResourceId(workspaceId: UUID, container: BlobContainerName, wsmAuth : String): Try[UUID] = {
val wsmResourceClient = wsmClientProvider.getResourceApi(wsmAuth)
wsmResourceClient.findContainerResourceId(workspaceId, container)
private val cachedContainerResourceIds = new mutable.HashMap[BlobContainerName, UUID]()
THWiseman marked this conversation as resolved.
Show resolved Hide resolved

// Optionally provide wsmAuth to avoid acquiring it twice in generateBlobSasToken.
// In the case that the resourceId is not cached and no auth is provided, this function will acquire a new auth as necessary.
private def getContainerResourceId(workspaceId: UUID, container: BlobContainerName, precomputedWsmAuth: Option[String]): Try[UUID] = {
cachedContainerResourceIds.get(container) match {
case Some(id) => Try(id) //cache hit
case _ => { //cache miss
val auth: Try[String] = precomputedWsmAuth.map(auth => Try(auth)).getOrElse(getWsmAuth)

Check warning on line 212 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L209-L212

Added lines #L209 - L212 were not covered by tests
val resourceId = for {
wsmAuth <- auth
wsmResourceApi = wsmClientProvider.getResourceApi(wsmAuth)
resourceId <- wsmResourceApi.findContainerResourceId(workspaceId, container)

Check warning on line 216 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L214-L216

Added lines #L214 - L216 were not covered by tests
} yield resourceId
resourceId.map(id => cachedContainerResourceIds.put(container, id)) //NB: Modifying cache state here.
cachedContainerResourceIds.get(container) match {
case Some(uuid) => Try(uuid)
case _ => Failure(new NoSuchElementException("Could not retrieve container resource ID from WSM"))

Check warning on line 221 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L218-L221

Added lines #L218 - L221 were not covered by tests
}
}
}
}

private def getWsmAuth: Try[String] = {
overrideWsmAuthToken match {
case Some(t) => Success(t)
case None => AzureCredentials.getAccessToken(None).toTry

Check warning on line 230 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L228-L230

Added lines #L228 - L230 were not covered by tests
}
}

private def parseTerraWorkspaceIdFromPath(blobPath: BlobPath): Try[UUID] = {
if (blobPath.container.value.startsWith("sc-")) Try(UUID.fromString(blobPath.container.value.substring(3)))
else Failure(new Exception("Could not parse workspace ID from storage container. Are you sure this is a file in a Terra Workspace?"))

Check warning on line 236 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L235-L236

Added lines #L235 - L236 were not covered by tests
}

/**
* Return a REST endpoint that will reply with a sas token for the blob storage container associated with the provided blob path.
* @param blobPath A blob path of a file living in a blob container that WSM knows about (likely a workspace container).
* @param tokenDuration How long will the token last after being generated. Default is 8 hours. Sas tokens won't last longer than 24h.
* NOTE: If a blobPath is provided for a file in a container other than what this token generator was constructed for,
* this function will make two REST requests. Otherwise, the relevant data is already cached locally.
*/
def getWSMSasFetchEndpoint(blobPath: BlobPath, tokenDuration: Option[Duration] = None): Try[String] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ToL] This could be a case where a default in the parameter list makes more sense than Option

Suggested change
def getWSMSasFetchEndpoint(blobPath: BlobPath, tokenDuration: Option[Duration] = None): Try[String] = {
def getWSMSasFetchEndpoint(blobPath: BlobPath, tokenDuration: Duration = Duration.ofSeconds(8*60*60)): Try[String] = {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, I missed this in my review, good catch. Rather than either of those, I think we want to pass NO duration through to WSM if this method's caller doesn't specify one, so we use the WSM default rather than setting our own.

val lifetimeSeconds: Int = tokenDuration.map(d => d.toSeconds.intValue).getOrElse(8*60*60)
val wsmEndpoint = wsmClientProvider.getBaseWorkspaceManagerUrl

Check warning on line 248 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L247-L248

Added lines #L247 - L248 were not covered by tests
val terraInfo: Try[WSMTerraCoordinates] = for {
workspaceId <- parseTerraWorkspaceIdFromPath(blobPath)
containerResourceId <- getContainerResourceId(workspaceId, blobPath.container, None)
coordinates = WSMTerraCoordinates(wsmEndpoint, workspaceId, containerResourceId)
} yield coordinates
terraInfo.map{terraCoordinates =>

Check warning on line 254 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobFileSystemManager.scala#L250-L254

Added lines #L250 - L254 were not covered by tests
s"${terraCoordinates.wsmEndpoint}/api/workspaces/v1/${terraCoordinates.workspaceId.toString}/resources/controlled/azure/storageContainer/${terraCoordinates.containerResourceId.toString}/getSasToken?sasExpirationDuration=$lifetimeSeconds"
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@
* @return Path string relative to the container root.
*/
def pathWithoutContainer : String = pathString


def getFilesystemManager: BlobFileSystemManager = fsm

Check warning on line 189 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala#L189

Added line #L189 was not covered by tests

override def getSymlinkSafePath(options: LinkOption*): Path = toAbsolutePath

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
trait WorkspaceManagerApiClientProvider {
def getControlledAzureResourceApi(token: String): WsmControlledAzureResourceApi
def getResourceApi(token: String): WsmResourceApi
def getBaseWorkspaceManagerUrl: String
}

class HttpWorkspaceManagerClientProvider(baseWorkspaceManagerUrl: WorkspaceManagerURL) extends WorkspaceManagerApiClientProvider {
Expand All @@ -40,6 +41,7 @@
apiClient.setAccessToken(token)
WsmControlledAzureResourceApi(new ControlledAzureResourceApi(apiClient))
}
def getBaseWorkspaceManagerUrl: String = baseWorkspaceManagerUrl.value

Check warning on line 44 in filesystems/blob/src/main/scala/cromwell/filesystems/blob/WorkspaceManagerApiClientProvider.scala

View check run for this annotation

Codecov / codecov/patch

filesystems/blob/src/main/scala/cromwell/filesystems/blob/WorkspaceManagerApiClientProvider.scala#L44

Added line #L44 was not covered by tests
}

case class WsmResourceApi(resourcesApi : ResourceApi) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,12 +663,12 @@
private val DockerMonitoringLogPath: Path = GcpBatchWorkingDisk.MountPoint.resolve(gcpBatchCallPaths.batchMonitoringLogFilename)
private val DockerMonitoringScriptPath: Path = GcpBatchWorkingDisk.MountPoint.resolve(gcpBatchCallPaths.batchMonitoringScriptFilename)

override def scriptPreamble: String = {
override def scriptPreamble: ErrorOr[String] = {
if (monitoringOutput.isDefined) {
s"""|touch $DockerMonitoringLogPath
|chmod u+x $DockerMonitoringScriptPath
|$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin
} else ""
|$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin.valid

Check warning on line 670 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala#L670

Added line #L670 was not covered by tests
} else "".valid
}

private[actors] def generateInputs(jobDescriptor: BackendJobDescriptor): Set[GcpBatchInput] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,12 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta

private lazy val isDockerImageCacheUsageRequested = runtimeAttributes.useDockerImageCache.getOrElse(useDockerImageCache(jobDescriptor.workflowDescriptor))

override def scriptPreamble: String = {
override def scriptPreamble: ErrorOr[String] = {
if (monitoringOutput.isDefined) {
s"""|touch $DockerMonitoringLogPath
|chmod u+x $DockerMonitoringScriptPath
|$DockerMonitoringScriptPath > $DockerMonitoringLogPath &""".stripMargin
} else ""
}.valid else "".valid
}

override def globParentDirectory(womGlobFile: WomGlobFile): Path = {
Expand Down
Loading
Loading