Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into sid/search-query-category
Browse files Browse the repository at this point in the history
  • Loading branch information
deshsidd authored Oct 18, 2023
2 parents 9a4b066 + 5ec2fe9 commit 7c3ea41
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-2.11.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Added
- Add coordinator level stats for search latency ([#8386](https://github.com/opensearch-project/OpenSearch/issues/8386))
- Add metrics for thread_pool task wait time ([#9681](https://github.com/opensearch-project/OpenSearch/pull/9681))
- Add parallel file download support for remote store based replication ([#8596](https://github.com/opensearch-project/OpenSearch/pull/8596))
- Async blob read support for S3 plugin ([#9694](https://github.com/opensearch-project/OpenSearch/pull/9694))
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected void restore(String... indices) {
);
}

protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName, boolean indexMoreData) throws Exception {
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
// This is to ensure that shards that were already assigned will get latest count
Expand All @@ -68,6 +68,8 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
30,
TimeUnit.SECONDS
);
if (indexMoreData == false) return;

IndexResponse response = indexSingleDoc(indexName);
if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) {
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
Expand All @@ -80,6 +82,10 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
);
}

protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
Expand All @@ -19,11 +25,19 @@

import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_SETTING;
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreClusterStateRestoreIT extends BaseRemoteStoreRestoreIT {
Expand Down Expand Up @@ -86,6 +100,7 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {
Map<String, Long> indexStats2 = indexData(1, false, secondIndexName);
assertEquals((shardCount + 1) * (replicaCount + 1), getNumShards(secondIndexName).totalNumShards);
ensureGreen(secondIndexName);
updateIndexBlock(true, secondIndexName);

String prevClusterUUID = clusterService().state().metadata().clusterUUID();

Expand All @@ -98,6 +113,13 @@ public void testFullClusterRestoreMultipleIndices() throws Exception {
// Step - 3 Trigger full cluster restore
validateMetadata(List.of(INDEX_NAME, secondIndexName));
verifyRestoredData(indexStats, INDEX_NAME);
verifyRestoredData(indexStats2, secondIndexName, false);
assertTrue(INDEX_READ_ONLY_SETTING.get(clusterService().state().metadata().index(secondIndexName).getSettings()));
assertThrows(ClusterBlockException.class, () -> indexSingleDoc(secondIndexName));
// Test is complete

// Remove the block to ensure proper cleanup
updateIndexBlock(false, secondIndexName);
}

public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathThrowsException() throws Exception {
Expand Down Expand Up @@ -192,4 +214,119 @@ private void validateCurrentMetadata() throws Exception {
}
});
}

public void testFullClusterRestoreGlobalMetadata() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

// Step - 1 index some data to generate files in remote directory
Map<String, Long> indexStats = initialTestSetup(shardCount, replicaCount, dataNodeCount, 1);
String prevClusterUUID = clusterService().state().metadata().clusterUUID();

// Create global metadata - register a custom repo
// TODO - uncomment after all customs is also uploaded for all repos - https://github.com/opensearch-project/OpenSearch/issues/10691
// registerCustomRepository();

// Create global metadata - persistent settings
updatePersistentSettings(Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 34).build());

// Create global metadata - index template
putIndexTemplate();

// Create global metadata - Put cluster block
addClusterLevelReadOnlyBlock();

// Step - 2 Replace all nodes in the cluster with new nodes. This ensures new cluster state doesn't have previous index metadata
resetCluster(dataNodeCount, clusterManagerNodeCount);

String newClusterUUID = clusterService().state().metadata().clusterUUID();
assert !Objects.equals(newClusterUUID, prevClusterUUID) : "cluster restart not successful. cluster uuid is same";

// Step - 3 Trigger full cluster restore and validate
// validateCurrentMetadata();
verifyRestoredData(indexStats, INDEX_NAME, false);

// validate global metadata restored
verifyRestoredRepositories();
verifyRestoredIndexTemplate();
assertEquals(Integer.valueOf(34), SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(clusterService().state().metadata().settings()));
assertEquals(true, SETTING_READ_ONLY_SETTING.get(clusterService().state().metadata().settings()));
assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
// Test is complete

// Remote the cluster read only block to ensure proper cleanup
updatePersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), false).build());
assertFalse(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
}

private void registerCustomRepository() {
assertAcked(
client().admin()
.cluster()
.preparePutRepository("custom-repo")
.setType("fs")
.setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", false))
.get()
);
}

private void verifyRestoredRepositories() {
RepositoriesMetadata repositoriesMetadata = clusterService().state().metadata().custom(RepositoriesMetadata.TYPE);
assertEquals(2, repositoriesMetadata.repositories().size()); // includes remote store repo as well
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_NAME).settings()));
assertTrue(SYSTEM_REPOSITORY_SETTING.get(repositoriesMetadata.repository(REPOSITORY_2_NAME).settings()));
// TODO - uncomment after all customs is also uploaded for all repos - https://github.com/opensearch-project/OpenSearch/issues/10691
// assertEquals("fs", repositoriesMetadata.repository("custom-repo").type());
// assertEquals(Settings.builder().put("location", randomRepoPath()).put("compress", false).build(),
// repositoriesMetadata.repository("custom-repo").settings());
}

private void addClusterLevelReadOnlyBlock() throws InterruptedException, ExecutionException {
updatePersistentSettings(Settings.builder().put(SETTING_READ_ONLY_SETTING.getKey(), true).build());
assertTrue(clusterService().state().blocks().hasGlobalBlock(CLUSTER_READ_ONLY_BLOCK));
}

private void updatePersistentSettings(Settings settings) throws ExecutionException, InterruptedException {
ClusterUpdateSettingsRequest resetRequest = new ClusterUpdateSettingsRequest();
resetRequest.persistentSettings(settings);
assertAcked(client().admin().cluster().updateSettings(resetRequest).get());
}

private void verifyRestoredIndexTemplate() {
Map<String, IndexTemplateMetadata> indexTemplateMetadataMap = clusterService().state().metadata().templates();
assertEquals(1, indexTemplateMetadataMap.size());
assertEquals(Arrays.asList("pattern-1", "log-*"), indexTemplateMetadataMap.get("my-template").patterns());
assertEquals(
Settings.builder() // <1>
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 1)
.build(),
indexTemplateMetadataMap.get("my-template").settings()
);
}

private static void putIndexTemplate() {
PutIndexTemplateRequest request = new PutIndexTemplateRequest("my-template"); // <1>
request.patterns(Arrays.asList("pattern-1", "log-*")); // <2>

request.settings(
Settings.builder() // <1>
.put("index.number_of_shards", 3)
.put("index.number_of_replicas", 1)
);
assertTrue(client().admin().indices().putTemplate(request).actionGet().isAcknowledged());
}

private static void updateIndexBlock(boolean value, String secondIndexName) throws InterruptedException, ExecutionException {
assertAcked(
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(Settings.builder().put(INDEX_READ_ONLY_SETTING.getKey(), value).build(), secondIndexName)
)
.get()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public NodeStats(StreamInput in) throws IOException {
} else {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
resourceUsageStats = in.readOptionalWriteable(NodesResourceUsageStats::new);
} else {
resourceUsageStats = null;
Expand Down Expand Up @@ -462,7 +462,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.12 when we backport
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(resourceUsageStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
Expand Down
58 changes: 32 additions & 26 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,38 +158,44 @@ public void start(
PersistedState remotePersistedState = null;
boolean success = false;
try {
ClusterState clusterState = prepareInitialClusterState(
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.version(lastAcceptedVersion)
.metadata(metadata)
.build();

if (DiscoveryNode.isClusterManagerNode(settings) && isRemoteStoreClusterStateEnabled(settings)) {
// If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote
// If there is no valid state on remote, continue with initial empty state
// If there is a valid state, then restore index metadata using this state
String lastKnownClusterUUID = ClusterState.UNKNOWN_UUID;
if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) {
lastKnownClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote(
clusterState.getClusterName().value()
);
if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
clusterState,
lastKnownClusterUUID,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService, lastKnownClusterUUID);
}

// Recovers Cluster and Index level blocks
clusterState = prepareInitialClusterState(
transportService,
clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.version(lastAcceptedVersion)
.metadata(upgradeMetadataForNode(metadata, metadataIndexUpgradeService, metadataUpgrader))
ClusterState.builder(clusterState)
.metadata(upgradeMetadataForNode(clusterState.metadata(), metadataIndexUpgradeService, metadataUpgrader))
.build()
);

if (DiscoveryNode.isClusterManagerNode(settings)) {
if (isRemoteStoreClusterStateEnabled(settings)) {
// If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote
// If there is no valid state on remote, continue with initial empty state
// If there is a valid state, then restore index metadata using this state
String lastKnownClusterUUID = ClusterState.UNKNOWN_UUID;
if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) {
lastKnownClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote(
clusterState.getClusterName().value()
);
if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
clusterState,
lastKnownClusterUUID,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService, lastKnownClusterUUID);
}
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
* @param clusterName name of the cluster
* @return {@link IndexMetadata}
*/
public Metadata getLatestMetadata(String clusterName, String clusterUUID) throws IOException {
public Metadata getLatestMetadata(String clusterName, String clusterUUID) {
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
Expand Down Expand Up @@ -759,10 +759,7 @@ private Metadata getGlobalMetadata(String clusterName, String clusterUUID, Clust
*/
public Optional<ClusterMetadataManifest> getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
Optional<String> latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
if (latestManifestFileName.isPresent()) {
return Optional.of(fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName.get()));
}
return Optional.empty();
return latestManifestFileName.map(s -> fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, s));
}

/**
Expand Down
Loading

0 comments on commit 7c3ea41

Please sign in to comment.