Skip to content

Commit

Permalink
WX-1174 Adjust NIO Copy functionality (#7207)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Nichols <anichols@broadinstitute.org>
  • Loading branch information
JVThomas and aednichols authored Aug 22, 2023
1 parent bdc1ab3 commit f64b3b9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ public final class AzureFileSystem extends FileSystem {
private final Integer downloadResumeRetries;
private FileStore defaultFileStore;
private boolean closed;

private AzureSasCredential currentActiveSasCredential;
private Instant expiry;

AzureFileSystem(AzureFileSystemProvider parentFileSystemProvider, String endpoint, Map<String, ?> config)
Expand All @@ -188,6 +190,7 @@ public final class AzureFileSystem extends FileSystem {
this.putBlobThreshold = (Long) config.get(AZURE_STORAGE_PUT_BLOB_THRESHOLD);
this.maxConcurrencyPerRequest = (Integer) config.get(AZURE_STORAGE_MAX_CONCURRENCY_PER_REQUEST);
this.downloadResumeRetries = (Integer) config.get(AZURE_STORAGE_DOWNLOAD_RESUME_RETRIES);
this.currentActiveSasCredential = (AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL);

// Initialize and ensure access to FileStores.
this.defaultFileStore = this.initializeFileStore(config);
Expand Down Expand Up @@ -496,6 +499,13 @@ Integer getMaxConcurrencyPerRequest() {
return this.maxConcurrencyPerRequest;
}

public String createSASAppendedURL(String url) throws IllegalStateException {
if (Objects.isNull(currentActiveSasCredential)) {
throw new IllegalStateException("No current active SAS credential present");
}
return url + "?" + currentActiveSasCredential.getSignature();
}

public Optional<Instant> getExpiry() {
return Optional.ofNullable(expiry);
}
Expand All @@ -514,5 +524,6 @@ public boolean isExpired(Duration buffer) {
return Optional.ofNullable(this.expiry)
.map(e -> Instant.now().plus(buffer).isAfter(e))
.orElse(true);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
Expand Down Expand Up @@ -695,16 +696,23 @@ public void copy(Path source, Path destination, CopyOption... copyOptions) throw
// Remove accepted options as we find them. Anything left we don't support.
boolean replaceExisting = false;
List<CopyOption> optionsList = new ArrayList<>(Arrays.asList(copyOptions));
if (!optionsList.contains(StandardCopyOption.COPY_ATTRIBUTES)) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER, new UnsupportedOperationException(
"StandardCopyOption.COPY_ATTRIBUTES must be specified as the service will always copy "
+ "file attributes."));
// NOTE: We're going to assume COPY_ATTRIBUTES as a default copy option (but can still be provided and handled safely)
// REPLACE_EXISTING must still be provided if you want to replace existing file

// if (!optionsList.contains(StandardCopyOption.COPY_ATTRIBUTES)) {
// throw LoggingUtility.logError(ClientLoggerHolder.LOGGER, new UnsupportedOperationException(
// "StandardCopyOption.COPY_ATTRIBUTES must be specified as the service will always copy "
// + "file attributes."));
// }
if(optionsList.contains(StandardCopyOption.COPY_ATTRIBUTES)) {
optionsList.remove(StandardCopyOption.COPY_ATTRIBUTES);
}
optionsList.remove(StandardCopyOption.COPY_ATTRIBUTES);

if (optionsList.contains(StandardCopyOption.REPLACE_EXISTING)) {
replaceExisting = true;
optionsList.remove(StandardCopyOption.REPLACE_EXISTING);
}

if (!optionsList.isEmpty()) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new UnsupportedOperationException("Unsupported copy option found. Only "
Expand Down Expand Up @@ -760,9 +768,16 @@ public void copy(Path source, Path destination, CopyOption... copyOptions) throw
customer scenarios and how many virtual directories they copy, it could be better to check the directory status
first and then do a copy or createDir, which would always be two requests for all resource types.
*/

try {
/*
Format the url by appending the SAS token as a param, otherwise the copy request will fail.
AzureFileSystem has been updated to handle url transformation via createSASAuthorizedURL()
*/
AzureFileSystem afs = (AzureFileSystem) sourceRes.getPath().getFileSystem();
String sasAppendedSourceUrl = afs.createSASAppendedURL(sourceRes.getBlobClient().getBlobUrl());
SyncPoller<BlobCopyInfo, Void> pollResponse =
destinationRes.getBlobClient().beginCopy(sourceRes.getBlobClient().getBlobUrl(), null, null, null,
destinationRes.getBlobClient().beginCopy(sasAppendedSourceUrl, null, null, null,
null, requestConditions, null);
pollResponse.waitForCompletion(Duration.ofSeconds(COPY_TIMEOUT_SECONDS));
} catch (BlobStorageException e) {
Expand Down
3 changes: 2 additions & 1 deletion runConfigurations/Repo template_ Cromwell server TES.run.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Repo template: Cromwell server TES" type="Application" factoryName="Application">
<option name="ALTERNATIVE_JRE_PATH" value="$USER_HOME$/.sdkman/candidates/java/current" />
<option name="ALTERNATIVE_JRE_PATH" value="$PROJECT_DIR$/../.sdkman/candidates/java/current" />
<envs>
<env name="CROMWELL_BUILD_CENTAUR_SLICK_PROFILE" value="slick.jdbc.MySQLProfile$" />
<env name="CROMWELL_BUILD_CENTAUR_JDBC_DRIVER" value="com.mysql.cj.jdbc.Driver" />
Expand All @@ -16,6 +16,7 @@
<option name="VM_PARAMETERS" value="-Dconfig.file=target/ci/resources/tes_application.conf" />
<method v="2">
<option name="Make" enabled="true" />
<option name="RunConfigurationTask" enabled="true" run_configuration_name="renderCiResources" run_configuration_type="SbtRunConfiguration" />
</method>
</configuration>
</component>

0 comments on commit f64b3b9

Please sign in to comment.