Skip to content

Commit

Permalink
Restore ClusterState version during remote state restore (#10853)
Browse files Browse the repository at this point in the history
* Restore ClusterState version during remote state restore

Signed-off-by: bansvaru <bansvaru@amazon.com>
  • Loading branch information
linuxpi authored Oct 29, 2023
1 parent f372cbf commit 73bbeb5
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- GHA to verify checklist items completion in PR descriptions ([#10800](https://github.com/opensearch-project/OpenSearch/pull/10800))
- [Remote cluster state] Restore cluster state version during remote state auto restore ([#10853](https://github.com/opensearch-project/OpenSearch/pull/10853))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ public void testFullClusterRestoreStaleDelete() throws Exception {

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
).getIndices();
).getMetadata().getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -85,16 +86,25 @@ public void testFullClusterRestore() throws Exception {
// Step - 1 index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();
long prevClusterStateVersion = clusterService().state().version();

// Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata
resetCluster(dataNodeCount, clusterManagerNodeCount);

String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

// Step - 3 Trigger full cluster restore and validate
// Step - 3 validate cluster state restored
long newClusterStateVersion = clusterService().state().version();
assert prevClusterStateVersion < newClusterStateVersion : String.format(
Locale.ROOT,
"ClusterState version is not restored. previousClusterVersion: [%s] is greater than current [%s]",
prevClusterStateVersion,
newClusterStateVersion
);
validateMetadata(List.of(INDEX_NAME));
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);

}

/**
Expand All @@ -121,6 +131,7 @@ public void testFullClusterRestoreDoesntFailWithConflictingLocalState() throws E
// index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();
long prevClusterStateVersion = clusterService().state().version();

// stop all nodes
internalCluster().stopAllNodes();
Expand Down Expand Up @@ -156,6 +167,14 @@ public Settings onNodeStopped(String nodeName) {
newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, ClusterState.UNKNOWN_UUID) : "cluster restart not successful. cluster uuid is still unknown";
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

long newClusterStateVersion = clusterService().state().version();
assert prevClusterStateVersion < newClusterStateVersion : String.format(
Locale.ROOT,
"ClusterState version is not restored. previousClusterVersion: [%s] is greater than current [%s]",
prevClusterStateVersion,
newClusterStateVersion
);
validateMetadata(List.of(INDEX_NAME));

// start data nodes to trigger index data recovery
Expand All @@ -180,14 +199,22 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {
updateIndexBlock(true, secondIndexName);

String prevClusterUUID = clusterService().state().metadata().clusterUUID();
long prevClusterStateVersion = clusterService().state().version();

// Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata
resetCluster(dataNodeCount, clusterManagerNodeCount);

String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

// Step - 3 Trigger full cluster restore
// Step - 3 validate cluster state restored
long newClusterStateVersion = clusterService().state().version();
assert prevClusterStateVersion < newClusterStateVersion : String.format(
Locale.ROOT,
"ClusterState version is not restored. previousClusterVersion: [%s] is greater than current [%s]",
prevClusterStateVersion,
newClusterStateVersion
);
validateMetadata(List.of(INDEX_NAME, secondIndexName));
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, false);
verifyRedIndicesAndTriggerRestore(indexStats2, secondIndexName, false);
Expand Down Expand Up @@ -239,6 +266,7 @@ public void testRemoteStateFullRestart() throws Exception {

Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();
long prevClusterStateVersion = clusterService().state().version();
// Delete index metadata file in remote
try {
Files.move(
Expand All @@ -257,6 +285,14 @@ public void testRemoteStateFullRestart() throws Exception {
ensureGreen(INDEX_NAME);
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert Objects.equals(newClusterUUID, prevClusterUUID) : "Full restart not successful. cluster uuid has changed";

long newClusterStateVersion = clusterService().state().version();
assert prevClusterStateVersion < newClusterStateVersion : String.format(
Locale.ROOT,
"ClusterState version is not restored. previousClusterVersion: [%s] is greater than current [%s]",
prevClusterStateVersion,
newClusterStateVersion
);
validateCurrentMetadata();
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}
Expand Down Expand Up @@ -309,6 +345,7 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception {
// Step - 1 index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();
long prevClusterStateVersion = clusterService().state().version();

// Create global metadata - register a custom repo
Path repoPath = registerCustomRepository();
Expand All @@ -328,8 +365,16 @@ public void testFullClusterRestoreGlobalMetadata() throws Exception {
String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

// Step - 3 Trigger full cluster restore and validate
// validateCurrentMetadata();
// Step - 3 validate cluster state restored
long newClusterStateVersion = clusterService().state().version();
assert prevClusterStateVersion < newClusterStateVersion : String.format(
Locale.ROOT,
"ClusterState version is not restored. previousClusterVersion: [%s] is greater than current [%s]",
prevClusterStateVersion,
newClusterStateVersion
);

validateCurrentMetadata();
assertEquals(Integer.valueOf(34), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(clusterService().state().metadata().settings()));
assertEquals(true, SETTING_READ_ONLY_SETTING.get(clusterService().state().metadata().settings()));
assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,16 +767,16 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
}

/**
* Fetch latest metadata from remote cluster state including global metadata and index metadata
* Fetch latest ClusterState from remote, including global metadata, index metadata and cluster state version
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return {@link IndexMetadata}
*/
public Metadata getLatestMetadata(String clusterName, String clusterUUID) {
public ClusterState getLatestClusterState(String clusterName, String clusterUUID) {
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
if (clusterMetadataManifest.isEmpty()) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID)
);
Expand All @@ -790,7 +790,10 @@ public Metadata getLatestMetadata(String clusterName, String clusterUUID) {
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); });

return Metadata.builder(globalMetadata).indices(indexMetadataMap).build();
return ClusterState.builder(ClusterState.EMPTY_STATE)
.version(clusterMetadataManifest.get().getStateVersion())
.metadata(Metadata.builder(globalMetadata).indices(indexMetadataMap).build())
.build();
}

private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public RemoteRestoreResult restore(
String[] indexNames
) {
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
Metadata remoteMetadata = null;
ClusterState remoteState = null;
boolean metadataFromRemoteStore = (restoreClusterUUID == null
|| restoreClusterUUID.isEmpty()
|| restoreClusterUUID.isBlank()) == false;
Expand All @@ -150,8 +150,8 @@ public RemoteRestoreResult restore(
throw new IllegalArgumentException("clusterUUID to restore from should be different from current cluster UUID");
}
logger.info("Restoring cluster state from remote store from cluster UUID : [{}]", restoreClusterUUID);
remoteMetadata = remoteClusterStateService.getLatestMetadata(currentState.getClusterName().value(), restoreClusterUUID);
remoteMetadata.getIndices().values().forEach(indexMetadata -> {
remoteState = remoteClusterStateService.getLatestClusterState(currentState.getClusterName().value(), restoreClusterUUID);
remoteState.getMetadata().getIndices().values().forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
});
} catch (Exception e) {
Expand All @@ -177,7 +177,7 @@ public RemoteRestoreResult restore(
}
}
}
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteState);
}

/**
Expand All @@ -191,7 +191,7 @@ private RemoteRestoreResult executeRestore(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean restoreAllShards,
Metadata remoteMetadata
ClusterState remoteState
) {
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
Expand Down Expand Up @@ -241,8 +241,11 @@ private RemoteRestoreResult executeRestore(
totalShards += updatedIndexMetadata.getNumberOfShards();
}

if (remoteMetadata != null) {
restoreGlobalMetadata(mdBuilder, remoteMetadata);
if (remoteState != null) {
restoreGlobalMetadata(mdBuilder, remoteState.getMetadata());
// Restore ClusterState version
logger.info("Restoring ClusterState with Remote State version [{}]", remoteState.version());
builder.version(remoteState.version());
}

RestoreInfo restoreInfo = new RestoreInfo("remote_store", indicesToBeRestored, totalShards, totalShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,8 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE

remoteClusterStateService.start();
assertEquals(
remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
remoteClusterStateService.getLatestClusterState(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
.getMetadata()
.getIndices()
.size(),
0
Expand Down Expand Up @@ -694,8 +695,10 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio
remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
.getIndices()
() -> remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
).getMetadata().getIndices()
);
assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename());
}
Expand Down Expand Up @@ -740,10 +743,11 @@ public void testReadGlobalMetadata() throws IOException {
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();

long prevClusterStateVersion = 13L;
final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateVersion(prevClusterStateVersion)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
Expand All @@ -756,12 +760,20 @@ public void testReadGlobalMetadata() throws IOException {
Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();
mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expactedMetadata);

Metadata metadata = remoteClusterStateService.getLatestMetadata(
ClusterState newClusterState = remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);

assertTrue(Metadata.isGlobalStateEquals(metadata, expactedMetadata));
assertTrue(Metadata.isGlobalStateEquals(newClusterState.getMetadata(), expactedMetadata));

long newClusterStateVersion = newClusterState.getVersion();
assert prevClusterStateVersion == newClusterStateVersion : String.format(
Locale.ROOT,
"ClusterState version is not restored. previousClusterVersion: [%s] is not equal to current [%s]",
prevClusterStateVersion,
newClusterStateVersion
);
}

public void testReadGlobalMetadataIOException() throws IOException {
Expand Down Expand Up @@ -793,7 +805,10 @@ public void testReadGlobalMetadataIOException() throws IOException {
remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
() -> remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
)
);
assertEquals(e.getMessage(), "Error while downloading Global Metadata - " + globalIndexMetadataName);
}
Expand Down Expand Up @@ -824,16 +839,15 @@ public void testReadLatestIndexMetadataSuccess() throws IOException {
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.globalMetadataFileName("global-metadata-file")
.codecVersion(ClusterMetadataManifest.CODEC_V0)
.build();

mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata));

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
).getIndices();
).getMetadata().getIndices();

assertEquals(indexMetadataMap.size(), 1);
assertEquals(indexMetadataMap.get(index.getName()).getIndex().getName(), index.getName());
Expand Down

0 comments on commit 73bbeb5

Please sign in to comment.