Skip to content

Commit

Permalink
[Snapshot V2] Use metadata from source snapshot while cloning snapsho…
Browse files Browse the repository at this point in the history
…t V2 (opensearch-project#16344)

---------

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored and dk2k committed Oct 21, 2024
1 parent a7addd8 commit 4f76cbf
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -174,6 +175,118 @@ public void testCloneShallowCopyV2() throws Exception {
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyV2DeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-clone-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Client client = client();

assertAcked(
client.admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

createIndex(indexName1, getRemoteStoreBackedIndexSettings());
createIndex(indexName2, getRemoteStoreBackedIndexSettings());

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexRandomDocs(indexName1, numDocsInIndex1);
indexRandomDocs(indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0));
assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards()));
assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

// Validate that the snapshot was created
final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance(
RepositoriesService.class
).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();

assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId()));

createIndex(indexName3, getRemoteStoreBackedIndexSettings());
indexRandomDocs(indexName3, 10);
ensureGreen(indexName3);

assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get());
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName2)).get());

AcknowledgedResponse response = client().admin()
.cluster()
.prepareCloneSnapshot(snapshotRepoName, snapshotName1, "test_clone_snapshot1")
.setIndices("*")
.get();
assertTrue(response.isAcknowledged());
awaitClusterManagerFinishRepoOperations();

AtomicReference<SnapshotId> cloneSnapshotId = new AtomicReference<>();
// Validate that snapshot is present in repository data
waitUntil(() -> {
PlainActionFuture<RepositoryData> repositoryDataPlainActionFutureClone = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFutureClone);

RepositoryData repositoryData1;
try {
repositoryData1 = repositoryDataPlainActionFutureClone.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
for (SnapshotId snapshotId : repositoryData1.getSnapshotIds()) {
if (snapshotId.getName().equals("test_clone_snapshot1")) {
cloneSnapshotId.set(snapshotId);
return true;
}
}
return false;
}, 90, TimeUnit.SECONDS);

final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId.get();
SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get(
f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal)))
);

assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp()));
for (String index : sourceSnapshotInfo.indices()) {
assertTrue(cloneSnapshotInfo.indices().contains(index));

}
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyAfterDisablingV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
43 changes: 31 additions & 12 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,6 @@ public void cloneSnapshotV2(
) {

long startTime = System.currentTimeMillis();
ClusterState currentState = clusterService.state();
String snapshotName = snapshot.getSnapshotId().getName();
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;
Expand Down Expand Up @@ -963,8 +962,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshotId)));
final ShardGenerations shardGenerations = repositoryData.shardGenerations();

snapshotInfoListener.whenComplete(snapshotInfo -> {
final SnapshotInfo cloneSnapshotInfo = new SnapshotInfo(
snapshot.getSnapshotId(),
Expand All @@ -984,17 +981,28 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager");
}
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
final StepListener<Metadata> metadataListener = new StepListener<>();
pinnedTimestampListener.whenComplete(
rData -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
final Metadata.Builder metaBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(newEntry.source()));
for (IndexId index : newEntry.indices()) {
metaBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, newEntry.source(), index), false);
}
return metaBuilder.build();
})),
e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
}
);
metadataListener.whenComplete(meta -> {
ShardGenerations shardGenerations = buildGenerationsV2(newEntry, meta);
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(
currentState.metadata(),
newEntry.includeGlobalState(),
false,
newEntry.dataStreams(),
newEntry.indices()
),
metadataForSnapshot(meta, newEntry.includeGlobalState(), false, newEntry.dataStreams(), newEntry.indices()),
cloneSnapshotInfo,
repositoryData.getVersion(sourceSnapshotId),
state -> stateWithoutSnapshot(state, snapshot),
Expand Down Expand Up @@ -1038,7 +1046,7 @@ public void onFailure(Exception e) {
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
logger.error("Failed to retrieve metadata for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
Expand Down Expand Up @@ -1544,6 +1552,17 @@ private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snaps
return builder.build();
}

private static ShardGenerations buildGenerationsV2(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
ShardGenerations.Builder builder = ShardGenerations.builder();
snapshot.indices().forEach(indexId -> {
int shardCount = metadata.index(indexId.getName()).getNumberOfShards();
for (int i = 0; i < shardCount; i++) {
builder.put(indexId, i, null);
}
});
return builder.build();
}

private static Metadata metadataForSnapshot(
Metadata metadata,
boolean includeGlobalState,
Expand Down

0 comments on commit 4f76cbf

Please sign in to comment.