Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make NodeEnvironment.nodeDataPaths singular #72432

Merged
merged 8 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testBootstrapNoClusterState() throws IOException {
internalCluster().stopRandomDataNode();
Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPaths());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPath());

expectThrows(() -> unsafeBootstrap(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
}
Expand All @@ -170,7 +170,7 @@ public void testDetachNoClusterState() throws IOException {
internalCluster().stopRandomDataNode();
Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(dataPathSettings).build());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPaths());
PersistedClusterStateService.deleteAll(nodeEnvironment.nodeDataPath());

expectThrows(() -> detachCluster(environment), ElasticsearchNodeCommand.NO_NODE_METADATA_FOUND_MSG);
}
Expand Down Expand Up @@ -252,7 +252,7 @@ public void test3MasterNodes2Failed() throws Exception {

logger.info("--> unsafely-bootstrap 1st master-eligible node");
MockTerminal terminal = unsafeBootstrap(environmentMaster1);
Metadata metadata = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPaths())
Metadata metadata = ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, nodeEnvironment.nodeDataPath())
.loadBestOnDiskState().metadata;
assertThat(terminal.getOutput(), containsString(
String.format(Locale.ROOT, UnsafeBootstrapMasterCommand.CLUSTER_STATE_TERM_VERSION_MSG_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ public Settings onNodeStopped(String nodeName) {
assertThat(ex.getMessage(), startsWith("node does not have the data role but has shard data"));
}

private IllegalStateException expectThrowsOnRestart(CheckedConsumer<Path[], Exception> onNodeStopped) {
private IllegalStateException expectThrowsOnRestart(CheckedConsumer<Path, Exception> onNodeStopped) {
internalCluster().startNode();
final Path[] dataPaths = internalCluster().getInstance(NodeEnvironment.class).nodeDataPaths();
final Path dataPath = internalCluster().getInstance(NodeEnvironment.class).nodeDataPath();
return expectThrows(IllegalStateException.class,
() -> internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
try {
onNodeStopped.accept(dataPaths);
onNodeStopped.accept(dataPath);
} catch (Exception e) {
throw new AssertionError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,20 +513,18 @@ public void testHalfDeletedIndexImport() throws Exception {
ensureGreen("test");

final Metadata metadata = internalCluster().getInstance(ClusterService.class).state().metadata();
final Path[] paths = internalCluster().getInstance(NodeEnvironment.class).nodeDataPaths();
final Path path = internalCluster().getInstance(NodeEnvironment.class).nodeDataPath();
final String nodeId = client().admin().cluster().prepareNodesInfo(nodeName).clear().get().getNodes().get(0).getNode().getId();

writeBrokenMeta(metaStateService -> {
for (final Path path : paths) {
IOUtils.rm(path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME));
}
IOUtils.rm(path.resolve(PersistedClusterStateService.METADATA_DIRECTORY_NAME));
metaStateService.writeGlobalState("test", Metadata.builder(metadata)
// we remove the manifest file, resetting the term and making this look like an upgrade from 6.x, so must also reset the
// term in the coordination metadata
.coordinationMetadata(CoordinationMetadata.builder(metadata.coordinationMetadata()).term(0L).build())
// add a tombstone but do not delete the index metadata from disk
.putCustom(IndexGraveyard.TYPE, IndexGraveyard.builder().addTombstone(metadata.index("test").getIndex()).build()).build());
NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(nodeId, Version.CURRENT), paths);
NodeMetadata.FORMAT.writeAndCleanup(new NodeMetadata(nodeId, Version.CURRENT), path);
});

ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ElasticsearchNodeCommand(String description) {
super(description);
}

public static PersistedClusterStateService createPersistedClusterStateService(Settings settings, Path[] dataPaths) throws IOException {
public static PersistedClusterStateService createPersistedClusterStateService(Settings settings, Path... dataPaths) throws IOException {
final NodeMetadata nodeMetadata = PersistedClusterStateService.nodeMetadata(dataPaths);
if (nodeMetadata == null) {
throw new ElasticsearchException(NO_NODE_METADATA_FOUND_MSG);
Expand Down
12 changes: 3 additions & 9 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -894,13 +894,9 @@ public boolean hasNodeFile() {
* Returns an array of all of the nodes data locations.
* @throws IllegalStateException if the node is not configured to store local locations
*/
public Path[] nodeDataPaths() {
public Path nodeDataPath() {
assertEnvIsLocked();
Path[] paths = new Path[nodePaths.length];
for(int i=0;i<paths.length;i++) {
paths[i] = nodePaths[i].path;
}
return paths;
return nodePaths[0].path;
}

/**
Expand Down Expand Up @@ -1304,9 +1300,7 @@ public static Path shardStatePathToDataPath(Path shardPath) {
* This prevents disasters if nodes are started under the wrong username etc.
*/
private void assertCanWrite() throws IOException {
for (Path path : nodeDataPaths()) { // check node-paths are writable
tryWriteTempFile(path);
}
tryWriteTempFile(nodeDataPath());
for (String indexFolderName : this.availableIndexFolders()) {
for (Path indexPath : this.resolveIndexFolder(indexFolderName)) { // check index paths are writable
Path indexStatePath = indexPath.resolve(MetadataStateFormat.STATE_DIR_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXCon
* @throws IOException if some IOException when loading files occurs or there is no metadata referenced by manifest file.
*/
public Tuple<Manifest, Metadata> loadFullState() throws IOException {
final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPath());
if (manifest == null) {
return loadFullStateBWC();
}
Expand All @@ -68,7 +68,7 @@ public Tuple<Manifest, Metadata> loadFullState() throws IOException {
metadataBuilder = Metadata.builder();
} else {
final Metadata globalMetadata = METADATA_FORMAT.loadGeneration(logger, namedXContentRegistry, manifest.getGlobalGeneration(),
nodeEnv.nodeDataPaths());
nodeEnv.nodeDataPath());
if (globalMetadata != null) {
metadataBuilder = Metadata.builder(globalMetadata);
} else {
Expand Down Expand Up @@ -101,7 +101,7 @@ private Tuple<Manifest, Metadata> loadFullStateBWC() throws IOException {
Metadata.Builder metadataBuilder;

Tuple<Metadata, Long> metadataAndGeneration =
METADATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
METADATA_FORMAT.loadLatestStateWithGeneration(logger, namedXContentRegistry, nodeEnv.nodeDataPath());
Metadata globalMetadata = metadataAndGeneration.v1();
long globalStateGeneration = metadataAndGeneration.v2();

Expand Down Expand Up @@ -173,7 +173,7 @@ List<IndexMetadata> loadIndicesStates(Predicate<String> excludeIndexPathIdsPredi
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
*/
Metadata loadGlobalState() throws IOException {
return METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
return METADATA_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPath());
}

/**
Expand All @@ -185,7 +185,7 @@ Metadata loadGlobalState() throws IOException {
public void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException {
logger.trace("[_meta] writing state, reason [{}]", reason);
try {
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPaths());
long generation = MANIFEST_FORMAT.writeAndCleanup(manifest, nodeEnv.nodeDataPath());
logger.trace("[_meta] state written (generation: {})", generation);
} catch (WriteStateException ex) {
throw new WriteStateException(ex.isDirty(), "[_meta]: failed to write meta state", ex);
Expand Down Expand Up @@ -222,7 +222,7 @@ public long writeIndex(String reason, IndexMetadata indexMetadata) throws WriteS
long writeGlobalState(String reason, Metadata metadata) throws WriteStateException {
logger.trace("[_global] writing state, reason [{}]", reason);
try {
long generation = METADATA_FORMAT.write(metadata, nodeEnv.nodeDataPaths());
long generation = METADATA_FORMAT.write(metadata, nodeEnv.nodeDataPath());
logger.trace("[_global] state written");
return generation;
} catch (WriteStateException ex) {
Expand All @@ -236,7 +236,7 @@ long writeGlobalState(String reason, Metadata metadata) throws WriteStateExcepti
* @param currentGeneration current state generation to keep in the directory.
*/
void cleanupGlobalState(long currentGeneration) {
METADATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPaths());
METADATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.nodeDataPath());
}

/**
Expand All @@ -254,8 +254,8 @@ public void cleanupIndex(Index index, long currentGeneration) {
* (only used for dangling indices at that point).
*/
public void unreferenceAll() throws IOException {
MANIFEST_FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPaths()); // write empty file so that indices become unreferenced
METADATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
MANIFEST_FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPath()); // write empty file so that indices become unreferenced
METADATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPath());
}

/**
Expand All @@ -270,6 +270,6 @@ public void deleteAll() throws IOException {
// delete meta state directories of indices
MetadataStateFormat.deleteMetaState(nodeEnv.resolveIndexFolder(indexFolderName));
}
MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); // finally delete manifest
MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPath()); // finally delete manifest
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ protected Directory newDirectory(Path dir) throws IOException {
* @param currentGeneration state generation to keep.
* @param locations state paths.
*/
public void cleanupOldFiles(final long currentGeneration, Path[] locations) {
public void cleanupOldFiles(final long currentGeneration, Path... locations) {
final String fileNameToKeep = getStateFileName(currentGeneration);
for (Path location : locations) {
logger.trace("cleanupOldFiles: cleaning up {}", location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public class PersistedClusterStateService {

public PersistedClusterStateService(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
ClusterSettings clusterSettings, LongSupplier relativeTimeMillisSupplier) {
this(nodeEnvironment.nodeDataPaths(), nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
this(new Path[] { nodeEnvironment.nodeDataPath() }, nodeEnvironment.nodeId(), namedXContentRegistry, bigArrays, clusterSettings,
relativeTimeMillisSupplier);
}

Expand Down Expand Up @@ -202,7 +202,7 @@ private static IndexWriter createIndexWriter(Directory directory, boolean openEx
* Remove all persisted cluster states from the given data paths, for use in tests. Should only be called when there is no open
* {@link Writer} on these paths.
*/
public static void deleteAll(Path[] dataPaths) throws IOException {
public static void deleteAll(Path... dataPaths) throws IOException {
for (Path dataPath : dataPaths) {
Lucene.cleanLuceneIndex(new SimpleFSDirectory(dataPath.resolve(METADATA_DIRECTORY_NAME)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -141,43 +140,38 @@ public void run() {
}

private void monitorFSHealth() {
Set<Path> currentUnhealthyPaths = null;
Path[] paths = null;
Path path;
try {
paths = nodeEnv.nodeDataPaths();
path = nodeEnv.nodeDataPath();
} catch (IllegalStateException e) {
logger.error("health check failed", e);
brokenLock = true;
return;
}

for (Path path : paths) {
long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
Path tempDataPath = path.resolve(TEMP_FILE_NAME);
Files.deleteIfExists(tempDataPath);
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) {
os.write(byteToWrite);
IOUtils.fsync(tempDataPath, false);
}
Files.delete(tempDataPath);
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime;
if (elapsedTime > slowPathLoggingThreshold.millis()) {
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]",
path, elapsedTime, slowPathLoggingThreshold);
}
brokenLock = false;
long executionStartTime = currentTimeMillisSupplier.getAsLong();
try {
if (Files.exists(path)) {
Path tempDataPath = path.resolve(TEMP_FILE_NAME);
Files.deleteIfExists(tempDataPath);
try (OutputStream os = Files.newOutputStream(tempDataPath, StandardOpenOption.CREATE_NEW)) {
os.write(byteToWrite);
IOUtils.fsync(tempDataPath, false);
}
} catch (Exception ex) {
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);
if (currentUnhealthyPaths == null) {
currentUnhealthyPaths = new HashSet<>(1);
Files.delete(tempDataPath);
final long elapsedTime = currentTimeMillisSupplier.getAsLong() - executionStartTime;
if (elapsedTime > slowPathLoggingThreshold.millis()) {
logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]",
path, elapsedTime, slowPathLoggingThreshold);
}
currentUnhealthyPaths.add(path);
}
} catch (Exception ex) {
logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex);
unhealthyPaths = Set.of(path);
return;
}
unhealthyPaths = currentUnhealthyPaths;
brokenLock = false;
unhealthyPaths = null;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ public Node start() throws NodeValidationException {
try {
assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY,
nodeEnvironment.nodeDataPaths());
nodeEnvironment.nodeDataPath());
assert nodeMetadata != null;
assert nodeMetadata.nodeVersion().equals(Version.CURRENT);
assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testNodeLock() throws IOException {
// Close the environment that holds the lock and make sure we can get the lock after release
env.close();
env = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
assertThat(env.nodeDataPaths()[0], equalTo(PathUtils.get(dataPath)));
assertThat(env.nodeDataPath(), equalTo(PathUtils.get(dataPath)));

env.close();
assertThat(env.lockedShards(), empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void createNodePaths() throws IOException {
dataMasterSettings = buildEnvSettings(Settings.EMPTY);
environment = TestEnvironment.newEnvironment(dataMasterSettings);
try (NodeEnvironment nodeEnvironment = new NodeEnvironment(dataMasterSettings, environment)) {
nodePaths = nodeEnvironment.nodeDataPaths();
nodePaths = new Path[] { nodeEnvironment.nodeDataPath() };
final String nodeId = randomAlphaOfLength(10);
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(nodePaths, nodeId,
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
Expand Down Expand Up @@ -89,7 +89,7 @@ public void testNothingToCleanup() throws Exception {
if (randomBoolean()) {
try (NodeEnvironment env = new NodeEnvironment(noDataMasterSettings, environment)) {
try (PersistedClusterStateService.Writer writer =
ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPaths()).createWriter()) {
ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPath()).createWriter()) {
writer.writeFullStateAndCommit(1L, ClusterState.EMPTY_STATE);
}
}
Expand Down Expand Up @@ -217,7 +217,7 @@ private void createIndexDataFiles(Settings settings, int shardCount, boolean wri
try (NodeEnvironment env = new NodeEnvironment(settings, environment)) {
if (writeClusterState) {
try (PersistedClusterStateService.Writer writer =
ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPaths()).createWriter()) {
ElasticsearchNodeCommand.createPersistedClusterStateService(Settings.EMPTY, env.nodeDataPath()).createWriter()) {
writer.writeFullStateAndCommit(1L, ClusterState.builder(ClusterName.DEFAULT)
.metadata(Metadata.builder().put(IndexMetadata.builder(INDEX.getName())
.settings(Settings.builder().put("index.version.created", Version.CURRENT)
Expand Down
Loading