Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Segments] Retry segment uploads to remote store on failure #7400

Merged
merged 10 commits into from
May 9, 2023
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractSnapshotIntegTestCase {

private static final String REPOSITORY_NAME = "my-segment-repo-1";
private static final String INDEX_NAME = "remote-store-test-idx-1";

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.put(FeatureFlags.REMOTE_STORE, "true")
.build();
}

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);
internalCluster().startClusterManagerOnlyNode();
}

@Override
public Settings indexSettings() {
return remoteStoreIndexSettings(0);
}

private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

public void testRemoteRefreshRetryOnFailure() throws Exception {

// Create repository
Path location = randomRepoPath().toAbsolutePath();
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", location)
.put("random_control_io_exception_rate", randomIntBetween(10, 25) / 100f)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
);

internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexData(randomIntBetween(5, 10), randomBoolean());
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
assertEquals(1, response.getShards().length);

String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

assertBusy(
() -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath))
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
);
}

/**
* Gets all segment files which starts with "_". For instance, _0.cfe, _o.cfs etc.
*
* @param location the path to location where segment files are being searched.
* @return set of file names of all segment file or empty set if there was IOException thrown.
*/
private Set<String> getSegmentFiles(Path location) {
try {
return Arrays.stream(FileSystemUtils.files(location))
.filter(path -> path.getFileName().startsWith("_"))
.map(path -> path.getFileName().toString())
.collect(Collectors.toSet());
} catch (IOException exception) {
logger.error("Exception occurred while getting segment files", exception);
}
return Collections.emptySet();
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
.setSource(randomAlphaOfLength(5), randomAlphaOfLength(5))
.get();
}

private void indexData(int numberOfIterations, boolean invokeFlush) {
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
if (invokeFlush) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.index.fielddata.IndexFieldDataService;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.Store;
Expand Down Expand Up @@ -204,6 +205,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.SEARCHABLE_SNAPSHOT_ID_NAME,
IndexSettings.SEARCHABLE_SNAPSHOT_ID_UUID,

// Settings for Remote Store
RemoteStoreRefreshListener.INDEX_REMOTE_REFRESH_RETRY_INTERVAL,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
Expand Down
Loading