Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix file cache initialization #14004

Merged
merged 3 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix fs info reporting negative available size ([#11573](https://github.com/opensearch-project/OpenSearch/pull/11573))
- Add ListPitInfo::getKeepAlive() getter ([#14495](https://github.com/opensearch-project/OpenSearch/pull/14495))
- Fix FuzzyQuery in keyword field will use IndexOrDocValuesQuery when both of index and doc_value are true ([#14378](https://github.com/opensearch-project/OpenSearch/pull/14378))
- Fix file cache initialization ([#14004](https://github.com/opensearch-project/OpenSearch/pull/14004))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
Expand Down Expand Up @@ -65,10 +67,13 @@
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.FS;
import static org.opensearch.common.util.FeatureFlags.TIERED_REMOTE_INDEX;
import static org.opensearch.core.common.util.CollectionUtils.iterableAsArrayList;
import static org.opensearch.index.store.remote.filecache.FileCacheSettings.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.test.NodeRoles.clusterManagerOnlyNode;
import static org.opensearch.test.NodeRoles.dataNode;
import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -1009,6 +1014,26 @@ public void cleanup() throws Exception {
);
}

/**
 * Starts nodes with different role combinations, passing no file cache size in the
 * settings built here, and checks which of them are allowed to start.
 */
public void testStartSearchNode() throws Exception {
    final Set<DiscoveryNodeRole> searchAndDataRoles = Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.DATA_ROLE);

    // a node holding only the search role is expected to start
    internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)));

    // a node without the search role is expected to start
    internalCluster().startNode(Settings.builder().put(onlyRole(DiscoveryNodeRole.DATA_ROLE)));

    // a search + data node with TIERED_REMOTE_INDEX set to true in its settings is expected to start
    final Settings.Builder tieredSearchAndData = Settings.builder().put(onlyRoles(searchAndDataRoles)).put(TIERED_REMOTE_INDEX, true);
    internalCluster().startNode(tieredSearchAndData);

    // the same search + data role combination without that flag must be rejected with a SettingsException
    assertThrows(
        SettingsException.class,
        () -> internalCluster().startNode(Settings.builder().put(onlyRoles(searchAndDataRoles)))
    );
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public static boolean isSearchNode(Settings settings) {
return hasRole(settings, DiscoveryNodeRole.SEARCH_ROLE);
}

/**
 * Tells whether every role resolved from the given settings is the search role.
 * <p>
 * The check is vacuously satisfied for an empty role collection, so settings that
 * resolve to no roles at all also yield {@code true}.
 *
 * @param settings the node settings the roles are read from
 * @return {@code true} if no resolved role differs from {@link DiscoveryNodeRole#SEARCH_ROLE}
 */
public static boolean isDedicatedSearchNode(Settings settings) {
    return getRolesFromSettings(settings).stream().noneMatch(role -> DiscoveryNodeRole.SEARCH_ROLE.equals(role) == false);
}

private final String nodeName;
private final String nodeId;
private final String ephemeralId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ private static final int ceilingNextPowerOfTwo(int x) {
private final Weigher<V> weigher;

public SegmentedCache(Builder<K, V> builder) {
this.capacity = builder.capacity;
final int segments = ceilingNextPowerOfTwo(builder.concurrencyLevel);
this.segmentMask = segments - 1;
this.table = newSegmentArray(segments);
this.perSegmentCapacity = (capacity + (segments - 1)) / segments;
this.perSegmentCapacity = (builder.capacity + (segments - 1)) / segments;
this.weigher = builder.weigher;
for (int i = 0; i < table.length; i++) {
table[i] = new LRUCache<>(perSegmentCapacity, builder.listener, builder.weigher);
}
this.capacity = perSegmentCapacity * segments;
}

@SuppressWarnings("unchecked")
Expand Down
13 changes: 12 additions & 1 deletion server/src/main/java/org/opensearch/monitor/fs/FsProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@
if (fileCache != null && dataLocations[i].fileCacheReservedSize != ByteSizeValue.ZERO) {
paths[i].fileCacheReserved = adjustForHugeFilesystems(dataLocations[i].fileCacheReservedSize.getBytes());
paths[i].fileCacheUtilized = adjustForHugeFilesystems(fileCache.usage().usage());
paths[i].available -= (paths[i].fileCacheReserved - paths[i].fileCacheUtilized);
// fileCacheFree will be less than zero if the cache is over-subscribed
long fileCacheFree = paths[i].fileCacheReserved - paths[i].fileCacheUtilized;
if (fileCacheFree > 0) {
paths[i].available -= fileCacheFree;
}
// occurs if reserved file cache space is occupied by other files, like local indices
if (paths[i].available < 0) {
paths[i].available = 0;
Expand Down Expand Up @@ -215,4 +219,11 @@
return fsPath;
}

/**
 * Reads the total space of the file store backing the given node path and passes it
 * through {@code adjustForHugeFilesystems} before returning it.
 *
 * @param nodePath the node path whose file store is queried
 * @return the adjusted total size of the underlying file store
 * @throws IOException if the file store cannot report its total space
 */
public static long getTotalSize(NodePath nodePath) throws IOException {
    final long totalSpace = nodePath.fileStore.getTotalSpace();
    return adjustForHugeFilesystems(totalSpace);
}

public static long getAvailableSize(NodePath nodePath) throws IOException {
return adjustForHugeFilesystems(nodePath.fileStore.getUsableSpace());

Check warning on line 227 in server/src/main/java/org/opensearch/monitor/fs/FsProbe.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/monitor/fs/FsProbe.java#L227

Added line #L227 was not covered by tests
}
}
86 changes: 53 additions & 33 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.Build;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version;
import org.opensearch.action.ActionModule;
Expand Down Expand Up @@ -108,6 +109,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.settings.SettingsModule;
import org.opensearch.common.unit.RatioValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
Expand Down Expand Up @@ -176,7 +178,6 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.monitor.fs.FsHealthService;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -372,9 +373,12 @@
}
}, Setting.Property.NodeScope);

public static final Setting<ByteSizeValue> NODE_SEARCH_CACHE_SIZE_SETTING = Setting.byteSizeSetting(
private static final String ZERO = "0";

public static final Setting<String> NODE_SEARCH_CACHE_SIZE_SETTING = new Setting<>(
"node.search.cache.size",
ByteSizeValue.ZERO,
s -> (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING) || DiscoveryNode.isDedicatedSearchNode(s)) ? "80%" : ZERO,
Node::validateFileCacheSize,
Property.NodeScope
);

Expand Down Expand Up @@ -2002,43 +2006,59 @@
* Initializes the search cache with a defined capacity.
* The capacity of the cache is based on user configuration for {@link Node#NODE_SEARCH_CACHE_SIZE_SETTING}.
* If the user doesn't configure the cache size, it fails if the node is a data + search node.
* Else it configures the size to 80% of available capacity for a dedicated search node, if not explicitly defined.
* Else it configures the size to 80% of total capacity for a dedicated search node, if not explicitly defined.
*/
private void initializeFileCache(Settings settings, CircuitBreaker circuitBreaker) throws IOException {
boolean isWritableRemoteIndexEnabled = FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING);
if (DiscoveryNode.isSearchNode(settings) || isWritableRemoteIndexEnabled) {
NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long capacity = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings).getBytes();
FsInfo.Path info = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getFSInfo(fileCacheNodePath));
long availableCapacity = info.getAvailable().getBytes();

// Initialize default values for cache if NODE_SEARCH_CACHE_SIZE_SETTING is not set.
if (capacity == 0) {
// If node is not a dedicated search node without configuration, prevent cache initialization
if (!isWritableRemoteIndexEnabled
&& DiscoveryNode.getRolesFromSettings(settings)
.stream()
.anyMatch(role -> !DiscoveryNodeRole.SEARCH_ROLE.equals(role))) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
} else {
capacity = 80 * availableCapacity / 100;
}
if (DiscoveryNode.isSearchNode(settings) == false && isWritableRemoteIndexEnabled == false) {
return;
}

String capacityRaw = NODE_SEARCH_CACHE_SIZE_SETTING.get(settings);
logger.info("cache size [{}]", capacityRaw);
if (capacityRaw.equals(ZERO)) {
throw new SettingsException(
"Unable to initialize the "
+ DiscoveryNodeRole.SEARCH_ROLE.roleName()
+ "-"
+ DiscoveryNodeRole.DATA_ROLE.roleName()
+ " node: Missing value for configuration "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
);
}

NodeEnvironment.NodePath fileCacheNodePath = nodeEnvironment.fileCacheNodePath();
long totalSpace = ExceptionsHelper.catchAsRuntimeException(() -> FsProbe.getTotalSize(fileCacheNodePath));
long capacity = calculateFileCacheSize(capacityRaw, totalSpace);
if (capacity <= 0 || totalSpace <= capacity) {
andrross marked this conversation as resolved.
Show resolved Hide resolved
throw new SettingsException("Cache size must be larger than zero and less than total capacity");

Check warning on line 2034 in server/src/main/java/org/opensearch/node/Node.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/Node.java#L2034

Added line #L2034 was not covered by tests
}

this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}

private static long calculateFileCacheSize(String capacityRaw, long totalSpace) {
try {
RatioValue ratioValue = RatioValue.parseRatioValue(capacityRaw);
return Math.round(totalSpace * ratioValue.getAsRatio());
} catch (OpenSearchParseException e) {
try {
return ByteSizeValue.parseBytesSizeValue(capacityRaw, NODE_SEARCH_CACHE_SIZE_SETTING.getKey()).getBytes();
} catch (OpenSearchParseException ex) {
ex.addSuppressed(e);
throw ex;

Check warning on line 2052 in server/src/main/java/org/opensearch/node/Node.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/node/Node.java#L2050-L2052

Added lines #L2050 - L2052 were not covered by tests
}
capacity = Math.min(capacity, availableCapacity);
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(capacity, ByteSizeUnit.BYTES);
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath);
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
}
}

/**
 * Parser/validator used by {@link #NODE_SEARCH_CACHE_SIZE_SETTING}.
 * <p>
 * Runs the raw value through {@code calculateFileCacheSize} so that a value that cannot
 * be parsed there is rejected when the setting is read, and otherwise returns the raw
 * string unchanged.
 *
 * @param capacityRaw the raw configured cache size
 * @return {@code capacityRaw}, unmodified
 */
private static String validateFileCacheSize(String capacityRaw) {
// only the parsing side effect (an exception on bad input) matters here; the computed size is discarded,
// so a total space of zero is passed
calculateFileCacheSize(capacityRaw, 0L);
return capacityRaw;
}

/**
* Returns the {@link FileCache} instance for remote search node
* Note: Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void createNodePaths() throws IOException {
dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY);
Settings defaultSearchSettings = Settings.builder()
.put(dataClusterManagerSettings)
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB))
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(16, ByteSizeUnit.GB).toString())
.build();

searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE);
Expand Down
2 changes: 1 addition & 1 deletion server/src/test/java/org/opensearch/node/NodeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void testCreateWithFileCache() throws Exception {
List<Class<? extends Plugin>> plugins = basePlugins();
ByteSizeValue cacheSize = new ByteSizeValue(16, ByteSizeUnit.GB);
Settings searchRoleSettingsWithConfig = baseSettings().put(searchRoleSettings)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), cacheSize.toString())
.build();
Settings onlySearchRoleSettings = Settings.builder()
.put(searchRoleSettingsWithConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import static org.opensearch.test.NodeRoles.onlyRoles;
import static org.opensearch.test.NodeRoles.removeRoles;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
import static org.opensearch.test.OpenSearchTestCase.randomFrom;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -216,7 +217,8 @@ public final class InternalTestCluster extends TestCluster {
nodeAndClient.node.settings()
);

private static final ByteSizeValue DEFAULT_SEARCH_CACHE_SIZE = new ByteSizeValue(2, ByteSizeUnit.GB);
private static final String DEFAULT_SEARCH_CACHE_SIZE_BYTES = "2gb";
private static final String DEFAULT_SEARCH_CACHE_SIZE_PERCENT = "5%";

public static final int DEFAULT_LOW_NUM_CLUSTER_MANAGER_NODES = 1;
public static final int DEFAULT_HIGH_NUM_CLUSTER_MANAGER_NODES = 3;
Expand Down Expand Up @@ -700,8 +702,10 @@ public synchronized void ensureAtLeastNumSearchAndDataNodes(int n) {
logger.info("increasing cluster size from {} to {}", size, n);
Set<DiscoveryNodeRole> searchAndDataRoles = Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE);
Settings settings = Settings.builder()
.put(Settings.EMPTY)
.put(Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), DEFAULT_SEARCH_CACHE_SIZE)
.put(
Node.NODE_SEARCH_CACHE_SIZE_SETTING.getKey(),
randomBoolean() ? DEFAULT_SEARCH_CACHE_SIZE_PERCENT : DEFAULT_SEARCH_CACHE_SIZE_BYTES
)
.build();
startNodes(n - size, Settings.builder().put(onlyRoles(settings, searchAndDataRoles)).build());
validateClusterFormed();
Expand Down
Loading