Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Snapshot V2] Use metadata from source snapshot while cloning snapshot V2 #16352

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -174,6 +175,118 @@ public void testCloneShallowCopyV2() throws Exception {
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyV2DeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
internalCluster().startClusterManagerOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));
internalCluster().startDataOnlyNode(snapshotV2Settings(remoteStoreRepoPath));

String indexName1 = "testindex1";
String indexName2 = "testindex2";
String indexName3 = "testindex3";
String snapshotRepoName = "test-clone-snapshot-repo";
String snapshotName1 = "test-create-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

Client client = client();

assertAcked(
client.admin()
.cluster()
.preparePutRepository(snapshotRepoName)
.setType(FsRepository.TYPE)
.setSettings(
Settings.builder()
.put(FsRepository.LOCATION_SETTING.getKey(), absolutePath1)
.put(FsRepository.COMPRESS_SETTING.getKey(), randomBoolean())
.put(FsRepository.CHUNK_SIZE_SETTING.getKey(), randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), true)
.put(BlobStoreRepository.SHALLOW_SNAPSHOT_V2.getKey(), true)
)
);

createIndex(indexName1, getRemoteStoreBackedIndexSettings());
createIndex(indexName2, getRemoteStoreBackedIndexSettings());

final int numDocsInIndex1 = 10;
final int numDocsInIndex2 = 20;
indexRandomDocs(indexName1, numDocsInIndex1);
indexRandomDocs(indexName2, numDocsInIndex2);
ensureGreen(indexName1, indexName2);

CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(true)
.get();
SnapshotInfo sourceSnapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(sourceSnapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(sourceSnapshotInfo.successfulShards(), greaterThan(0));
assertThat(sourceSnapshotInfo.successfulShards(), equalTo(sourceSnapshotInfo.totalShards()));
assertThat(sourceSnapshotInfo.snapshotId().getName(), equalTo(snapshotName1));

// Validate that the snapshot was created
final BlobStoreRepository repository = (BlobStoreRepository) internalCluster().getCurrentClusterManagerNodeInstance(
RepositoriesService.class
).repository(snapshotRepoName);
PlainActionFuture<RepositoryData> repositoryDataPlainActionFuture = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFuture);

RepositoryData repositoryData = repositoryDataPlainActionFuture.get();

assertTrue(repositoryData.getSnapshotIds().contains(sourceSnapshotInfo.snapshotId()));

createIndex(indexName3, getRemoteStoreBackedIndexSettings());
indexRandomDocs(indexName3, 10);
ensureGreen(indexName3);

assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get());
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName2)).get());

AcknowledgedResponse response = client().admin()
.cluster()
.prepareCloneSnapshot(snapshotRepoName, snapshotName1, "test_clone_snapshot1")
.setIndices("*")
.get();
assertTrue(response.isAcknowledged());
awaitClusterManagerFinishRepoOperations();

AtomicReference<SnapshotId> cloneSnapshotId = new AtomicReference<>();
// Validate that snapshot is present in repository data
waitUntil(() -> {
PlainActionFuture<RepositoryData> repositoryDataPlainActionFutureClone = new PlainActionFuture<>();
repository.getRepositoryData(repositoryDataPlainActionFutureClone);

RepositoryData repositoryData1;
try {
repositoryData1 = repositoryDataPlainActionFutureClone.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
for (SnapshotId snapshotId : repositoryData1.getSnapshotIds()) {
if (snapshotId.getName().equals("test_clone_snapshot1")) {
cloneSnapshotId.set(snapshotId);
return true;
}
}
return false;
}, 90, TimeUnit.SECONDS);

final SnapshotId cloneSnapshotIdFinal = cloneSnapshotId.get();
SnapshotInfo cloneSnapshotInfo = PlainActionFuture.get(
f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> repository.getSnapshotInfo(cloneSnapshotIdFinal)))
);

assertThat(cloneSnapshotInfo.getPinnedTimestamp(), equalTo(sourceSnapshotInfo.getPinnedTimestamp()));
for (String index : sourceSnapshotInfo.indices()) {
assertTrue(cloneSnapshotInfo.indices().contains(index));

}
assertThat(cloneSnapshotInfo.totalShards(), equalTo(sourceSnapshotInfo.totalShards()));
}

public void testCloneShallowCopyAfterDisablingV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,6 @@
) {

long startTime = System.currentTimeMillis();
ClusterState currentState = clusterService.state();
String snapshotName = snapshot.getSnapshotId().getName();
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) {
private SnapshotsInProgress.Entry newEntry;
Expand Down Expand Up @@ -1146,8 +1145,6 @@
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

executor.execute(ActionRunnable.supply(snapshotInfoListener, () -> repository.getSnapshotInfo(sourceSnapshotId)));
final ShardGenerations shardGenerations = repositoryData.shardGenerations();

snapshotInfoListener.whenComplete(snapshotInfo -> {
final SnapshotInfo cloneSnapshotInfo = new SnapshotInfo(
snapshot.getSnapshotId(),
Expand All @@ -1167,17 +1164,28 @@
throw new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2 clone, no longer cluster manager");
}
final StepListener<RepositoryData> pinnedTimestampListener = new StepListener<>();
pinnedTimestampListener.whenComplete(repoData -> {
final StepListener<Metadata> metadataListener = new StepListener<>();
pinnedTimestampListener.whenComplete(
rData -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
final Metadata.Builder metaBuilder = Metadata.builder(repository.getSnapshotGlobalMetadata(newEntry.source()));

Check warning on line 1170 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1167-L1170

Added lines #L1167 - L1170 were not covered by tests
for (IndexId index : newEntry.indices()) {
metaBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, newEntry.source(), index), false);
}
return metaBuilder.build();

Check warning on line 1174 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1172-L1174

Added lines #L1172 - L1174 were not covered by tests
})),
e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
}

Check warning on line 1181 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1177-L1181

Added lines #L1177 - L1181 were not covered by tests
);
metadataListener.whenComplete(meta -> {
ShardGenerations shardGenerations = buildGenerationsV2(newEntry, meta);

Check warning on line 1184 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1183-L1184

Added lines #L1183 - L1184 were not covered by tests
repository.finalizeSnapshot(
shardGenerations,
repositoryData.getGenId(),
metadataForSnapshot(
currentState.metadata(),
newEntry.includeGlobalState(),
false,
newEntry.dataStreams(),
newEntry.indices()
),
metadataForSnapshot(meta, newEntry.includeGlobalState(), false, newEntry.dataStreams(), newEntry.indices()),

Check warning on line 1188 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1188

Added line #L1188 was not covered by tests
cloneSnapshotInfo,
repositoryData.getVersion(sourceSnapshotId),
state -> stateWithoutSnapshot(state, snapshot),
Expand Down Expand Up @@ -1221,7 +1229,7 @@
}
);
}, e -> {
logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} ", repositoryName, snapshotName);
logger.error("Failed to retrieve metadata for snapshot-v2 {} {} ", repositoryName, snapshotName);

Check warning on line 1232 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1232

Added line #L1232 was not covered by tests
stateWithoutSnapshotV2(newState);
leaveRepoLoop(repositoryName);
listener.onFailure(e);
Expand Down Expand Up @@ -1962,6 +1970,17 @@
return builder.build();
}

private static ShardGenerations buildGenerationsV2(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
ShardGenerations.Builder builder = ShardGenerations.builder();
snapshot.indices().forEach(indexId -> {
int shardCount = metadata.index(indexId.getName()).getNumberOfShards();

Check warning on line 1976 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1974-L1976

Added lines #L1974 - L1976 were not covered by tests
for (int i = 0; i < shardCount; i++) {
builder.put(indexId, i, null);

Check warning on line 1978 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1978

Added line #L1978 was not covered by tests
}
});
return builder.build();

Check warning on line 1981 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1980-L1981

Added lines #L1980 - L1981 were not covered by tests
}

private static Metadata metadataForSnapshot(
Metadata metadata,
boolean includeGlobalState,
Expand Down
Loading