Skip to content

Commit

Permalink
Add search node repurpose commands
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
  • Loading branch information
kotwanikunal committed Mar 1, 2023
1 parent c3db971 commit bc3e16e
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 21 deletions.
105 changes: 98 additions & 7 deletions server/src/main/java/org/opensearch/env/NodeRepurposeCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.opensearch.env.NodeEnvironment.CACHE_FOLDER;
import static org.opensearch.env.NodeEnvironment.INDICES_FOLDER;

/**
Expand All @@ -68,9 +69,11 @@ public class NodeRepurposeCommand extends OpenSearchNodeCommand {

static final String ABORTED_BY_USER_MSG = OpenSearchNodeCommand.ABORTED_BY_USER_MSG;
static final String FAILED_TO_OBTAIN_NODE_LOCK_MSG = OpenSearchNodeCommand.FAILED_TO_OBTAIN_NODE_LOCK_MSG;
static final String NO_CLEANUP = "Node has node.data=true -> no clean up necessary";
static final String NO_CLEANUP = "Node has node.data=true and node.search=true -> no clean up necessary";
static final String NO_DATA_TO_CLEAN_UP_FOUND = "No data to clean-up found";
static final String NO_SHARD_DATA_TO_CLEAN_UP_FOUND = "No shard data to clean-up found";
static final String NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND = "No file cache to clean-up found";
private static final int FILE_CACHE_NODE_PATH_LOCATION = 0;

public NodeRepurposeCommand() {
super("Repurpose this node to another cluster-manager/data role, cleaning up any excess persisted data");
Expand All @@ -83,7 +86,7 @@ void testExecute(Terminal terminal, OptionSet options, Environment env) throws E
@Override
protected boolean validateBeforeLock(Terminal terminal, Environment env) {
Settings settings = env.settings();
if (DiscoveryNode.isDataNode(settings)) {
if (DiscoveryNode.isDataNode(settings) && DiscoveryNode.isSearchNode(settings)) {
terminal.println(Terminal.Verbosity.NORMAL, NO_CLEANUP);
return false;
}
Expand All @@ -94,12 +97,20 @@ protected boolean validateBeforeLock(Terminal terminal, Environment env) {
@Override
protected void processNodePaths(Terminal terminal, Path[] dataPaths, int nodeLockId, OptionSet options, Environment env)
throws IOException {
assert DiscoveryNode.isDataNode(env.settings()) == false;
assert DiscoveryNode.isDataNode(env.settings()) == false || DiscoveryNode.isSearchNode(env.settings()) == false;

if (DiscoveryNode.isClusterManagerNode(env.settings()) == false) {
processNoClusterManagerNoDataNode(terminal, dataPaths, env);
} else {
processClusterManagerNoDataNode(terminal, dataPaths, env);
if (DiscoveryNode.isDataNode(env.settings()) == false) {
if (DiscoveryNode.isClusterManagerNode(env.settings()) == false) {
processNoClusterManagerNoDataNode(terminal, dataPaths, env);
} else {
processClusterManagerNoDataNode(terminal, dataPaths, env);
}
} else if (DiscoveryNode.isSearchNode(env.settings()) == false) {
if (DiscoveryNode.isClusterManagerNode(env.settings()) == false) {
processNoClusterManagerNoSearchNode(terminal, dataPaths, env);
} else {
processClusterManagerNoSearchNode(terminal, dataPaths, env);
}
}
}

Expand Down Expand Up @@ -175,6 +186,82 @@ private void processClusterManagerNoDataNode(Terminal terminal, Path[] dataPaths
terminal.println("Node successfully repurposed to cluster-manager and no-data.");
}

/**
 * Handles repurposing when the node keeps the data role but has neither the cluster-manager
 * nor the search role: collects the file cache data paths and index metadata paths of the
 * file cache node path, reports what was found, asks for confirmation and then removes the
 * collected paths, the meta state and the cache folder under that data path.
 *
 * @param terminal  terminal used for progress output and the confirmation prompt
 * @param dataPaths node data paths; only the entry at {@code FILE_CACHE_NODE_PATH_LOCATION} is cleaned
 * @param env       environment whose settings are used to load the persisted cluster state
 * @throws IOException if collecting, loading or deleting on-disk state fails
 */
private void processNoClusterManagerNoSearchNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
    final NodeEnvironment.NodePath cacheNodePath = toNodePaths(dataPaths)[FILE_CACHE_NODE_PATH_LOCATION];

    terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
    final List<Path> cacheDataPaths = NodeEnvironment.collectFileCacheDataPath(cacheNodePath);

    terminal.println(Terminal.Verbosity.VERBOSE, "Collecting index metadata paths");
    final List<Path> metadataPaths = NodeEnvironment.collectIndexMetadataPaths(new NodeEnvironment.NodePath[] { cacheNodePath });

    // presumably the de-duplicated parent directories of both path lists - confirm against uniqueParentPaths
    final Set<Path> pathsToRemove = uniqueParentPaths(cacheDataPaths, metadataPaths);

    final PersistedClusterStateService stateService = createPersistedClusterStateService(env.settings(), dataPaths);
    final Metadata metadata = loadClusterState(terminal, env, stateService).metadata();

    // NOTE(review): the emptiness check looks only at the file cache data paths, not at the
    // merged pathsToRemove set (which also covers index metadata paths) - confirm this is intended.
    if (cacheDataPaths.isEmpty() && metadata.indices().isEmpty()) {
        terminal.println(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND);
        return;
    }

    // index UUIDs come from two sources: the on-disk paths and the persisted cluster state
    final Set<String> uuidsOnDisk = indexUUIDsFor(pathsToRemove);
    final Set<String> uuidsInClusterState = StreamSupport.stream(metadata.indices().values().spliterator(), false)
        .map(cursor -> cursor.value.getIndexUUID())
        .collect(Collectors.toSet());
    final Set<String> indexUUIDs = Sets.union(uuidsOnDisk, uuidsInClusterState);

    outputVerboseInformation(terminal, pathsToRemove, indexUUIDs, metadata);

    final String summary = noClusterManagerMessage(indexUUIDs.size(), cacheDataPaths.size(), metadataPaths.size());
    terminal.println(summary);
    outputHowToSeeVerboseInformation(terminal);

    terminal.println(
        "Node is being re-purposed as no-cluster-manager and no-search. Clean-up of file cache and corresponding index metadata will be performed."
    );
    confirm(terminal, "Do you want to proceed?");

    // remove the collected file cache / index metadata parent directories
    removePaths(terminal, pathsToRemove);

    // then drop the meta state and the cache folder under the file cache data path
    final Path cacheDataPath = dataPaths[FILE_CACHE_NODE_PATH_LOCATION];
    MetadataStateFormat.deleteMetaState(cacheDataPath);
    IOUtils.rm(cacheDataPath.resolve(CACHE_FOLDER));

    terminal.println("Node successfully repurposed to no-cluster-manager and no-search.");
}

/**
 * Handles repurposing when the node keeps the data and cluster-manager roles but no longer
 * has the search role: collects the file cache data paths, reports how many indices they
 * belong to, asks for confirmation and then removes those file cache data paths.
 *
 * @param terminal  terminal used for progress output and the confirmation prompt
 * @param dataPaths node data paths; the entry at {@code FILE_CACHE_NODE_PATH_LOCATION} holds the file cache
 * @param env       environment whose settings are used to load the persisted cluster state
 * @throws IOException if collecting paths or loading the cluster state fails
 */
private void processClusterManagerNoSearchNode(Terminal terminal, Path[] dataPaths, Environment env) throws IOException {
    final NodeEnvironment.NodePath cacheNodePath = toNodePaths(dataPaths)[FILE_CACHE_NODE_PATH_LOCATION];

    terminal.println(Terminal.Verbosity.VERBOSE, "Collecting file cache data paths");
    final List<Path> cacheDataPaths = NodeEnvironment.collectFileCacheDataPath(cacheNodePath);

    // nothing in the file cache: report it and leave without loading the cluster state
    final boolean nothingToClean = cacheDataPaths.isEmpty();
    if (nothingToClean) {
        terminal.println(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND);
        return;
    }

    final PersistedClusterStateService stateService = createPersistedClusterStateService(env.settings(), dataPaths);
    final Metadata metadata = loadClusterState(terminal, env, stateService).metadata();

    // the parent paths are only used to derive the index UUIDs for reporting
    final Set<String> indexUUIDs = indexUUIDsFor(uniqueParentPaths(cacheDataPaths));

    outputVerboseInformation(terminal, cacheDataPaths, indexUUIDs, metadata);

    final String summary = cacheMessage(indexUUIDs.size());
    terminal.println(summary);
    outputHowToSeeVerboseInformation(terminal);

    terminal.println("Node is being re-purposed as no-search. Clean-up of file cache data will be performed.");
    confirm(terminal, "Do you want to proceed?");

    // only the file cache data paths themselves are removed; metadata is left in place
    removePaths(terminal, cacheDataPaths);

    terminal.println("Node successfully repurposed to no-search.");
}

private ClusterState loadClusterState(Terminal terminal, Environment env, PersistedClusterStateService psf) throws IOException {
terminal.println(Terminal.Verbosity.VERBOSE, "Loading cluster state");
return clusterState(env, psf.loadBestOnDiskState());
Expand Down Expand Up @@ -218,6 +305,10 @@ static String shardMessage(int shards, int indices) {
return "Found " + shards + " shards in " + indices + " indices to clean up";
}

/**
 * Builds the summary line printed before file cache clean-up.
 *
 * @param indices number of indices found in the file cache
 * @return the message {@code "Found <indices> indices in file cache to clean up"}
 */
static String cacheMessage(int indices) {
    final StringBuilder message = new StringBuilder("Found ");
    message.append(indices).append(" indices in file cache to clean up");
    return message.toString();
}

private void removePaths(Terminal terminal, Collection<Path> paths) {
terminal.println(Terminal.Verbosity.VERBOSE, "Removing data");
paths.forEach(this::removePath);
Expand Down
112 changes: 98 additions & 14 deletions server/src/test/java/org/opensearch/env/NodeRepurposeCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.index.Index;
Expand All @@ -57,16 +59,19 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;

import static org.opensearch.env.NodeRepurposeCommand.NO_CLEANUP;
import static org.opensearch.env.NodeRepurposeCommand.NO_DATA_TO_CLEAN_UP_FOUND;
import static org.opensearch.env.NodeRepurposeCommand.NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND;
import static org.opensearch.env.NodeRepurposeCommand.NO_SHARD_DATA_TO_CLEAN_UP_FOUND;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;
import static org.opensearch.test.NodeRoles.addRoles;
import static org.opensearch.test.NodeRoles.clusterManagerNode;
import static org.opensearch.test.NodeRoles.nonDataNode;
import static org.opensearch.test.NodeRoles.nonClusterManagerNode;
import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.NodeRoles.removeRoles;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -76,15 +81,35 @@ public class NodeRepurposeCommandTests extends OpenSearchTestCase {

private static final Index INDEX = new Index("testIndex", "testUUID");
private Settings dataClusterManagerSettings;
private Settings dataSearchClusterManagerSettings;
private Environment environment;
private Path[] nodePaths;
private Settings dataNoClusterManagerSettings;
private Settings dataSearchNoClusterManagerSettings;
private Settings noDataNoClusterManagerSettings;
private Settings noDataClusterManagerSettings;
private Settings searchNoDataNoClusterManagerSettings;
private Settings noSearchNoClusterManagerSettings;

@Before
public void createNodePaths() throws IOException {
dataClusterManagerSettings = buildEnvSettings(Settings.EMPTY);
Settings defaultSearchSettings = Settings.builder()
.put(dataClusterManagerSettings)
.put(NODE_SEARCH_CACHE_SIZE_SETTING.getKey(), new ByteSizeValue(100, ByteSizeUnit.MB))
.build();

searchNoDataNoClusterManagerSettings = onlyRole(dataClusterManagerSettings, DiscoveryNodeRole.SEARCH_ROLE);
dataSearchClusterManagerSettings = addRoles(defaultSearchSettings, Set.of(DiscoveryNodeRole.SEARCH_ROLE));
noDataClusterManagerSettings = clusterManagerNode(nonDataNode(dataClusterManagerSettings));

dataSearchNoClusterManagerSettings = nonClusterManagerNode(dataSearchClusterManagerSettings);
noSearchNoClusterManagerSettings = nonClusterManagerNode(defaultSearchSettings);

noDataNoClusterManagerSettings = removeRoles(
dataClusterManagerSettings,
Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)
);

environment = TestEnvironment.newEnvironment(dataClusterManagerSettings);
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(dataClusterManagerSettings, environment)) {
nodePaths = nodeEnvironment.nodeDataPaths();
Expand All @@ -102,20 +127,13 @@ public void createNodePaths() throws IOException {
writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE);
}
}
dataNoClusterManagerSettings = nonClusterManagerNode(dataClusterManagerSettings);
noDataNoClusterManagerSettings = removeRoles(
dataClusterManagerSettings,
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)))
);

noDataClusterManagerSettings = clusterManagerNode(nonDataNode(dataClusterManagerSettings));
}

public void testEarlyExitNoCleanup() throws Exception {
createIndexDataFiles(dataClusterManagerSettings, randomInt(10), randomBoolean());

verifyNoQuestions(dataClusterManagerSettings, containsString(NO_CLEANUP));
verifyNoQuestions(dataNoClusterManagerSettings, containsString(NO_CLEANUP));
verifyNoQuestions(dataSearchClusterManagerSettings, containsString(NO_CLEANUP));
verifyNoQuestions(dataSearchNoClusterManagerSettings, containsString(NO_CLEANUP));
}

public void testNothingToCleanup() throws Exception {
Expand All @@ -138,6 +156,7 @@ public void testNothingToCleanup() throws Exception {

verifyNoQuestions(noDataNoClusterManagerSettings, containsString(NO_DATA_TO_CLEAN_UP_FOUND));
verifyNoQuestions(noDataClusterManagerSettings, containsString(NO_SHARD_DATA_TO_CLEAN_UP_FOUND));
verifyNoQuestions(noSearchNoClusterManagerSettings, containsString(NO_FILE_CACHE_DATA_TO_CLEAN_UP_FOUND));

createIndexDataFiles(dataClusterManagerSettings, 0, randomBoolean());

Expand Down Expand Up @@ -208,6 +227,56 @@ public void testCleanupShardData() throws Exception {
new NodeEnvironment(noDataClusterManagerSettings, environment).close();
}

/**
 * Creates file cache data with search-only settings, then repurposes the node with
 * {@code dataClusterManagerSettings} and expects the file cache message for one index.
 * Verbose-only details (index UUID, index name or the "no name" fallback) are expected
 * only when verbose output is enabled.
 */
public void testCleanupSearchNode() throws Exception {
    // keep the order of the random draws: shard count, verbosity, cluster state
    final int numShards = randomIntBetween(1, 10);
    final boolean verboseOutput = randomBoolean();
    final boolean clusterStateWritten = randomBoolean();
    createIndexDataFiles(searchNoDataNoClusterManagerSettings, numShards, clusterStateWritten, true);

    final boolean quiet = verboseOutput == false;
    final Matcher<String> expectedOutput = allOf(
        containsString(NodeRepurposeCommand.cacheMessage(1)),
        conditionalNot(containsString("testUUID"), quiet),
        conditionalNot(containsString("testIndex"), quiet || clusterStateWritten == false),
        conditionalNot(containsString("no name for uuid: testUUID"), quiet || clusterStateWritten)
    );

    verifyUnchangedOnAbort(dataClusterManagerSettings, expectedOutput, verboseOutput);

    // sanity check of the setup: the node environment must be rejected before the clean-up
    expectThrows(IllegalStateException.class, () -> new NodeEnvironment(dataClusterManagerSettings, environment).close());

    verifySuccess(dataClusterManagerSettings, expectedOutput, verboseOutput);

    // after the clean-up the node environment can be created again
    new NodeEnvironment(dataClusterManagerSettings, environment).close();
}

/**
 * Creates file cache data with data, search and cluster-manager settings, then repurposes
 * the node with {@code noSearchNoClusterManagerSettings} and expects the no-cluster-manager
 * message for one index, {@code shardCount} shards and no index metadata. Verbose-only
 * details are expected only when verbose output is enabled.
 */
public void testCleanupSearchClusterManager() throws Exception {
    // keep the order of the random draws: shard count, verbosity, cluster state
    final int numShards = randomIntBetween(1, 10);
    final boolean verboseOutput = randomBoolean();
    final boolean clusterStateWritten = randomBoolean();
    createIndexDataFiles(dataSearchClusterManagerSettings, numShards, clusterStateWritten, true);

    final String expectedSummary = NodeRepurposeCommand.noClusterManagerMessage(1, numShards, 0);
    final boolean quiet = verboseOutput == false;

    final Matcher<String> expectedOutput = allOf(
        containsString(expectedSummary),
        conditionalNot(containsString("testUUID"), quiet),
        conditionalNot(containsString("testIndex"), quiet || clusterStateWritten == false),
        conditionalNot(containsString("no name for uuid: testUUID"), quiet || clusterStateWritten)
    );

    verifyUnchangedOnAbort(noSearchNoClusterManagerSettings, expectedOutput, verboseOutput);

    // sanity check of the setup: the node environment must be rejected before the clean-up
    expectThrows(IllegalStateException.class, () -> new NodeEnvironment(noSearchNoClusterManagerSettings, environment).close());

    verifySuccess(noSearchNoClusterManagerSettings, expectedOutput, verboseOutput);

    // after the clean-up the node environment can be created again
    new NodeEnvironment(noSearchNoClusterManagerSettings, environment).close();
}

static void verifySuccess(Settings settings, Matcher<String> outputMatcher, boolean verbose) throws Exception {
withTerminal(verbose, outputMatcher, terminal -> {
terminal.addTextInput(randomFrom("y", "Y"));
Expand Down Expand Up @@ -256,6 +325,10 @@ private static void executeRepurposeCommand(MockTerminal terminal, Settings sett
}

/**
 * Convenience overload that creates index data files with cache mode switched off, so the
 * shard directories are created under the regular index paths rather than the file cache path.
 */
private void createIndexDataFiles(Settings settings, int shardCount, boolean writeClusterState) throws IOException {
    final boolean cacheMode = false;
    createIndexDataFiles(settings, shardCount, writeClusterState, cacheMode);
}

private void createIndexDataFiles(Settings settings, int shardCount, boolean writeClusterState, boolean cacheMode) throws IOException {
int shardDataDirNumber = randomInt(10);
Environment environment = TestEnvironment.newEnvironment(settings);
try (NodeEnvironment env = new NodeEnvironment(settings, environment)) {
Expand Down Expand Up @@ -287,12 +360,23 @@ private void createIndexDataFiles(Settings settings, int shardCount, boolean wri
);
}
}
for (Path path : env.indexPaths(INDEX)) {

if (cacheMode) {
Path cachePath = env.fileCacheNodePath().fileCachePath;
cachePath = cachePath.resolve(String.valueOf(env.getNodeLockId())).resolve(INDEX.getUUID());
for (int i = 0; i < shardCount; ++i) {
Files.createDirectories(path.resolve(Integer.toString(shardDataDirNumber)));
Files.createDirectories(cachePath.resolve(Integer.toString(shardDataDirNumber)));
shardDataDirNumber += randomIntBetween(1, 10);
}
} else {
for (Path path : env.indexPaths(INDEX)) {
for (int i = 0; i < shardCount; ++i) {
Files.createDirectories(path.resolve(Integer.toString(shardDataDirNumber)));
shardDataDirNumber += randomIntBetween(1, 10);
}
}
}

}
}

Expand Down

0 comments on commit bc3e16e

Please sign in to comment.