Skip to content

Commit

Permalink
Fix Bug in Azure Repo Exception Handling (#47968) (#48031) (#48047)
Browse files Browse the repository at this point in the history
We were incorrectly handling `IOExceptions` thrown by
the `InputStream` side of the upload operation, resulting
in a `ClassCastException` as we expected to never get
`IOException` from the Azure SDK code but we do in practice.
This PR also sets an assertion on `markSupported` for the
streams used by the SDK as adding the test for this scenario
revealed that the SDK client would retry uploads for
non-mark-supporting streams on `IOException` in the `InputStream`.
  • Loading branch information
original-brownbear authored Oct 15, 2019
1 parent 90b9052 commit bca0c8d
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public boolean blobExists(String blobName) {
logger.trace("blobExists({})", blobName);
try {
return blobStore.blobExists(buildKey(blobName));
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | StorageException | IOException e) {
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
}
return false;
Expand Down Expand Up @@ -88,7 +88,6 @@ public InputStream readBlob(String blobName) throws IOException {
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);

try {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} catch (URISyntaxException|StorageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@
package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.LocationMode;

import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.component.AbstractComponent;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Map;

import static java.util.Collections.emptyMap;

import static org.elasticsearch.repositories.azure.AzureRepository.Repository;

public class AzureBlobStore extends AbstractComponent implements BlobStore {
Expand Down Expand Up @@ -94,11 +92,11 @@ public void delete(BlobPath path) throws IOException {
public void close() {
}

public boolean blobExists(String blob) throws URISyntaxException, StorageException {
public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
return service.blobExists(clientName, container, blob);
}

public void deleteBlob(String blob) throws URISyntaxException, StorageException {
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
service.deleteBlob(clientName, container, blob);
}

Expand All @@ -107,12 +105,12 @@ public InputStream getInputStream(String blob) throws URISyntaxException, Storag
}

public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
throws URISyntaxException, StorageException {
throws URISyntaxException, StorageException, IOException {
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
throws URISyntaxException, StorageException, IOException {
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public InputStream getInputStream(String account, String container, String blob)
}

public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
throws URISyntaxException, StorageException {
throws URISyntaxException, StorageException, IOException {
// NOTE: this should be here: if (prefix == null) prefix = "";
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
Expand All @@ -233,8 +233,9 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai
}

public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
boolean failIfAlreadyExists)
throws URISyntaxException, StorageException, FileAlreadyExistsException {
boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException {
assert inputStream.markSupported()
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.SpecialPermission;

import java.io.IOException;
Expand All @@ -44,7 +45,9 @@ public static <T> T doPrivilegedIOException(PrivilegedExceptionAction<T> operati
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
Throwables.rethrow(e.getCause());
assert false : "always throws";
return null;
}
}

Expand All @@ -53,7 +56,9 @@ public static <T> T doPrivilegedException(PrivilegedExceptionAction<T> operation
try {
return AccessController.doPrivileged(operation);
} catch (PrivilegedActionException e) {
throw (StorageException) e.getCause();
Throwables.rethrow(e.getCause());
assert false : "always throws";
return null;
}
}

Expand All @@ -65,12 +70,7 @@ public static void doPrivilegedVoidException(StorageRunnable action) throws Stor
return null;
});
} catch (PrivilegedActionException e) {
Throwable cause = e.getCause();
if (cause instanceof StorageException) {
throw (StorageException) cause;
} else {
throw (URISyntaxException) cause;
}
Throwables.rethrow(e.getCause());
}
}

Expand Down

0 comments on commit bca0c8d

Please sign in to comment.