Revert "Fail when using multiple data paths (elastic#72184)"
This reverts commit 6aa0735.

This revert was conflict-free.

relates elastic#78525
relates elastic#71205
rjernst committed Oct 14, 2021
1 parent d97d282 commit 48cae8c
Showing 10 changed files with 179 additions and 22 deletions.
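
For orientation, here is a sketch (not part of the commit) of the kind of node configuration this revert re-allows: several path.data entries supplied as a list, which after this change merely logs a deprecation warning instead of failing startup. The paths below are hypothetical.

    // Hypothetical illustration only; the paths are made up.
    Settings multiDataPathSettings = Settings.builder()
        .put(Environment.PATH_HOME_SETTING.getKey(), "/tmp/es-home")
        .putList(Environment.PATH_DATA_SETTING.getKey(), "/tmp/data-0", "/tmp/data-1")
        .build();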
@@ -7,6 +7,7 @@
*/
package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
@@ -24,6 +25,8 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -44,6 +47,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class MockDiskUsagesIT extends ESIntegTestCase {
@@ -315,6 +320,83 @@ public void testDoesNotExceedLowWatermarkWhenRebalancing() throws Exception {
assertThat("node2 has 1 shard", getShardCountByNodeId().get(nodeIds.get(2)), equalTo(1));
}

public void testMovesShardsOffSpecificDataPathAboveWatermark() throws Exception {

// start one node with two data paths
final Path pathOverWatermark = createTempDir();
final Settings.Builder twoPathSettings = Settings.builder();
if (randomBoolean()) {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toString(), pathOverWatermark.toString());
} else {
twoPathSettings.putList(Environment.PATH_DATA_SETTING.getKey(), pathOverWatermark.toString(), createTempDir().toString());
}
internalCluster().startNode(twoPathSettings);
final String nodeWithTwoPaths = client().admin().cluster().prepareNodesInfo().get().getNodes().get(0).getNode().getId();

// other two nodes have one data path each
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));
internalCluster().startNode(Settings.builder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()));

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();

// prevent any effects from in-flight recoveries, since we are only simulating a 100-byte disk
clusterInfoService.setShardSizeFunctionAndRefresh(shardRouting -> 0L);

// start with all paths below the watermark
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100, between(10, 100)));

assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "90%")
.put(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "100%")));

final List<String> nodeIds = StreamSupport.stream(client().admin().cluster().prepareState().get().getState()
.getRoutingNodes().spliterator(), false).map(RoutingNode::nodeId).collect(Collectors.toList());

assertAcked(prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 6).put("number_of_replicas", 0)));

ensureGreen("test");

{
final Map<String, Integer> shardCountByNodeId = getShardCountByNodeId();
assertThat("node0 has 2 shards", shardCountByNodeId.get(nodeIds.get(0)), equalTo(2));
assertThat("node1 has 2 shards", shardCountByNodeId.get(nodeIds.get(1)), equalTo(2));
assertThat("node2 has 2 shards", shardCountByNodeId.get(nodeIds.get(2)), equalTo(2));
}

final long shardsOnGoodPath = Arrays.stream(client().admin().indices().prepareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count();
logger.info("--> shards on good path: [{}]", shardsOnGoodPath);

// disable rebalancing, or else we might move shards back onto the over-full path since we're not faking that
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));

// one of the paths on node0 suddenly exceeds the high watermark
clusterInfoService.setDiskUsageFunctionAndRefresh((discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, 100L,
fsInfoPath.getPath().startsWith(pathOverWatermark.toString()) ? between(0, 9) : between(10, 100)));

logger.info("--> waiting for shards to relocate off path [{}]", pathOverWatermark);

assertBusy(() -> {
for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString())));
}
});

ensureGreen("test");

for (final ShardStats shardStats : client().admin().indices().prepareStats("test").get().getShards()) {
assertThat(shardStats.getDataPath(), not(startsWith(pathOverWatermark.toString())));
}

assertThat("should not have moved any shards off of the path that wasn't too full",
Arrays.stream(client().admin().indices().prepareStats("test").get().getShards())
.filter(shardStats -> shardStats.getShardRouting().currentNodeId().equals(nodeWithTwoPaths)
&& shardStats.getDataPath().startsWith(pathOverWatermark.toString()) == false).count(), equalTo(shardsOnGoodPath));
}

private Map<String, Integer> getShardCountByNodeId() {
final Map<String, Integer> shardCountByNodeId = new HashMap<>();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
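
The setDiskUsage helper called in the new test is a pre-existing helper that this commit does not touch; the sketch below is an assumption about its shape, rebuilding the reported FsInfo.Path with simulated total and free bytes while keeping the original path and mount.

    // Assumed shape of the existing setDiskUsage helper (not shown in this diff).
    private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
        // keep the reported path/mount, override only the size numbers used by the watermark deciders
        return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
    }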
@@ -26,6 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -202,4 +203,38 @@ public void testUpgradeDataFolder() throws IOException, InterruptedException {
ensureYellow("test");
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
}

public void testFailsToStartOnDataPathsFromMultipleNodes() throws IOException {
final List<String> nodes = internalCluster().startNodes(2);
ensureStableCluster(2);

final List<String> node0DataPaths = Environment.PATH_DATA_SETTING.get(internalCluster().dataPathSettings(nodes.get(0)));
final List<String> node1DataPaths = Environment.PATH_DATA_SETTING.get(internalCluster().dataPathSettings(nodes.get(1)));

final List<String> allDataPaths = new ArrayList<>(node0DataPaths);
allDataPaths.addAll(node1DataPaths);

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(1)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(0)));

IllegalStateException illegalStateException = expectThrows(IllegalStateException.class,
() -> PersistedClusterStateService.nodeMetadata(allDataPaths.stream().map(PathUtils::get).toArray(Path[]::new)));

assertThat(illegalStateException.getMessage(), containsString("unexpected node ID in metadata"));

illegalStateException = expectThrows(IllegalStateException.class,
() -> internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), allDataPaths)));

assertThat(illegalStateException.getMessage(), containsString("unexpected node ID in metadata"));

final List<String> node0DataPathsPlusOne = new ArrayList<>(node0DataPaths);
node0DataPathsPlusOne.add(createTempDir().toString());
internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), node0DataPathsPlusOne));

final List<String> node1DataPathsPlusOne = new ArrayList<>(node1DataPaths);
node1DataPathsPlusOne.add(createTempDir().toString());
internalCluster().startNode(Settings.builder().putList(Environment.PATH_DATA_SETTING.getKey(), node1DataPathsPlusOne));

ensureStableCluster(2);
}
}
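
The test above relies on PersistedClusterStateService.nodeMetadata refusing to read data paths that belong to different nodes. The following is a simplified sketch of that invariant, an assumption rather than the actual implementation: every path that already holds node metadata must report the same node ID, while a freshly created empty path is acceptable. That is why combining node0 and node1 paths fails with "unexpected node ID in metadata", but appending a new temp directory to either node's paths starts fine.

    // Simplified sketch of the invariant (assumption, not the real PersistedClusterStateService code).
    static String requireSingleNodeId(List<NodeMetadata> metadataPerPath) {
        String nodeId = null;
        for (NodeMetadata metadata : metadataPerPath) {
            if (metadata == null) {
                continue; // an empty, freshly created data path is fine
            }
            if (nodeId == null) {
                nodeId = metadata.nodeId();
            } else if (nodeId.equals(metadata.nodeId()) == false) {
                throw new IllegalStateException("unexpected node ID in metadata");
            }
        }
        return nodeId;
    }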
@@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.env.NodeEnvironment.INDICES_FOLDER;
import static org.elasticsearch.gateway.MetadataStateFormat.STATE_DIR_NAME;
@@ -240,8 +241,7 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
.build()
);

// TODO: decide if multiple leftovers should/can be tested without MDP
final Index[] leftovers = new Index[1];
final Index[] leftovers = new Index[between(1, 3)];
logger.debug("--> creating [{}] leftover indices on data node [{}]", leftovers.length, dataNode);
for (int i = 0; i < leftovers.length; i++) {
final String indexName = "index-" + i;
Expand Down Expand Up @@ -275,19 +275,22 @@ public void testListenersInvokedWhenIndexHasLeftOverShard() throws Exception {
final Index index = internalCluster().clusterService(masterNode).state().metadata().index(indexName).getIndex();
logger.debug("--> index [{}] created", index);

final Path dataPath = createTempDir();
final Path shardPath = dataPath.resolve(INDICES_FOLDER).resolve(index.getUUID()).resolve("0");
Files.createDirectories(shardPath);
final List<Path> dataPaths = new ArrayList<>();
for (int i = 0; i < leftovers.length; i++) {
final Path dataPath = createTempDir();
dataPaths.add(dataPath);
final Path shardPath = dataPath.resolve(INDICES_FOLDER).resolve(index.getUUID()).resolve("0");
Files.createDirectories(shardPath);
final Path leftoverPath = dataDirWithLeftOverShards.resolve(INDICES_FOLDER).resolve(leftovers[i].getUUID()).resolve("0");
Files.move(leftoverPath.resolve(STATE_DIR_NAME), shardPath.resolve(STATE_DIR_NAME));
Files.move(leftoverPath.resolve(INDEX_FOLDER_NAME), shardPath.resolve(INDEX_FOLDER_NAME));
}

logger.debug("--> starting another data node with data path [{}]", dataPath);
logger.debug("--> starting another data node with data paths [{}]", dataPaths);
dataNode = internalCluster().startDataOnlyNode(
Settings.builder()
.put(Environment.PATH_DATA_SETTING.getKey(), dataPath.toAbsolutePath().toString())
.putList(Environment.PATH_DATA_SETTING.getKey(),
dataPaths.stream().map(p -> p.toAbsolutePath().toString()).collect(Collectors.toList()))
.putNull(Environment.PATH_SHARED_DATA_SETTING.getKey())
.build());
ensureStableCluster(1 + 1, masterNode);
7 changes: 1 addition & 6 deletions server/src/main/java/org/elasticsearch/env/Environment.java
@@ -144,12 +144,7 @@ public Environment(final Settings settings, final Path configPath) {

final Settings.Builder finalSettings = Settings.builder().put(settings);
if (PATH_DATA_SETTING.exists(settings)) {
if (dataFiles.length == 1) {
finalSettings.put(PATH_DATA_SETTING.getKey(), dataFiles[0].toString());
} else {
finalSettings.putList(PATH_DATA_SETTING.getKey(),
Arrays.stream(dataFiles).map(Path::toString).collect(Collectors.toList()));
}
finalSettings.putList(PATH_DATA_SETTING.getKey(), Arrays.stream(dataFiles).map(Path::toString).collect(Collectors.toList()));
}
finalSettings.put(PATH_HOME_SETTING.getKey(), homeFile);
finalSettings.put(PATH_LOGS_SETTING.getKey(), logsFile.toString());
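
A hedged usage sketch of the normalization above (paths hypothetical): whether path.data was given as a string or as a list, the rewritten settings are expected to expose it as a list.

    // Illustration only; not part of the commit.
    Settings input = Settings.builder()
        .put(Environment.PATH_HOME_SETTING.getKey(), "/tmp/es-home")
        .put(Environment.PATH_DATA_SETTING.getKey(), "/tmp/es-data") // single string value
        .build();
    Environment environment = new Environment(input, null);
    // Expected to come back as a one-element list after normalization.
    List<String> normalizedDataPaths = Environment.PATH_DATA_SETTING.get(environment.settings());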
12 changes: 9 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -328,9 +328,15 @@ protected Node(final Environment initialEnvironment,
}

if (initialEnvironment.dataFiles().length > 1) {
throw new IllegalArgumentException("Multiple [path.data] values found. Specify a single data path.");
} else if (Environment.dataPathUsesList(tmpSettings)) {
throw new IllegalArgumentException("[path.data] is a list. Specify as a string value.");
// NOTE: we use initialEnvironment here, but assertEquivalent below ensures the data paths do not change
deprecationLogger.deprecate(DeprecationCategory.SETTINGS, "multiple-data-paths",
"Configuring multiple [path.data] paths is deprecated. Use RAID or other system level features for utilizing " +
"multiple disks. This feature will be removed in 8.0.");
}
if (Environment.dataPathUsesList(tmpSettings)) {
// already checked for multiple values above, so if this is a list it is a single valued list
deprecationLogger.deprecate(DeprecationCategory.SETTINGS, "multiple-data-paths-list",
"Configuring [path.data] with a list is deprecated. Instead specify as a string value.");
}

if (logger.isDebugEnabled()) {
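
To make the two branches above concrete, a sketch (not part of the commit, paths hypothetical) of which configuration is expected to trigger which deprecation key:

    Settings multiplePaths = Settings.builder()
        .putList(Environment.PATH_DATA_SETTING.getKey(), "/data/a", "/data/b")
        .build();   // expected to log the "multiple-data-paths" deprecation warning
    Settings singlePathAsList = Settings.builder()
        .putList(Environment.PATH_DATA_SETTING.getKey(), "/data/a")
        .build();   // expected to log the "multiple-data-paths-list" deprecation warning
    Settings singlePathAsString = Settings.builder()
        .put(Environment.PATH_DATA_SETTING.getKey(), "/data/a")
        .build();   // expected to log no path.data deprecation warning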
@@ -1840,7 +1840,7 @@ protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoManageMasterNodes(),
minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
nodePrefix, mockPlugins, getClientWrapper(), forbidPrivateIndexSettings());
nodePrefix, mockPlugins, getClientWrapper(), forbidPrivateIndexSettings(), forceSingleDataPath());
}

private NodeConfigurationSource getNodeConfigSource() {
@@ -2149,6 +2149,13 @@ protected boolean forbidPrivateIndexSettings() {
return true;
}

/**
* Override to return true in tests that cannot handle multiple data paths.
*/
protected boolean forceSingleDataPath() {
return false;
}

/**
* Returns an instance of {@link RestClient} pointing to the current test cluster.
* Creates a new client if the method is invoked for the first time in the context of the current test scope.
@@ -34,6 +34,7 @@
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.TestRuleMarkFailure;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.Version;
import org.elasticsearch.bootstrap.BootstrapForTesting;
@@ -1050,7 +1051,12 @@ public Path getDataPath(String relativePath) {

/** Returns a random number of temporary paths. */
public String[] tmpPaths() {
return new String[] { createTempDir().toAbsolutePath().toString() };
final int numPaths = TestUtil.nextInt(random(), 1, 3);
final String[] absPaths = new String[numPaths];
for (int i = 0; i < numPaths; i++) {
absPaths[i] = createTempDir().toAbsolutePath().toString();
}
return absPaths;
}

public NodeEnvironment newNodeEnvironment() throws IOException {
@@ -1061,7 +1067,7 @@ public Settings buildEnvSettings(Settings settings) {
return Settings.builder()
.put(settings)
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath())
.put(Environment.PATH_DATA_SETTING.getKey(), createTempDir().toAbsolutePath()).build();
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths()).build();
}

public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
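
Because buildEnvSettings now hands out a random number of data paths, tests that build a NodeEnvironment through these helpers should not assume a single path. A minimal usage sketch, assuming the standard NodeEnvironment API:

    // Sketch only: iterate however many data paths the randomized settings produced.
    try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
        for (NodeEnvironment.NodePath nodePath : nodeEnvironment.nodePaths()) {
            logger.info("--> node data path [{}]", nodePath.path);
        }
    }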
@@ -132,6 +132,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
@@ -235,6 +236,8 @@ public final class InternalTestCluster extends TestCluster {

private final boolean forbidPrivateIndexSettings;

private final int numDataPaths;

/**
* All nodes started by the cluster will have their name set to nodePrefix followed by a positive number
*/
@@ -272,7 +275,8 @@ public InternalTestCluster(
nodePrefix,
mockPlugins,
clientWrapper,
true);
true,
false);
}

public InternalTestCluster(
@@ -288,7 +292,8 @@ public InternalTestCluster(
final String nodePrefix,
final Collection<Class<? extends Plugin>> mockPlugins,
final Function<Client, Client> clientWrapper,
final boolean forbidPrivateIndexSettings) {
final boolean forbidPrivateIndexSettings,
final boolean forceSingleDataPath) {
super(clusterSeed);
this.autoManageMasterNodes = autoManageMasterNodes;
this.clientWrapper = clientWrapper;
@@ -350,6 +355,8 @@ public InternalTestCluster(
numSharedDedicatedMasterNodes, numSharedDataNodes, numSharedCoordOnlyNodes,
autoManageMasterNodes ? "auto-managed" : "manual");
this.nodeConfigurationSource = nodeConfigurationSource;
// use 1 data path if we are forced to, or 80% of the time that we are not, otherwise use between 2 and 4 data paths
numDataPaths = forceSingleDataPath || random.nextDouble() < 0.8 ? 1 : RandomNumbers.randomIntBetween(random, 2, 4);
Builder builder = Settings.builder();
builder.put(Environment.PATH_HOME_SETTING.getKey(), baseDir);
builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos"));
@@ -640,7 +647,14 @@ private Settings getNodeSettings(final int nodeId, final long seed, final Settings
final Settings.Builder updatedSettings = Settings.builder();

updatedSettings.put(Environment.PATH_HOME_SETTING.getKey(), baseDir);
updatedSettings.put(Environment.PATH_DATA_SETTING.getKey(), baseDir.resolve(name));

if (numDataPaths > 1) {
updatedSettings.putList(Environment.PATH_DATA_SETTING.getKey(), IntStream.range(0, numDataPaths).mapToObj(i ->
baseDir.resolve(name).resolve("d" + i).toString()).collect(Collectors.toList()));
} else {
updatedSettings.put(Environment.PATH_DATA_SETTING.getKey(), baseDir.resolve(name));
}

updatedSettings.put(Environment.PATH_SHARED_DATA_SETTING.getKey(), baseDir.resolve(name + "-shared"));

// allow overriding the above
@@ -51,6 +51,11 @@ public abstract class AbstractFrozenAutoscalingIntegTestCase extends AbstractSna
protected final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
protected final String policyName = "frozen";

@Override
protected boolean forceSingleDataPath() {
return true;
}

@Override
protected boolean addMockInternalEngine() {
return false;
@@ -15,6 +15,10 @@
import org.elasticsearch.xpack.searchablesnapshots.cache.shared.FrozenCacheService;

public class BaseFrozenSearchableSnapshotsIntegTestCase extends BaseSearchableSnapshotsIntegTestCase {
@Override
protected boolean forceSingleDataPath() {
return true;
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
