Add restore level safeguards to prevent file cache oversubscription
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
kotwanikunal committed Jul 11, 2023
1 parent a39f60f commit 3d0464c
Showing 11 changed files with 281 additions and 19 deletions.
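In short: the previously hard-coded data-to-file-cache ratio in FileCache becomes the node-scope integer setting cluster.filecache.remote_data_ratio (default 0, which disables the safeguard), DiskThresholdDecider reads that setting when deciding whether a remote shard fits on a node, and RestoreService gains a restore-level validation that rejects a searchable snapshot restore when the already-restored plus to-be-restored remote index size would exceed the ratio times the cluster-wide file cache capacity. Illustrative sketches with assumed values follow the relevant hunks below.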
@@ -516,7 +516,7 @@ public String snapshotUuid() {
/**
* Sets the storage type for this request.
*/
RestoreSnapshotRequest storageType(StorageType storageType) {
public RestoreSnapshotRequest storageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
@@ -295,6 +295,16 @@ public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) {
return allShardsSatisfyingPredicate(indices, shardRouting -> true, true);
}

/**
* All the shards on the node which match the predicate
* @param predicate condition to match
* @return iterator over shards matching the predicate
*/
public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
String[] indices = indicesRouting.keySet().toArray(new String[0]);
return allShardsSatisfyingPredicate(indices, predicate, false);
}

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
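The new public overload of allShardsSatisfyingPredicate exposes routing-table-wide filtering without callers having to enumerate index names first. A rough usage sketch, assuming a ClusterState obtained from ClusterService#state(); the helper class and method below are illustrative, not part of this commit.

import java.util.function.Predicate;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;

public final class RoutingTablePredicateExample {
    // Counts every active primary in the cluster, regardless of index name.
    public static long activePrimaryCount(ClusterState clusterState) {
        Predicate<ShardRouting> activePrimaries = shardRouting -> shardRouting.primary() && shardRouting.active();
        ShardsIterator shards = clusterState.routingTable().allShardsSatisfyingPredicate(activePrimaries);
        return shards.getShardRoutings().size();
    }
}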
@@ -68,7 +68,7 @@
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -199,8 +199,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
final int dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
if (dataToFileCacheSizeRatio > 0 && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
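With the ratio now read from settings, the decider skips the check entirely when the ratio is zero or unset, and otherwise compares the node's prospective remote shard footprint against ratio × file cache size. A worked example with made-up numbers, mirroring the condition above:

// Illustrative numbers only; not taken from the commit.
public final class FileCacheAllocationMathExample {
    public static void main(String[] args) {
        long nodeCacheSize = 100L * 1024 * 1024 * 1024;              // 100 GiB file cache, from FileCacheStats#getTotal()
        int dataToFileCacheSizeRatio = 5;                            // cluster.filecache.remote_data_ratio
        long currentNodeRemoteShardSize = 480L * 1024 * 1024 * 1024; // remote shard data already on the node
        long shardSize = 30L * 1024 * 1024 * 1024;                   // shard being allocated

        long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
        boolean rejected = dataToFileCacheSizeRatio > 0
            && totalNodeRemoteShardSize > (long) dataToFileCacheSizeRatio * nodeCacheSize;

        // 510 GiB exceeds 5 * 100 GiB, so the decider would return Decision.NO for this shard.
        System.out.println(rejected); // true
    }
}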
@@ -11,6 +11,7 @@
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
@@ -49,8 +50,12 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

// TODO: Convert the constant into an integer setting
public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;
public static final Setting<Integer> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.intSetting(
"cluster.filecache.remote_data_ratio",
0,
0,
Setting.Property.NodeScope
);

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
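The hard-coded ratio of 5 is replaced by a node-scope integer setting whose default and minimum are both 0, so the safeguard stays off unless an operator opts in. A minimal sketch of supplying and reading the value; the builder-based settings are illustrative, and registration of the setting with ClusterSettings is not shown in this excerpt:

import org.opensearch.common.settings.Settings;

import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

public final class FileCacheRatioSettingExample {
    public static void main(String[] args) {
        // Hypothetical node settings, roughly the equivalent of an opensearch.yml entry.
        Settings nodeSettings = Settings.builder()
            .put(DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5) // allow remote data up to 5x the file cache size
            .build();

        int ratio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(nodeSettings);
        System.out.println(ratio); // 5; would be 0 (safeguard disabled) if the key were unset
    }
}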
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -941,8 +941,9 @@ protected Node(
clusterModule.getAllocationService(),
metadataCreateIndexService,
metadataIndexUpgradeService,
clusterService.getClusterSettings(),
shardLimitValidator
shardLimitValidator,
indicesService,
clusterInfoService
);

final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
77 changes: 68 additions & 9 deletions server/src/main/java/org/opensearch/snapshots/RestoreService.java
@@ -43,6 +43,8 @@
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -70,6 +72,7 @@
import org.opensearch.cluster.routing.RoutingChangesObserver;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -88,6 +91,9 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
@@ -120,6 +126,7 @@
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.common.util.set.Sets.newHashSet;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.snapshots.SnapshotUtils.filterIndices;

/**
@@ -178,6 +185,10 @@ public class RestoreService implements ClusterStateApplier {

private final ClusterSettings clusterSettings;

private final IndicesService indicesService;

private final ClusterInfoService clusterInfoService;

private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey;

private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
@@ -188,8 +199,9 @@ public RestoreService(
AllocationService allocationService,
MetadataCreateIndexService createIndexService,
MetadataIndexUpgradeService metadataIndexUpgradeService,
ClusterSettings clusterSettings,
ShardLimitValidator shardLimitValidator
ShardLimitValidator shardLimitValidator,
IndicesService indicesService,
ClusterInfoService clusterInfoService
) {
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
@@ -201,6 +213,8 @@
}
this.clusterSettings = clusterService.getClusterSettings();
this.shardLimitValidator = shardLimitValidator;
this.indicesService = indicesService;
this.clusterInfoService = clusterInfoService;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true);
@@ -403,7 +417,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey(

@Override
public ClusterState execute(ClusterState currentState) {
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
SnapshotDeletionsInProgress.TYPE,
@@ -425,6 +438,7 @@ public ClusterState execute(ClusterState currentState) {
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
final Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
Set<String> aliases = new HashSet<>();
long totalRestorableRemoteIndexesSize = 0;

if (indices.isEmpty() == false) {
// We have some indices to restore
@@ -435,17 +449,14 @@ public ClusterState execute(ClusterState currentState) {
String index = indexEntry.getValue();
boolean partial = checkPartial(index);

IndexId snapshotIndexId = repositoryData.resolveIndexId(index);
IndexMetadata snapshotIndexMetadata = updateIndexSettings(
metadata.index(index),
request.indexSettings(),
request.ignoreIndexSettings()
);
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
snapshotIndexMetadata = addSnapshotToIndexSettings(
snapshotIndexMetadata,
snapshot,
repositoryData.resolveIndexId(index)
);
snapshotIndexMetadata = addSnapshotToIndexSettings(snapshotIndexMetadata, snapshot, snapshotIndexId);
}
final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(
snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
@@ -465,7 +476,7 @@ public ClusterState execute(ClusterState currentState) {
restoreUUID,
snapshot,
snapshotInfo.version(),
repositoryData.resolveIndexId(index),
snapshotIndexId,
isSearchableSnapshot,
isRemoteStoreShallowCopy,
request.getSourceRemoteStoreRepository()
@@ -584,6 +595,14 @@ public ClusterState execute(ClusterState currentState) {
}

for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
IndexShardSnapshotStatus.Copy shardStatus = repository.getShardSnapshotStatus(
snapshotInfo.snapshotId(),
snapshotIndexId,
new ShardId(metadata.index(index).getIndex(), shard)
).asCopy();
totalRestorableRemoteIndexesSize += shardStatus.getTotalSize();
}
if (!ignoreShards.contains(shard)) {
shardsBuilder.put(
new ShardId(renamedIndex, shard),
@@ -620,6 +639,9 @@ public ClusterState execute(ClusterState currentState) {
}

checkAliasNameConflicts(indices, aliases);
if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
validateSearchableSnapshotRestorable(totalRestorableRemoteIndexesSize);
}

Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
updatedDataStreams.putAll(
@@ -819,6 +841,43 @@ private IndexMetadata updateIndexSettings(
return builder.settings(settingsBuilder).build();
}

private void validateSearchableSnapshotRestorable(long totalRestorableRemoteIndexesSize) {
ClusterInfo clusterInfo = clusterInfoService.getClusterInfo();
int remoteDataToFileCacheRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(clusterService.getSettings());
Map<String, FileCacheStats> nodeFileCacheStats = clusterInfo.getNodeFileCacheStats();
if (nodeFileCacheStats.isEmpty() || remoteDataToFileCacheRatio <= 0) {
return;
}

long totalNodeFileCacheSize = clusterInfo.getNodeFileCacheStats()
.values()
.stream()
.map(fileCacheStats -> fileCacheStats.getTotal().getBytes())
.mapToLong(Long::longValue)
.sum();

Predicate<ShardRouting> shardRoutingPredicate = shardRouting -> shardRouting.primary()
&& indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteSnapshot();

ShardsIterator shardsIterator = clusterService.state()
.routingTable()
.allShardsSatisfyingPredicate(shardRoutingPredicate);

long totalRestoredRemoteIndexesSize = shardsIterator.getShardRoutings()
.stream()
.map(clusterInfo::getShardSize)
.mapToLong(Long::longValue)
.sum();

if (totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize > remoteDataToFileCacheRatio
* totalNodeFileCacheSize) {
throw new SnapshotRestoreException(
snapshot,
"Size of the indexes to be restored exceed the file cache bounds. Increase the file cache capacity on the cluster."
);
}
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
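Taken together, validateSearchableSnapshotRestorable sums the file cache capacity reported by every node, the sizes of remote-snapshot primaries already assigned in the routing table, and the shard sizes of the snapshot being restored, then fails the restore when the combined remote footprint would exceed the configured ratio times the total cache. A numeric sketch with made-up values, mirroring that arithmetic:

// Made-up values; mirrors the comparison in validateSearchableSnapshotRestorable.
public final class RestoreCacheBoundExample {
    public static void main(String[] args) {
        int remoteDataToFileCacheRatio = 5;                                // cluster.filecache.remote_data_ratio
        long totalNodeFileCacheSize = 3 * 100L * 1024 * 1024 * 1024;       // three nodes, 100 GiB cache each
        long totalRestoredRemoteIndexesSize = 1200L * 1024 * 1024 * 1024;  // remote primaries already assigned
        long totalRestorableRemoteIndexesSize = 400L * 1024 * 1024 * 1024; // shards in the snapshot being restored

        boolean rejected = totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize
            > (long) remoteDataToFileCacheRatio * totalNodeFileCacheSize;

        // 1600 GiB of remote data against a 1500 GiB bound (5 x 300 GiB), so the restore
        // request would be rejected with a SnapshotRestoreException.
        System.out.println(rejected); // true
    }
}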
@@ -112,6 +112,10 @@ private RestoreSnapshotRequest randomState(RestoreSnapshotRequest instance) {
instance.snapshotUuid(randomBoolean() ? null : randomAlphaOfLength(10));
}

instance.storageType(
randomBoolean() ? RestoreSnapshotRequest.StorageType.LOCAL : RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT
);

if (randomBoolean()) {
instance.setSourceRemoteStoreRepository(randomAlphaOfLengthBetween(5, 10));
}
@@ -228,6 +228,49 @@ public void testShardsMatchingPredicateCount() {
assertThat(clusterState.routingTable().shardsMatchingPredicateCount(predicate), is(2));
}

public void testAllShardsMatchingPredicate() {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(RoutingTable.builder().addAsNew(metadata.index("test1")).addAsNew(metadata.index("test2")).build())
.build();
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2")))
.build();
clusterState = allocation.reroute(clusterState, "reroute");

Predicate<ShardRouting> predicate = s -> s.state() == ShardRoutingState.UNASSIGNED && s.unassignedInfo().isDelayed();
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(predicate).size(), is(0));

// starting primaries
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
// starting replicas
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
clusterState = allocation.disassociateDeadNodes(clusterState, true, "reroute");
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(predicate).size(), is(2));

// Verifies true against all shards on the node (active/inactive)
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(shard -> true).size(), is(4));
// Verifies false against all shards on the node (active/inactive)
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(shard -> false).size(), is(0));
// Verifies against all primary shards on the node
assertThat(clusterState.routingTable().allShardsSatisfyingPredicate(ShardRouting::primary).size(), is(2));
// Verifies a predicate which tests for inactive replicas
assertThat(
clusterState.routingTable()
.allShardsSatisfyingPredicate(shardRouting -> !shardRouting.primary() && !shardRouting.active())
.size(),
is(2)
);
}

public void testActivePrimaryShardsGrouped() {
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0));
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0));
@@ -70,6 +70,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
@@ -406,6 +407,7 @@ public void testFileCacheRemoteShardsDecisions() {
DiskThresholdDecider diskThresholdDecider = makeDecider(diskSettings);
Metadata metadata = Metadata.builder()
.put(IndexMetadata.builder("test").settings(remoteIndexSettings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.persistentSettings(Settings.builder().put(FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.getKey(), 5).build())
.build();

RoutingTable initialRoutingTable = RoutingTable.builder().addAsNew(metadata.index("test")).build();