diff --git a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index fe84fb9e38c8d..7ae6a13724d66 100644 --- a/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/server/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -30,6 +30,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; @@ -91,8 +92,16 @@ private static void deleteFileIfExists(Path stateLocation, Directory directory, logger.trace("cleaned up {}", stateLocation.resolve(fileName)); } - private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName) - throws IOException { + private static void deleteFileIgnoreExceptions(Path stateLocation, Directory directory, String fileName) { + try { + deleteFileIfExists(stateLocation, directory, fileName); + } catch (IOException e) { + logger.trace("clean up failed {}", stateLocation.resolve(fileName)); + } + } + + private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String tmpFileName) + throws WriteStateException { try { deleteFileIfExists(stateLocation, stateDir, tmpFileName); try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { @@ -100,42 +109,70 @@ private void writeStateToFirstLocation(final T state, Path stateLocation, Direct out.writeInt(FORMAT.index()); try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { @Override - public void close() throws IOException { + public void close() { // this is important since some of the XContentBuilders write bytes on close. // in order to write the footer we need to prevent closing the actual index input. } })) { builder.startObject(); - { - toXContent(builder, state); - } + toXContent(builder, state); builder.endObject(); } CodecUtil.writeFooter(out); } stateDir.sync(Collections.singleton(tmpFileName)); - stateDir.rename(tmpFileName, fileName); - stateDir.syncMetaData(); - logger.trace("written state to {}", stateLocation.resolve(fileName)); - } finally { - deleteFileIfExists(stateLocation, stateDir, tmpFileName); + } catch (Exception e) { + throw new WriteStateException(false, "failed to write state to the first location tmp file " + + stateLocation.resolve(tmpFileName), e); } } - private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName) - throws IOException { - try (Directory extraStateDir = newDirectory(extraStateLocation)) { + private static void copyStateToExtraLocations(List> stateDirs, String tmpFileName) + throws WriteStateException { + Directory srcStateDir = stateDirs.get(0).v2(); + for (int i = 1; i < stateDirs.size(); i++) { + Tuple extraStatePathAndDir = stateDirs.get(i); + Path extraStateLocation = extraStatePathAndDir.v1(); + Directory extraStateDir = extraStatePathAndDir.v2(); try { deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); - extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT); + extraStateDir.copyFrom(srcStateDir, tmpFileName, tmpFileName, IOContext.DEFAULT); extraStateDir.sync(Collections.singleton(tmpFileName)); - extraStateDir.rename(tmpFileName, fileName); - extraStateDir.syncMetaData(); - logger.trace("copied state to {}", extraStateLocation.resolve(fileName)); - } finally { - deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); + } catch (Exception e) { + throw new WriteStateException(false, "failed to copy tmp state file to extra location " + extraStateLocation, e); + } + } + } + + private static void performRenames(String tmpFileName, String fileName, final List> stateDirectories) throws + WriteStateException { + Directory firstStateDirectory = stateDirectories.get(0).v2(); + try { + firstStateDirectory.rename(tmpFileName, fileName); + } catch (IOException e) { + throw new WriteStateException(false, "failed to rename tmp file to final name in the first state location " + + stateDirectories.get(0).v1().resolve(tmpFileName), e); + } + + for (int i = 1; i < stateDirectories.size(); i++) { + Directory extraStateDirectory = stateDirectories.get(i).v2(); + try { + extraStateDirectory.rename(tmpFileName, fileName); + } catch (IOException e) { + throw new WriteStateException(true, "failed to rename tmp file to final name in extra state location " + + stateDirectories.get(i).v1().resolve(tmpFileName), e); + } + } + } + + private static void performStateDirectoriesFsync(List> stateDirectories) throws WriteStateException { + for (int i = 0; i < stateDirectories.size(); i++) { + try { + stateDirectories.get(i).v2().syncMetaData(); + } catch (IOException e) { + throw new WriteStateException(true, "meta data directory fsync has failed " + stateDirectories.get(i).v1(), e); } } } @@ -145,32 +182,52 @@ private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLoca * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to * it's target filename of the pattern {@code {prefix}{version}.st}. - * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it. - * But if this method throws an exception, loadLatestState could return this state or some previous state. + * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return + * it. * * @param state the state object to write * @param locations the locations where the state should be written to. - * @throws IOException if an IOException occurs + * @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}. */ - public final void write(final T state, final Path... locations) throws IOException { + + public final void write(final T state, final Path... locations) throws WriteStateException { if (locations == null) { throw new IllegalArgumentException("Locations must not be null"); } if (locations.length <= 0) { throw new IllegalArgumentException("One or more locations required"); } - final long maxStateId = findMaxStateId(prefix, locations) + 1; + + long maxStateId; + try { + maxStateId = findMaxStateId(prefix, locations) + 1; + } catch (Exception e) { + throw new WriteStateException(false, "exception during looking up max state id", e); + } assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; final String tmpFileName = fileName + ".tmp"; - final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); - try (Directory stateDir = newDirectory(firstStateLocation)) { - writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName); + List> directories = new ArrayList<>(); + + try { + for (Path location : locations) { + Path stateLocation = location.resolve(STATE_DIR_NAME); + try { + directories.add(new Tuple<>(location, newDirectory(stateLocation))); + } catch (IOException e) { + throw new WriteStateException(false, "failed to open state directory " + stateLocation, e); + } + } - for (int i = 1; i < locations.length; i++) { - final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); - copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName); + writeStateToFirstLocation(state, directories.get(0).v1(), directories.get(0).v2(), tmpFileName); + copyStateToExtraLocations(directories, tmpFileName); + performRenames(tmpFileName, fileName, directories); + performStateDirectoriesFsync(directories); + } finally { + for (Tuple pathAndDirectory : directories) { + deleteFileIgnoreExceptions(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); + IOUtils.closeWhileHandlingException(pathAndDirectory.v2()); } } @@ -227,16 +284,18 @@ protected Directory newDirectory(Path dir) throws IOException { return new SimpleFSDirectory(dir); } - private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException { + private void cleanupOldFiles(final String currentStateFile, Path[] locations) { for (Path location : locations) { logger.trace("cleanupOldFiles: cleaning up {}", location); Path stateLocation = location.resolve(STATE_DIR_NAME); try (Directory stateDir = newDirectory(stateLocation)) { for (String file : stateDir.listAll()) { if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { - deleteFileIfExists(stateLocation, stateDir, file); + deleteFileIgnoreExceptions(stateLocation, stateDir, file); } } + } catch (Exception e) { + logger.trace("clean up failed for state location {}", stateLocation); } } } diff --git a/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java new file mode 100644 index 0000000000000..3283f01b9def7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/gateway/WriteStateException.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.gateway; + +import java.io.IOException; + +/** + * This exception is thrown when there is a problem of writing state to disk. + */ +public class WriteStateException extends IOException { + private boolean dirty; + + public WriteStateException(boolean dirty, String message, Exception cause) { + super(message, cause); + this.dirty = dirty; + } + + /** + * If this method returns false, state is guaranteed to be not written to disk. + * If this method returns true, we don't know if state is written to disk. + */ + public boolean isDirty() { + return dirty; + } +} diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java index 4acb5f428a72c..d8433963e5226 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataStateFormatTests.java @@ -317,7 +317,9 @@ public void testFailWriteAndReadPreviousState() throws IOException { Format.FAIL_FSYNC_TMP_FILE, Format.FAIL_RENAME_TMP_FILE); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - expectThrows(IOException.class, () -> format.write(newState, path)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path)); + assertFalse(ex.isDirty()); + format.noFailures(); assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path)); } @@ -338,7 +340,9 @@ public void testFailWriteAndReadAnyState() throws IOException { DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); possibleStates.add(newState); - expectThrows(IOException.class, () -> format.write(newState, path)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, path)); + assertTrue(ex.isDirty()); + format.noFailures(); assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, path))); } @@ -346,22 +350,24 @@ public void testFailWriteAndReadAnyState() throws IOException { writeAndReadStateSuccessfully(format, path); } - public void testFailCopyStateToExtraLocation() throws IOException { + public void testFailCopyTmpFileToExtraLocation() throws IOException { Path paths[] = new Path[randomIntBetween(2, 5)]; for (int i = 0; i < paths.length; i++) { paths[i] = createTempDir(); } Format format = new Format("foo-"); - writeAndReadStateSuccessfully(format, paths); + DummyState initialState = writeAndReadStateSuccessfully(format, paths); for (int i = 0; i < randomIntBetween(1, 5); i++) { format.failOnMethods(Format.FAIL_OPEN_STATE_FILE_WHEN_COPYING); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - expectThrows(IOException.class, () -> format.write(newState, paths)); + WriteStateException ex = expectThrows(WriteStateException.class, () -> format.write(newState, paths)); + assertFalse(ex.isDirty()); + format.noFailures(); - assertEquals(newState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); + assertEquals(initialState, format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths)); } writeAndReadStateSuccessfully(format, paths); @@ -382,14 +388,21 @@ public void testFailRandomlyAndReadAnyState() throws IOException { format.failRandomly(); DummyState newState = new DummyState(randomRealisticUnicodeOfCodepointLengthBetween(1, 100), randomInt(), randomLong(), randomDouble(), randomBoolean()); - possibleStates.add(newState); try { format.write(newState, paths); - } catch (Exception e) { - // since we're injecting failures at random it's ok if exception is thrown, it's also ok if exception is not thrown + possibleStates.clear(); + possibleStates.add(newState); + } catch (WriteStateException e) { + if (e.isDirty()) { + possibleStates.add(newState); + } } + format.noFailures(); - assertTrue(possibleStates.contains(format.loadLatestState(logger, NamedXContentRegistry.EMPTY, paths))); + //we call loadLatestState not on full path set, but only on random paths from this set. This is to emulate disk failures. + Path[] randomPaths = randomSubsetOf(randomIntBetween(1, paths.length), paths).toArray(new Path[0]); + DummyState stateOnDisk = format.loadLatestState(logger, NamedXContentRegistry.EMPTY, randomPaths); + assertTrue(possibleStates.contains(stateOnDisk)); } writeAndReadStateSuccessfully(format, paths);