Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write shard level metadata blob when snapshotting searchable snapshot indexes #13190

Merged
merged 3 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Switch to iterative version of WKT format parser ([#14086](https://github.com/opensearch-project/OpenSearch/pull/14086))
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
Expand Down Expand Up @@ -53,11 +54,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -132,21 +135,24 @@ public void testCreateSearchableSnapshot() throws Exception {

public void testSnapshottingSearchableSnapshots() throws Exception {
final String repoName = "test-repo";
final String initSnapName = "initial-snapshot";
final String indexName = "test-idx";
final String repeatSnapNamePrefix = "test-repeated-snap-";
final String repeatIndexNamePrefix = indexName + "-copy-";
final Client client = client();

// create an index, add data, snapshot it, then delete it
internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);
createRepositoryWithSettings(null, repoName);
takeSnapshot(client, "initial-snapshot", repoName, indexName);
takeSnapshot(client, initSnapName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

// restore the index as a searchable snapshot
internalCluster().ensureAtLeastNumSearchNodes(1);
client.admin()
.cluster()
.prepareRestoreSnapshot(repoName, "initial-snapshot")
.prepareRestoreSnapshot(repoName, initSnapName)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-copy-0")
.setStorageType(RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT)
Expand All @@ -159,7 +165,7 @@ public void testSnapshottingSearchableSnapshots() throws Exception {

// Test that the searchable snapshot index can continue to be snapshotted and restored
for (int i = 0; i < 4; i++) {
final String repeatedSnapshotName = "test-repeated-snap-" + i;
final String repeatedSnapshotName = repeatSnapNamePrefix + i;
takeSnapshot(client, repeatedSnapshotName, repoName);
deleteIndicesAndEnsureGreen(client, "_all");
client.admin()
Expand All @@ -181,21 +187,34 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
final Map<String, List<String>> snapshotInfoMap = response.getSnapshots()
.stream()
.collect(Collectors.toMap(s -> s.snapshotId().getName(), SnapshotInfo::indices));
assertEquals(
Map.of(
"initial-snapshot",
List.of("test-idx"),
"test-repeated-snap-0",
List.of("test-idx-copy-0"),
"test-repeated-snap-1",
List.of("test-idx-copy-1"),
"test-repeated-snap-2",
List.of("test-idx-copy-2"),
"test-repeated-snap-3",
List.of("test-idx-copy-3")
),
snapshotInfoMap
);
final Map<String, List<String>> expect = new HashMap<>();
expect.put(initSnapName, List.of(indexName));
IntStream.range(0, 4).forEach(i -> expect.put(repeatSnapNamePrefix + i, List.of(repeatIndexNamePrefix + i)));
assertEquals(expect, snapshotInfoMap);

String[] snapNames = new String[5];
IntStream.range(0, 4).forEach(i -> snapNames[i] = repeatSnapNamePrefix + i);
snapNames[4] = initSnapName;
SnapshotsStatusResponse snapshotsStatusResponse = client.admin()
.cluster()
.prepareSnapshotStatus(repoName)
.addSnapshots(snapNames)
.execute()
.actionGet();
snapshotsStatusResponse.getSnapshots().forEach(s -> {
String snapName = s.getSnapshot().getSnapshotId().getName();
assertEquals(1, s.getIndices().size());
assertEquals(1, s.getShards().size());
if (snapName.equals("initial-snapshot")) {
assertNotNull(s.getIndices().get("test-idx"));
assertTrue(s.getShards().get(0).getStats().getTotalFileCount() > 0);
} else {
assertTrue(snapName.startsWith(repeatSnapNamePrefix));
assertEquals(1, s.getIndices().size());
assertNotNull(s.getIndices().get(repeatIndexNamePrefix + snapName.substring(repeatSnapNamePrefix.length())));
assertEquals(0L, s.getShards().get(0).getStats().getTotalFileCount());
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2801,9 +2801,12 @@ public void snapshotShard(
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {
if (store.indexSettings().isRemoteSnapshot()) {
// If the source of the data is another remote snapshot (i.e. searchable snapshot), there are no files to upload for this
// shard, so the list of commit point files is left empty
indexCommitPointFiles = List.of();
} else if (filesFromSegmentInfos == null) {
// If we did not find a set of files that is equal to the current commit, we determine the files to upload by comparing the
// files in the commit with the files already in the repository
indexCommitPointFiles = new ArrayList<>();
final Collection<String> fileNames;
final Store.MetadataSnapshot metadataFromStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,53 +276,47 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map<ShardId, IndexS
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
if (isRemoteSnapshot(shardId)) {
// If the source of the data is another remote snapshot (i.e. searchable snapshot)
// then no need to snapshot the shard and can immediately notify success.
notifySuccessfulSnapshotShard(snapshot, shardId, snapshotStatus.generation());
} else {
snapshot(
shardId,
snapshot,
indexId,
entry.userMetadata(),
snapshotStatus,
entry.version(),
entry.remoteStoreIndexShallowCopy(),
new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
snapshot(
shardId,
snapshot,
indexId,
entry.userMetadata(),
snapshotStatus,
entry.version(),
entry.remoteStoreIndexShallowCopy(),
new ActionListener<>() {
@Override
public void onResponse(String newGeneration) {
assert newGeneration != null;
assert newGeneration.equals(snapshotStatus.generation());
if (logger.isDebugEnabled()) {
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
logger.debug(
"snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot,
snapshot.getRepository(),
lastSnapshotStatus,
snapshotStatus.generation()
);
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}

@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
@Override
public void onFailure(Exception e) {
final String failure;
if (e instanceof AbortedSnapshotException) {
failure = "aborted";
logger.debug(() -> new ParameterizedMessage("[{}][{}] aborted shard snapshot", shardId, snapshot), e);
} else {
failure = summarizeFailure(e);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
}
);
}
}
);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@

import static org.opensearch.test.OpenSearchTestCase.buildNewFakeTransportAddress;
import static org.opensearch.test.OpenSearchTestCase.randomIntBetween;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasKey;
Expand Down Expand Up @@ -143,7 +142,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex
}
assertIndexUUIDs(repository, repositoryData);
assertSnapshotUUIDs(repository, repositoryData);
assertShardIndexGenerations(repository, blobContainer, repositoryData);
assertShardIndexGenerations(blobContainer, repositoryData);
return null;
} catch (AssertionError e) {
return e;
Expand All @@ -167,31 +166,24 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe
assertTrue(indexGenerations.length <= 2);
}

private static void assertShardIndexGenerations(BlobStoreRepository repository, BlobContainer repoRoot, RepositoryData repositoryData)
throws IOException {
private static void assertShardIndexGenerations(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final BlobContainer indicesContainer = repoRoot.children().get("indices");
for (IndexId index : shardGenerations.indices()) {
final List<String> gens = shardGenerations.getGens(index);
if (gens.isEmpty() == false) {
final BlobContainer indexContainer = indicesContainer.children().get(index.getId());
final Map<String, BlobContainer> shardContainers = indexContainer.children();
if (isRemoteSnapshot(repository, repositoryData, index)) {
// If the source of the data is another snapshot (i.e. searchable snapshot)
// then assert that there is no shard data (because it exists in the source snapshot)
assertThat(shardContainers, anEmptyMap());
} else {
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
for (int i = 0; i < gens.size(); i++) {
final String generation = gens.get(i);
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) {
final String shardId = Integer.toString(i);
assertThat(shardContainers, hasKey(shardId));
assertThat(
shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX),
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
);
}
}
}
Expand Down
Loading