Skip to content

Commit

Permalink
Fix NPE on restore searchable snapshot
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <panguixin@bytedance.com>
  • Loading branch information
bugmakerrrrrr committed May 31, 2024
1 parent 6c9603a commit 60580f6
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.node.Node;
import org.opensearch.repositories.fs.FsRepository;
import org.hamcrest.MatcherAssert;
import org.junit.After;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -62,6 +63,10 @@

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -939,6 +944,52 @@ public void testRelocateSearchableSnapshotIndex() throws Exception {
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
}

public void testCreateSearchableSnapshotWithSpecifiedRemoteDataRatio() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName1 = "test-idx-1";
final String restoredIndexName1 = indexName1 + "-copy";
final String indexName2 = "test-idx-2";
final String restoredIndexName2 = indexName2 + "-copy";
final int numReplicasIndex1 = 1;
final int numReplicasIndex2 = 1;

Settings clusterManagerNodeSettings = clusterManagerOnlyNode();
internalCluster().startNodes(2, clusterManagerNodeSettings);
Settings dateNodeSettings = dataNode();
internalCluster().startNodes(2, dateNodeSettings);
createIndexWithDocsAndEnsureGreen(numReplicasIndex1, 100, indexName1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex2, 100, indexName2);

final Client client = client();
assertAcked(
client.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5))
);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName1, indexName2);

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);

assertDocCount(restoredIndexName1, 100L);
assertDocCount(restoredIndexName2, 100L);
assertIndexDirectoryDoesNotExist(restoredIndexName1, restoredIndexName2);
}

@After
public void cleanup() throws Exception {
assertAcked(
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey()))
);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -59,8 +58,6 @@
import java.util.Set;
import java.util.stream.Stream;

import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;

/**
* Transport action for updating index settings
*
Expand Down Expand Up @@ -133,9 +130,7 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste
for (Index index : requestIndices) {
if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index.getName())) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& IndexModule.Type.REMOTE_SNAPSHOT.match(
state.getMetadata().getIndexSafe(index).getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())
);
&& state.getMetadata().getIndexSafe(index).isRemoteSnapshot();
}
}
// check if all settings in the request are in the allow list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexModule;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -399,7 +398,7 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
if (IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetadata.getSettings())) {
addIndexBlock(indexName, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
}
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (indexMetadata.isRemoteSnapshot()) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -683,6 +684,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
private final ActiveShardCount waitForActiveShards;
private final Map<String, RolloverInfo> rolloverInfos;
private final boolean isSystem;
private final boolean isRemoteSnapshot;

private IndexMetadata(
final Index index,
Expand Down Expand Up @@ -743,6 +745,8 @@ private IndexMetadata(
this.waitForActiveShards = waitForActiveShards;
this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos);
this.isSystem = isSystem;
this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);
;
assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards;
}

Expand Down Expand Up @@ -1204,6 +1208,10 @@ public boolean isSystem() {
return isSystem;
}

public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
}

public static Builder builder(String index) {
return new Builder(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.ResponseCollectorService;

Expand Down Expand Up @@ -242,9 +241,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
final Set<ShardIterator> set = new HashSet<>(shards.size());
for (IndexShardRoutingTable shard : shards) {
IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
if (IndexModule.Type.REMOTE_SNAPSHOT.match(
indexMetadataForShard.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
) && (preference == null || preference.isEmpty())) {
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
preference = Preference.PRIMARY.type();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
Expand Down Expand Up @@ -60,10 +58,6 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

Expand All @@ -68,7 +69,6 @@
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
Expand Down Expand Up @@ -109,11 +109,13 @@ public class DiskThresholdDecider extends AllocationDecider {

private final DiskThresholdSettings diskThresholdSettings;
private final boolean enableForSingleDataNode;
private final FileCacheSettings fileCacheSettings;

public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) {
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
assert Version.CURRENT.major < 9 : "remove enable_for_single_data_node in 9";
this.enableForSingleDataNode = ENABLE_FOR_SINGLE_DATA_NODE.get(settings);
this.fileCacheSettings = new FileCacheSettings(settings, clusterSettings);
}

/**
Expand Down Expand Up @@ -179,6 +181,12 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
The following block enables allocation for remote shards within safeguard limits of the filecache.
*/
if (REMOTE_CAPABLE.equals(getNodePool(node)) && REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
final double dataToFileCacheSizeRatio = fileCacheSettings.getRemoteDataRatio();
// we don't need to check the ratio
if (dataToFileCacheSizeRatio <= 0.1f) {
return Decision.YES;
}

final List<ShardRouting> remoteShardsOnNode = StreamSupport.stream(node.spliterator(), false)
.filter(shard -> shard.primary() && REMOTE_CAPABLE.equals(getShardPool(shard, allocation)))
.collect(Collectors.toList());
Expand All @@ -199,7 +207,6 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
Expand All @@ -208,6 +215,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
);
}
return Decision.YES;
} else if (REMOTE_CAPABLE.equals(getShardPool(shardRouting, allocation))) {
return Decision.NO;
}

Map<String, DiskUsage> usages = clusterInfo.getNodeMostAvailableDiskUsages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.remote.RemoteStorePressureSettings;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
Expand Down Expand Up @@ -689,7 +689,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,
FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.lucene.store.IndexInput;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.common.breaker.CircuitBreaker;
import org.opensearch.core.common.breaker.CircuitBreakingException;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
Expand Down Expand Up @@ -52,21 +51,6 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

/**
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
this.circuitBreaker = circuitBreaker;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.filecache;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Settings relate to file cache
*
* @opensearch.internal
*/
public class FileCacheSettings {
/**
* Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
* the file cache. For example, if 100GB disk space is configured for use as a file cache and the
* remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
* This is designed to be a safeguard to prevent oversubscribing a cluster.
* Specify a value of zero for no limit, which is the default for compatibility reasons.
*/
public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
"cluster.filecache.remote_data_ratio",
0.0,
0.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile double remoteDataRatio;

public FileCacheSettings(Settings settings, ClusterSettings clusterSettings) {
setRemoteDataRatio(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(settings));
clusterSettings.addSettingsUpdateConsumer(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING, this::setRemoteDataRatio);
}

public void setRemoteDataRatio(double remoteDataRatio) {
this.remoteDataRatio = remoteDataRatio;
}

public double getRemoteDataRatio() {
return remoteDataRatio;
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.index.store.remote.filecache.FileCacheSettings;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
Expand Down Expand Up @@ -1159,7 +1160,8 @@ protected Node(
metadataIndexUpgradeService,
shardLimitValidator,
indicesService,
clusterInfoService::getClusterInfo
clusterInfoService::getClusterInfo,
new FileCacheSettings(settings, clusterService.getClusterSettings())::getRemoteDataRatio
);

RemoteStoreRestoreService remoteStoreRestoreService = new RemoteStoreRestoreService(
Expand Down
Loading

0 comments on commit 60580f6

Please sign in to comment.