From 2bb4f3837113952488a9c71773107b024b756d14 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 3 Jul 2018 09:13:50 +0200 Subject: [PATCH] Add write*Blob option to replace existing blob (#31729) Adds a new parameter to the BlobContainer#write*Blob methods to specify whether the existing file should be overridden or not. For some metadata files in the repository, we actually want to replace the current file. This is currently implemented through an explicit blob delete and then a fresh write. In case of using a cloud provider (S3, GCS, Azure), this results in 2 API requests instead of just 1. This change will therefore allow us to achieve the same functionality using less API requests. --- .../blobstore/url/URLBlobContainer.java | 2 +- .../azure/AzureStorageFixture.java | 11 ++++-- .../azure/AzureBlobContainer.java | 4 +-- .../repositories/azure/AzureBlobStore.java | 6 ++-- .../azure/AzureStorageService.java | 9 +++-- .../azure/AzureStorageServiceMock.java | 5 +-- .../gcs/GoogleCloudStorageFixture.java | 19 +++++----- .../gcs/GoogleCloudStorageBlobContainer.java | 4 +-- .../gcs/GoogleCloudStorageBlobStore.java | 35 +++++++++++-------- .../repositories/hdfs/HdfsBlobContainer.java | 5 +-- .../hdfs/HdfsBlobStoreContainerTests.java | 2 +- .../repositories/s3/S3BlobContainer.java | 5 ++- .../common/blobstore/BlobContainer.java | 17 +++++---- .../common/blobstore/fs/FsBlobContainer.java | 19 +++++++--- .../blobstore/BlobStoreRepository.java | 18 +++++----- .../blobstore/ChecksumBlobStoreFormat.java | 4 +-- .../snapshots/BlobStoreFormatIT.java | 6 ++-- .../mockstore/BlobContainerWrapper.java | 9 ++--- .../snapshots/mockstore/MockRepository.java | 15 ++++---- .../ESBlobStoreContainerTestCase.java | 22 +++++++----- .../repositories/ESBlobStoreTestCase.java | 2 +- 21 files changed, 131 insertions(+), 88 deletions(-) diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index fb20b73b61c00..7b72871f4f78d 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -108,7 +108,7 @@ public InputStream readBlob(String name) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java index f906b9fa9a913..0bd9503f43dac 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java @@ -122,15 +122,20 @@ private static PathTrie defaultHandlers(final Map { final String destContainerName = request.getParam("container"); final String destBlobName = objectName(request.getParameters()); + final String ifNoneMatch = request.getHeader("If-None-Match"); final Container destContainer = containers.get(destContainerName); if (destContainer == null) { return newContainerNotFoundError(request.getId()); } - byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody()); - if (existingBytes != null) { - return newBlobAlreadyExistsError(request.getId()); + if ("*".equals(ifNoneMatch)) { + byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody()); + if (existingBytes != null) { + return newBlobAlreadyExistsError(request.getId()); + } + } else { + destContainer.objects.put(destBlobName, request.getBody()); } return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); }) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 751e00f06adbb..5d5330e8cb563 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -86,11 +86,11 @@ public InputStream readBlob(String blobName) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); try { - blobStore.writeBlob(buildKey(blobName), inputStream, blobSize); + blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } catch (URISyntaxException|StorageException e) { throw new IOException("Can not write blob " + blobName, e); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index bcd6b936af1aa..f4bc362e53602 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -117,8 +117,8 @@ public Map listBlobsByPrefix(String keyPath, String prefix return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException, - FileAlreadyExistsException { - service.writeBlob(this.clientName, container, blobName, inputStream, blobSize); + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws URISyntaxException, StorageException, FileAlreadyExistsException { + service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 73dd68f4b5f57..9482182b02d28 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -236,17 +236,20 @@ public Map listBlobsByPrefix(String account, String contai return blobsBuilder.immutableMap(); } - public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize) + public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, FileAlreadyExistsException { logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize)); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName); try { + final AccessCondition accessCondition = + failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition(); SocketAccess.doPrivilegedVoidException(() -> - blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(), null, client.v2().get())); + blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get())); } catch (final StorageException se) { - if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && + if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { throw new FileAlreadyExistsException(blobName, null, se.getMessage()); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java index 264cb90378529..18eb529c0eebe 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceMock.java @@ -108,9 +108,10 @@ public Map listBlobsByPrefix(String account, String contai } @Override - public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize) + public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, FileAlreadyExistsException { - if (blobs.containsKey(blobName)) { + if (failIfAlreadyExists && blobs.containsKey(blobName)) { throw new FileAlreadyExistsException(blobName); } try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { diff --git a/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java b/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java index b1a185c9c08c9..b37b89b243ba7 100644 --- a/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java +++ b/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java @@ -158,10 +158,6 @@ private static PathTrie defaultHandlers(final Map { final String ifGenerationMatch = request.getParam("ifGenerationMatch"); - if ("0".equals(ifGenerationMatch) == false) { - return newError(RestStatus.PRECONDITION_FAILED, "object already exist"); - } - final String uploadType = request.getParam("uploadType"); if ("resumable".equals(uploadType)) { final String objectName = request.getParam("name"); @@ -172,12 +168,19 @@ private static PathTrie defaultHandlers(final Map LARGE_BLOB_THRESHOLD_BYTE_SIZE) { - writeBlobResumable(blobInfo, inputStream); + writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists); } else { - writeBlobMultipart(blobInfo, inputStream, blobSize); + writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists); } } @@ -210,14 +210,17 @@ void writeBlob(String blobName, InputStream inputStream, long blobSize) throws I * Uploads a blob using the "resumable upload" method (multiple requests, which * can be independently retried in case of failure, see * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload - * * @param blobInfo the info for the blob to be uploaded * @param inputStream the stream containing the blob data + * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream) throws IOException { + private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException { try { + final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ? + new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } : + new Storage.BlobWriteOption[0]; final WriteChannel writeChannel = SocketAccess - .doPrivilegedIOException(() -> client().writer(blobInfo, Storage.BlobWriteOption.doesNotExist())); + .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions)); Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() { @Override public boolean isOpen() { @@ -236,7 +239,7 @@ public int write(ByteBuffer src) throws IOException { } })); } catch (final StorageException se) { - if (se.getCode() == HTTP_PRECON_FAILED) { + if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } throw se; @@ -248,20 +251,24 @@ public int write(ByteBuffer src) throws IOException { * 'multipart/related' request containing both data and metadata. The request is * gziped), see: * https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload - * - * @param blobInfo the info for the blob to be uploaded + * @param blobInfo the info for the blob to be uploaded * @param inputStream the stream containing the blob data * @param blobSize the size + * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists */ - private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize) throws IOException { + private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize)); Streams.copy(inputStream, baos); try { + final Storage.BlobTargetOption[] targetOptions = failIfAlreadyExists ? + new Storage.BlobTargetOption[] { Storage.BlobTargetOption.doesNotExist() } : + new Storage.BlobTargetOption[0]; SocketAccess.doPrivilegedVoidIOException( - () -> client().create(blobInfo, baos.toByteArray(), Storage.BlobTargetOption.doesNotExist())); + () -> client().create(blobInfo, baos.toByteArray(), targetOptions)); } catch (final StorageException se) { - if (se.getCode() == HTTP_PRECON_FAILED) { + if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); } throw se; diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 23557ae6cf84a..580d033354e58 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -91,11 +91,12 @@ public InputStream readBlob(String blobName) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { store.execute((Operation) fileContext -> { Path blob = new Path(path, blobName); // we pass CREATE, which means it fails if a blob already exists. - EnumSet flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK); + EnumSet flags = failIfAlreadyExists ? EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK) : + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK); CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)}; try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) { int bytesRead; diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java index a5d68331db78e..ba00862e93848 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreContainerTests.java @@ -135,7 +135,7 @@ public void testReadOnly() throws Exception { assertTrue(util.exists(hdfsPath)); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - writeBlob(container, "foo", new BytesArray(data)); + writeBlob(container, "foo", new BytesArray(data), randomBoolean()); assertArrayEquals(readBlobFully(container, "foo", data.length), data); assertTrue(container.blobExists("foo")); } diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 86b01a3e79cdd..b7cc2b89605d3 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -90,8 +90,11 @@ public InputStream readBlob(String blobName) throws IOException { } } + /** + * This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model. + */ @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { SocketAccess.doPrivilegedIOException(() -> { if (blobSize <= blobStore.bufferSizeInBytes()) { executeSingleUpload(blobStore, buildKey(blobName), inputStream, blobSize); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index db185f1e8c11c..2ecce44b55c1e 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -69,16 +69,18 @@ public interface BlobContainer { * @param blobSize * The size of the blob to be written, in bytes. It is implementation dependent whether * this value is used in writing the blob to the repository. - * @throws FileAlreadyExistsException if a blob by the same name already exists + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException; + void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; /** * Reads blob content from the input stream and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. When the BlobContainer implementation * does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then - * the {@link #writeBlob(String, InputStream, long)} method is used. + * the {@link #writeBlob(String, InputStream, long, boolean)} method is used. * * This method assumes the container does not already contain a blob of the same blobName. If a blob by the * same name already exists, the operation will fail and an {@link IOException} will be thrown. @@ -90,11 +92,14 @@ public interface BlobContainer { * @param blobSize * The size of the blob to be written, in bytes. It is implementation dependent whether * this value is used in writing the blob to the repository. - * @throws FileAlreadyExistsException if a blob by the same name already exists + * @param failIfAlreadyExists + * whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { - writeBlob(blobName, inputStream, blobSize); + default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) + throws IOException { + writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } /** diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index a58802ecd1828..bab984bd85c74 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -124,7 +124,10 @@ public InputStream readBlob(String name) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + if (failIfAlreadyExists == false) { + deleteBlobIgnoringIfNotExists(blobName); + } final Path file = path.resolve(blobName); try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) { Streams.copy(inputStream, outputStream); @@ -134,7 +137,8 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) + throws IOException { final String tempBlob = tempBlobName(blobName); final Path tempBlobPath = path.resolve(tempBlob); try { @@ -142,7 +146,7 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream Streams.copy(inputStream, outputStream); } IOUtils.fsync(tempBlobPath, false); - moveBlobAtomic(tempBlob, blobName); + moveBlobAtomic(tempBlob, blobName, failIfAlreadyExists); } catch (IOException ex) { try { deleteBlobIgnoringIfNotExists(tempBlob); @@ -155,13 +159,18 @@ public void writeBlobAtomic(final String blobName, final InputStream inputStream } } - public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName) throws IOException { + public void moveBlobAtomic(final String sourceBlobName, final String targetBlobName, final boolean failIfAlreadyExists) + throws IOException { final Path sourceBlobPath = path.resolve(sourceBlobName); final Path targetBlobPath = path.resolve(targetBlobName); // If the target file exists then Files.move() behaviour is implementation specific // the existing file might be replaced or this method fails by throwing an IOException. if (Files.exists(targetBlobPath)) { - throw new FileAlreadyExistsException("blob [" + targetBlobPath + "] already exists, cannot overwrite"); + if (failIfAlreadyExists) { + throw new FileAlreadyExistsException("blob [" + targetBlobPath + "] already exists, cannot overwrite"); + } else { + deleteBlobIgnoringIfNotExists(targetBlobName); + } } Files.move(sourceBlobPath, targetBlobPath, StandardCopyOption.ATOMIC_MOVE); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 0151e4e7322d5..86131fe468d28 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -556,7 +556,7 @@ public String startVerification() { String blobName = "master.dat"; BytesArray bytes = new BytesArray(testBytes); try (InputStream stream = bytes.streamInput()) { - testContainer.writeBlobAtomic(blobName, stream, bytes.length()); + testContainer.writeBlobAtomic(blobName, stream, bytes.length(), true); } return seed; } @@ -664,7 +664,7 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep // write the index file final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); - writeAtomic(indexBlob, snapshotsBytes); + writeAtomic(indexBlob, snapshotsBytes, true); // delete the N-2 index file if it exists, keep the previous one around as a backup if (isReadOnly() == false && newGen - 2 >= 0) { final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2); @@ -677,9 +677,8 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long rep bStream.writeLong(newGen); genBytes = bStream.bytes(); } - snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(INDEX_LATEST_BLOB); logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen); - writeAtomic(INDEX_LATEST_BLOB, genBytes); + writeAtomic(INDEX_LATEST_BLOB, genBytes, false); } /** @@ -698,9 +697,8 @@ void writeIncompatibleSnapshots(RepositoryData repositoryData) throws IOExceptio } bytes = bStream.bytes(); } - snapshotsBlobContainer.deleteBlobIgnoringIfNotExists(INCOMPATIBLE_SNAPSHOTS_BLOB); // write the incompatible snapshots blob - writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes); + writeAtomic(INCOMPATIBLE_SNAPSHOTS_BLOB, bytes, false); } /** @@ -766,9 +764,9 @@ private long listBlobsToGetLatestIndexId() throws IOException { return latest; } - private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException { + private void writeAtomic(final String blobName, final BytesReference bytesRef, boolean failIfAlreadyExists) throws IOException { try (InputStream stream = bytesRef.streamInput()) { - snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length()); + snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length(), failIfAlreadyExists); } } @@ -813,7 +811,7 @@ public void verify(String seed, DiscoveryNode localNode) { try { BytesArray bytes = new BytesArray(seed); try (InputStream stream = bytes.streamInput()) { - testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length()); + testBlobContainer.writeBlob("data-" + localNode.getId() + ".dat", stream, bytes.length(), true); } } catch (IOException exp) { throw new RepositoryVerificationException(metadata.name(), "store location [" + blobStore() + "] is not accessible on the node [" + localNode + "]", exp); @@ -1252,7 +1250,7 @@ private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) t snapshotRateLimitingTimeInNanos::inc); } inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); - blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes); + blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true); } Store.verify(indexInput); snapshotStatus.addProcessedFile(fileInfo.length()); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index b974be2b869ab..ca6ec74dc2ce2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -132,7 +132,7 @@ public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws final String blobName = blobName(name); writeTo(obj, blobName, bytesArray -> { try (InputStream stream = bytesArray.streamInput()) { - blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length()); + blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length(), true); } }); } @@ -150,7 +150,7 @@ public void write(T obj, BlobContainer blobContainer, String name) throws IOExce final String blobName = blobName(name); writeTo(obj, blobName, bytesArray -> { try (InputStream stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length()); + blobContainer.writeBlob(blobName, stream, bytesArray.length(), true); } }); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java index 70be72989cf95..6f4f69ad67e88 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java @@ -224,7 +224,8 @@ public void testAtomicWriteFailures() throws Exception { IOException writeBlobException = expectThrows(IOException.class, () -> { BlobContainer wrapper = new BlobContainerWrapper(blobContainer) { @Override - public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName); } }; @@ -251,10 +252,9 @@ protected void randomCorruption(BlobContainer blobContainer, String blobName) th int location = randomIntBetween(0, buffer.length - 1); buffer[location] = (byte) (buffer[location] ^ 42); } while (originalChecksum == checksum(buffer)); - blobContainer.deleteBlob(blobName); // delete original before writing new blob BytesArray bytesArray = new BytesArray(buffer); try (StreamInput stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length()); + blobContainer.writeBlob(blobName, stream, bytesArray.length(), false); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index b5c6339724123..5666869a1aa0b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -49,13 +49,14 @@ public InputStream readBlob(String name) throws IOException { } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { - delegate.writeBlob(blobName, inputStream, blobSize); + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { - delegate.writeBlobAtomic(blobName, inputStream, blobSize); + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + boolean failIfAlreadyExists) throws IOException { + delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index d0702acf10373..d05a10905d858 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -346,9 +346,9 @@ public Map listBlobsByPrefix(String blobNamePrefix) throws } @Override - public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException { + public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { maybeIOExceptionOrBlock(blobName); - super.writeBlob(blobName, inputStream, blobSize); + super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); if (RandomizedContext.current().getRandom().nextBoolean()) { // for network based repositories, the blob may have been written but we may still // get an error with the client connection, so an IOException here simulates this @@ -357,27 +357,28 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t } @Override - public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException { + public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, + final boolean failIfAlreadyExists) throws IOException { final Random random = RandomizedContext.current().getRandom(); if (allowAtomicOperations && random.nextBoolean()) { if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); - super.writeBlob(tempBlobName, inputStream, blobSize); + super.writeBlob(tempBlobName, inputStream, blobSize, failIfAlreadyExists); maybeIOExceptionOrBlock(blobName); final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate(); - fsBlobContainer.moveBlobAtomic(tempBlobName, blobName); + fsBlobContainer.moveBlobAtomic(tempBlobName, blobName, failIfAlreadyExists); } else { // Atomic write since it is potentially supported // by the delegating blob container maybeIOExceptionOrBlock(blobName); - super.writeBlobAtomic(blobName, inputStream, blobSize); + super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } } else { // Simulate a non-atomic write since many blob container // implementations does not support atomic write maybeIOExceptionOrBlock(blobName); - super.writeBlob(blobName, inputStream, blobSize); + super.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index 43a62bbe662cc..9f12c36999145 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -61,7 +61,12 @@ public void testWriteRead() throws IOException { try(BlobStore store = newBlobStore()) { final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); - writeBlob(container, "foobar", new BytesArray(data)); + writeBlob(container, "foobar", new BytesArray(data), randomBoolean()); + if (randomBoolean()) { + // override file, to check if we get latest contents + data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); + writeBlob(container, "foobar", new BytesArray(data), false); + } try (InputStream stream = container.readBlob("foobar")) { BytesRefBuilder target = new BytesRefBuilder(); while (target.length() < data.length) { @@ -123,7 +128,7 @@ public void testDeleteBlob() throws IOException { byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); - writeBlob(container, blobName, bytesArray); + writeBlob(container, blobName, bytesArray, randomBoolean()); container.deleteBlob(blobName); // should not raise // blob deleted, so should raise again @@ -149,20 +154,21 @@ public void testVerifyOverwriteFails() throws IOException { final BlobContainer container = store.blobContainer(new BlobPath()); byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); final BytesArray bytesArray = new BytesArray(data); - writeBlob(container, blobName, bytesArray); + writeBlob(container, blobName, bytesArray, true); // should not be able to overwrite existing blob - expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray)); + expectThrows(FileAlreadyExistsException.class, () -> writeBlob(container, blobName, bytesArray, true)); container.deleteBlob(blobName); - writeBlob(container, blobName, bytesArray); // after deleting the previous blob, we should be able to write to it again + writeBlob(container, blobName, bytesArray, true); // after deleting the previous blob, we should be able to write to it again } } - protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException { + protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray, + boolean failIfAlreadyExists) throws IOException { try (InputStream stream = bytesArray.streamInput()) { if (randomBoolean()) { - container.writeBlob(blobName, stream, bytesArray.length()); + container.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); } else { - container.writeBlobAtomic(blobName, stream, bytesArray.length()); + container.writeBlobAtomic(blobName, stream, bytesArray.length(), failIfAlreadyExists); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java index 35a17c2a8dd83..ccc38ae362991 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreTestCase.java @@ -80,7 +80,7 @@ public static byte[] randomBytes(int length) { protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException { try (InputStream stream = bytesArray.streamInput()) { - container.writeBlob(blobName, stream, bytesArray.length()); + container.writeBlob(blobName, stream, bytesArray.length(), true); } }