Skip to content

Commit add7148

Browse files
committed
GCS deleteBlobsIgnoringIfNotExists should catch StorageException (#46832)
GoogleCloudStorageBlobStore.deleteBlobsIgnoringIfNotExists() does not correctly catch StorageException thrown by batch.submit(). In the case a snapshot is deleted through BlobStoreRepository.deleteSnapshot() a storage exception is not caught (only IOException are) so the deletion is interrupted and indices cannot be cleaned up. The storage exception bubbles up to SnapshotService.deleteSnapshotFromRepository() but the listener that removes the deletion from the cluster state is not executed, leaving the deletion in the cluster state. This bug has been reported in #46772 where batch.submit() threw an exception in the test testIndicesDeletedFromRepository and following tests failed because a snapshot deletion was running. Relates #46772
1 parent 74d1588 commit add7148

File tree

2 files changed

+88
-22
lines changed

2 files changed

+88
-22
lines changed

plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -364,31 +364,36 @@ void deleteBlobsIgnoringIfNotExists(Collection<String> blobNames) throws IOExcep
364364
}
365365
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
366366
final List<BlobId> failedBlobs = Collections.synchronizedList(new ArrayList<>());
367-
final StorageException e = SocketAccess.doPrivilegedIOException(() -> {
368-
final AtomicReference<StorageException> ioe = new AtomicReference<>();
369-
final StorageBatch batch = client().batch();
370-
for (BlobId blob : blobIdsToDelete) {
371-
batch.delete(blob).notify(
372-
new BatchResult.Callback<Boolean, StorageException>() {
373-
@Override
374-
public void success(Boolean result) {
375-
}
367+
try {
368+
SocketAccess.doPrivilegedVoidIOException(() -> {
369+
final AtomicReference<StorageException> ioe = new AtomicReference<>();
370+
final StorageBatch batch = client().batch();
371+
for (BlobId blob : blobIdsToDelete) {
372+
batch.delete(blob).notify(
373+
new BatchResult.Callback<Boolean, StorageException>() {
374+
@Override
375+
public void success(Boolean result) {
376+
}
376377

377-
@Override
378-
public void error(StorageException exception) {
379-
if (exception.getCode() != HTTP_NOT_FOUND) {
380-
failedBlobs.add(blob);
381-
if (ioe.compareAndSet(null, exception) == false) {
382-
ioe.get().addSuppressed(exception);
378+
@Override
379+
public void error(StorageException exception) {
380+
if (exception.getCode() != HTTP_NOT_FOUND) {
381+
failedBlobs.add(blob);
382+
if (ioe.compareAndSet(null, exception) == false) {
383+
ioe.get().addSuppressed(exception);
384+
}
383385
}
384386
}
385-
}
386-
});
387-
}
388-
batch.submit();
389-
return ioe.get();
390-
});
391-
if (e != null) {
387+
});
388+
}
389+
batch.submit();
390+
391+
final StorageException exception = ioe.get();
392+
if (exception != null) {
393+
throw exception;
394+
}
395+
});
396+
} catch (final Exception e) {
392397
throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
393398
}
394399
assert failedBlobs.isEmpty();

plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@
1919

2020
package org.elasticsearch.repositories.gcs;
2121

22+
import com.google.cloud.BatchResult;
23+
import com.google.cloud.storage.BlobId;
24+
import com.google.cloud.storage.Bucket;
25+
import com.google.cloud.storage.Storage;
26+
import com.google.cloud.storage.StorageBatch;
27+
import com.google.cloud.storage.StorageBatchResult;
28+
import com.google.cloud.storage.StorageException;
2229
import org.apache.lucene.util.BytesRef;
2330
import org.apache.lucene.util.BytesRefBuilder;
2431
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -30,11 +37,18 @@
3037
import java.io.IOException;
3138
import java.io.InputStream;
3239
import java.util.Arrays;
40+
import java.util.List;
3341
import java.util.Locale;
3442
import java.util.concurrent.ConcurrentHashMap;
3543

3644
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
45+
import static org.hamcrest.Matchers.instanceOf;
3746
import static org.mockito.Matchers.any;
47+
import static org.mockito.Matchers.eq;
48+
import static org.mockito.Mockito.doAnswer;
49+
import static org.mockito.Mockito.doNothing;
50+
import static org.mockito.Mockito.doReturn;
51+
import static org.mockito.Mockito.doThrow;
3852
import static org.mockito.Mockito.mock;
3953
import static org.mockito.Mockito.when;
4054

@@ -77,4 +91,51 @@ public void testWriteReadLarge() throws IOException {
7791
}
7892
}
7993

94+
@SuppressWarnings("unchecked")
95+
public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Exception {
96+
final List<String> blobs = Arrays.asList("blobA", "blobB");
97+
98+
final StorageBatch batch = mock(StorageBatch.class);
99+
if (randomBoolean()) {
100+
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class);
101+
when(batch.delete(any(BlobId.class))).thenReturn(result);
102+
doThrow(new StorageException(new IOException("Batch submit throws a storage exception"))).when(batch).submit();
103+
} else {
104+
StorageBatchResult<Boolean> resultA = mock(StorageBatchResult.class);
105+
doReturn(resultA).when(batch).delete(eq(BlobId.of("bucket", "blobA")));
106+
doAnswer(invocation -> {
107+
StorageException storageException = new StorageException(new IOException("Batched delete throws a storage exception"));
108+
((BatchResult.Callback) invocation.getArguments()[0]).error(storageException);
109+
return null;
110+
}).when(resultA).notify(any(StorageBatchResult.Callback.class));
111+
112+
StorageBatchResult<Boolean> resultB = mock(StorageBatchResult.class);
113+
doReturn(resultB).when(batch).delete(eq(BlobId.of("bucket", "blobB")));
114+
doAnswer(invocation -> {
115+
if (randomBoolean()) {
116+
StorageException storageException = new StorageException(new IOException("Batched delete throws a storage exception"));
117+
((BatchResult.Callback) invocation.getArguments()[0]).error(storageException);
118+
} else {
119+
((BatchResult.Callback) invocation.getArguments()[0]).success(randomBoolean());
120+
}
121+
return null;
122+
}).when(resultB).notify(any(StorageBatchResult.Callback.class));
123+
124+
doNothing().when(batch).submit();
125+
}
126+
127+
final Storage storage = mock(Storage.class);
128+
when(storage.get("bucket")).thenReturn(mock(Bucket.class));
129+
when(storage.batch()).thenReturn(batch);
130+
131+
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
132+
when(storageService.client(any(String.class))).thenReturn(storage);
133+
134+
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", storageService)) {
135+
final BlobContainer container = store.blobContainer(new BlobPath());
136+
137+
IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs));
138+
assertThat(e.getCause(), instanceOf(StorageException.class));
139+
}
140+
}
80141
}

0 commit comments

Comments
 (0)