File: AzureBlobStoreRepositoryTests.java
@@ -24,8 +24,6 @@
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -34,13 +32,10 @@
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.BackgroundIndexer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -58,10 +53,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

@@ -73,6 +64,7 @@ protected String repositoryType() {
@Override
protected Settings repositorySettings() {
return Settings.builder()
.put(super.repositorySettings())
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
.build();
@@ -108,41 +100,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

/**
* Test the snapshot and restore of an index which has large segments files.
*/
public void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
.build());

// the number of documents here dictates the size of the single segment
// we want a large segment (1Mb+) so that Azure SDK client executes Put Block API calls
// the size of each uploaded block is defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4Mb)
final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}

/**
* AzureRepositoryPlugin that allows setting low values for the Azure client's retry policy
* and for BlobRequestOptions#getSingleBlobPutThresholdInBytes().
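The sizing rationale lives only in the comment of the deleted test above: the force-merge must leave a single segment larger than ~1Mb so the Azure SDK client switches to Put Block API calls, each uploaded block being roughly 4Mb (Constants.DEFAULT_STREAM_WRITE_IN_BYTES); the GCS variant below relied on the same idea with a ~2Mb chunk size. A minimal sketch of that arithmetic, assuming an example segment size:

public class PutBlockSketch {
    public static void main(String[] args) {
        // The ~4Mb block size is taken from the deleted comment
        // (Constants.DEFAULT_STREAM_WRITE_IN_BYTES); the segment size is an
        // assumed example for a force-merged index of 10_000-20_000 docs.
        long blockSize = 4L * 1024 * 1024;
        long segmentSize = 6L * 1024 * 1024;
        long putBlockCalls = (segmentSize + blockSize - 1) / blockSize; // ceiling division
        System.out.println(putBlockCalls + " Put Block calls for this segment"); // prints 2
    }
}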
File: GoogleCloudStorageBlobStoreRepositoryTests.java
@@ -20,15 +20,12 @@
package org.elasticsearch.repositories.gcs;

import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.http.HttpStatus;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
@@ -49,7 +46,6 @@
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.threadpool.ThreadPool;
import org.threeten.bp.Duration;

@@ -81,9 +77,6 @@
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@@ -177,44 +170,6 @@ public void testChunkSize() {
assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage());
}

/**
* Test the snapshot and restore of an index which has large segments files (2Mb+).
*
* The value of 2Mb is chosen according to the default chunk size configured in Google SDK client
* (see {@link BaseWriteChannel} chunk size).
*/
public void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository("repository", Settings.builder()
.put(BUCKET.getKey(), "bucket")
.put(CLIENT_NAME.getKey(), "test")
.build());

final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final int nbDocs = 10_000;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), nbDocs)) {
waitForDocs(nbDocs, indexer);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}

public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {

public TestGoogleCloudStoragePlugin(Settings settings) {
File: S3BlobStoreRepositoryTests.java
@@ -23,8 +23,6 @@
import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
@@ -45,7 +43,6 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.ByteArrayOutputStream;
@@ -63,9 +60,6 @@
import java.util.concurrent.ConcurrentMap;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
@@ -79,6 +73,7 @@ protected String repositoryType() {
@Override
protected Settings repositorySettings() {
return Settings.builder()
.put(super.repositorySettings())
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
.put(S3Repository.CLIENT_NAME.getKey(), "test")
.build();
@@ -116,34 +111,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}

/**
* S3RepositoryPlugin that allows disabling chunked encoding and setting a low threshold between single and multipart uploads.
*/
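The retained javadoc above is the S3-side counterpart of those sizing concerns: the test plugin can disable chunked encoding and lower the threshold at which the client switches from a single upload to a multipart upload, so test-sized blobs still exercise the multipart path. A sketch of the decision that threshold drives (values are illustrative, not the plugin's actual defaults):

public class MultipartThresholdSketch {
    public static void main(String[] args) {
        long threshold = 16 * 1024; // illustrative lowered threshold, in bytes
        long blobSize = 64 * 1024;  // illustrative blob size
        // Blobs larger than the threshold take the multipart upload path,
        // which is what lowering the threshold in tests is meant to exercise.
        System.out.println(blobSize > threshold ? "multipart upload" : "single upload");
    }
}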
File: FsBlobStoreRepositoryIT.java
@@ -21,6 +21,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;

@@ -44,10 +45,14 @@ protected String repositoryType() {

@Override
protected Settings repositorySettings() {
return Settings.builder()
.put(super.repositorySettings())
.put("location", randomRepoPath())
.build();
final Settings.Builder settings = Settings.builder();
settings.put(super.repositorySettings());
settings.put("location", randomRepoPath());
if (randomBoolean()) {
long size = 1 << randomInt(10);
settings.put("chunk_size", new ByteSizeValue(size, ByteSizeUnit.KB));
}
return settings.build();
}

public void testMissingDirectoriesNotCreatedInReadonlyRepository() throws IOException, ExecutionException, InterruptedException {
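The new conditional makes the FS chunk size a random power of two: assuming randomInt(10) is inclusive of its bound (as in the test framework), 1 << randomInt(10) ranges over 1, 2, 4, ..., 1024, i.e. chunk sizes from 1kb to 1mb. A standalone sketch of the same computation:

public class ChunkSizeSketch {
    public static void main(String[] args) {
        // Mirrors the randomized "chunk_size" above; nextInt(11) stands in for
        // the framework's randomInt(10), assumed inclusive of its bound.
        int exponent = new java.util.Random().nextInt(11); // 0..10
        long sizeKb = 1L << exponent;                      // 1, 2, 4, ..., 1024
        System.out.println(sizeKb + "kb");                 // value fed to "chunk_size"
    }
}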
File: ESBlobStoreRepositoryIntegTestCase.java
@@ -28,8 +28,6 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
@@ -63,13 +61,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
protected abstract String repositoryType();

protected Settings repositorySettings() {
final Settings.Builder settings = Settings.builder();
settings.put("compress", randomBoolean());
if (randomBoolean()) {
long size = 1 << randomInt(10);
settings.put("chunk_size", new ByteSizeValue(size, ByteSizeUnit.KB));
}
return settings.build();
return Settings.builder().put("compress", randomBoolean()).build();
}

protected final String createRepository(final String name) {
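With chunk_size gone from the base class, repositorySettings() is layered: the base contributes only the randomized compress flag, and each concrete test stacks its own keys on top via put(super.repositorySettings()). A sketch of how the FS layers compose (the 64kb value is illustrative; the FS test randomizes 1kb-1mb):

// Inside an imagined FS test subclass of ESBlobStoreRepositoryIntegTestCase.
@Override
protected Settings repositorySettings() {
    return Settings.builder()
        .put(super.repositorySettings())   // base layer: compress=<random>
        .put("location", randomRepoPath()) // FS layer: repository path
        .put("chunk_size", "64kb")         // FS layer: illustrative chunk size
        .build();
}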
File: ESMockAPIBasedRepositoryIntegTestCase.java
@@ -22,11 +22,15 @@
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpStatus;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.BackgroundIndexer;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -39,6 +43,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;

/**
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
*/
@@ -83,6 +91,37 @@ public void tearDownHttpServer() {

protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate);

/**
* Test the snapshot and restore of an index which has large segment files.
*/
public final void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}

flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));

assertAcked(client().admin().indices().prepareDelete(index));

assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}

protected static String httpServerUrl() {
InetSocketAddress address = httpServer.getAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
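For orientation, a minimal sketch of what a concrete subclass now supplies to inherit the final testSnapshotWithLargeSegmentFiles. Only repositoryType(), repositorySettings(), createErroneousHttpHandler() and httpServerUrl() are visible in this diff; the "fake" identifiers and the endpoint wiring are hypothetical, and a real subclass also registers HTTP handlers emulating its storage service.

// Hypothetical subclass, for illustration only.
public class FakeBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

    @Override
    protected String repositoryType() {
        return "fake";
    }

    @Override
    protected Settings repositorySettings() {
        return Settings.builder()
            .put(super.repositorySettings())  // keep the base class randomization (compress)
            .put("endpoint", httpServerUrl()) // point the client at the mock HTTP server
            .build();
    }

    @Override
    protected HttpHandler createErroneousHttpHandler(HttpHandler delegate) {
        // A real implementation wraps the healthy handler and injects retryable
        // failures; passing the delegate through keeps the sketch minimal.
        return delegate;
    }
}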