Skip to content

Commit

Permalink
Fix file cache initialization (#14004) (#14566)
Browse files Browse the repository at this point in the history
* fix file cache initialization



* changelog



* add test



---------


(cherry picked from commit 2be25bb)

Signed-off-by: panguixin <panguixin@bytedance.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 2caa7db commit 70064e2
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573))
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
- Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -65,10 +67,13 @@
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -1009,6 +1014,26 @@ public void cleanup() throws Exception {
);
}

public void testStartSearchNode() throws Exception {
// test start dedicated search node
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)));
// test start node without search role
internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE)));
// test start non-dedicated search node with TIERED_REMOTE_INDEX feature enabled
internalCluster().startNode(
Settings.builder()
.put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
.put(TIERED_REMOTE_INDEX, true)
);
// test start non-dedicated search node
assertThrows(
SettingsException.class,
() -> internalCluster().startNode(
Settings.builder().put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE)))
)
);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

public static boolean isDedicatedSearchNode(Settings settings) {
return getRolesFromSettings(settings).stream().allMatch(DiscoveryNodeRole.SEARCH_ROLE::equals);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ private static final int ceilingNextPowerOfTwo(int x) {
private final Weigher<V> weigher;

public SegmentedCache(Builder<K, V> builder) {
this.capacity = builder.capacity;
final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel);
this.segmentMask = segments - 1;
this.table = newSegmentArray(segments);
this.perSegmentCapacity = (capacity + (segments - 1)) / segments;
this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments;
this.weigher = builder.weigher;
for (int i = 0; i < table.length; i++) {
table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher);
}
this.capacity = perSegmentCapacity * segments;
}

@SuppressWarnings("unchecked")
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ public FsInfo stats(FsInfo previous) throws IOException {
if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) {
paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes());
paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage());
paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized);
// fileCacheFree will be less than zero if the cache being over-subscribed
long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized;
if (fileCacheFree > 0) {
paths[i].available -= fileCacheFree;
}
// occurs if reserved file cache space is occupied by other files, like local indices
if (paths[i].available < 0) {
paths[i].available = 0;
Expand Down Expand Up @@ -215,4 +219,11 @@ public static FsInfo.Path getFSInfo(NodePath nodePath) throws IOException {
return fsPath;
}

public static long getTotalSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getTotalSpace());
}

public static long getAvailableSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace());
}
}
86 changes: 53 additions & 33 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.Build;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
Expand Down Expand Up @@ -107,6 +108,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.unit.RatioValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
Expand Down Expand Up @@ -175,7 +177,6 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -371,9 +372,12 @@ public class Node implements Closeable {
}
}, Setting.Property.NodeScope);

public static final Setting<ByteSizeValue> NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting(
private static final String ZERO = "0";

public static final Setting<String> NODE_SEARCH_CACHE_SIZE_SETTING = new Setting<>(
"node.search.cache.size",
ByteSizeValue.ZERO,
s -> (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) || DiscoveryNode.isDedicatedSearchNode(s)) ? "80%" : ZERO,
Node::validateFileCacheSize,
Property.NodeScope
);

Expand Down Expand Up @@ -1997,43 +2001,59 @@ DiscoveryNode getNode() {
* Initializes the search cache with a defined capacity.
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
* Else it configures the size to 80% of total capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING);
if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) {
NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath));
long availableCapacity = info.getAvailable().getBytes();

// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
if (capacity == 0) {
// If node is not a dedicated search node without configuration, prevent cache initialization
if (!isWritableRemoteIndexEnabled
&& DiscoveryNode.getRolesFromSettings(settings)
.stream()
.anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
} else {
capacity = 80 * availableCapacity / 100;
}
if (DiscoveryNode.isSearchNode(settings) == false && isWritableRemoteIndexEnabled == false) {
return;
}

String capacityRaw = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings);
logger.info("cache size [{}]", capacityRaw);
if (capacityRaw.equals(ZERO)) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
}

NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long totalSpace = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getTotalSize(fileCacheNodePath));
long capacity = calculateFileCacheSize(capacityRaw, totalSpace);
if (capacity <= 0 || totalSpace <= capacity) {
throw new SettingsException("Cache size must be larger than zero and less than total capacity");
}

this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}

private static long calculateFileCacheSize(String capacityRaw, long totalSpace) {
try {
RatioValue ratioValue = RatioValue.parseRatioValue(capacityRaw);
return Math.round(totalSpace * ratioValue.getAsRatio());
} catch (OpenSearchParseException e) {
try {
return ByteSizeValue.parseBytesSizeValue(capacityRaw, NODE_SEARCH_CACHE_SIZE_SETTING.getKey()).getBytes();
} catch (OpenSearchParseException ex) {
ex.addSuppressed(e);
throw ex;
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
}

private static String validateFileCacheSize(String capacityRaw) {
calculateFileCacheSize(capacityRaw, 0L);
return capacityRaw;
}

/**
* Returns the {@link FileCache} instance for remote search node
* Note: Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void createNodePaths() throws IOException {
dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY);
Settings defaultSearchSettings = Settings.builder()
.put(dataClusterManagerSettings)
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB))
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB).toString())
.build();

searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE);
Expand Down
2 changes: 1 addition & 1 deletion server/src/test/java/org/opensearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void testCreateWithFileCache() throws Exception {
List<Class<? extends Plugin>> plugins = basePlugins();
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
Settings onlySearchRoleSettings = Settings.builder()
.put(searchRoleSettingsWithConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.NodeRoles.removeRoles;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -216,7 +217,8 @@ public final class InternalTestCluster extends TestCluster {
nodeAndClient.node.settings()
);

private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(2, ByteSizeUnit.GB);
private static final String DEFAULT_SEARCH_CACHE_SIZE_BYTES = "2gb";
private static final String DEFAULT_SEARCH_CACHE_SIZE_PERCENT = "5%";

public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1;
public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3;
Expand Down Expand Up @@ -700,8 +702,10 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
logger.info("increasing cluster size from {} to {}", size, n);
Set<DiscoveryNodeRole> searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE);
Settings settings = Settings.builder()
.put(Settings.EMPTY)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE)
.put(
Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(),
randomBoolean() ? DEFAULT_SEARCH_CACHE_SIZE_PERCENT : DEFAULT_SEARCH_CACHE_SIZE_BYTES
)
.build();
startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
validateClusterFormed();
Expand Down

0 comments on commit 70064e2

Please sign in to comment.