|
30 | 30 | import org.apache.lucene.store.IndexOutput; |
31 | 31 | import org.apache.lucene.store.SimpleFSDirectory; |
32 | 32 | import org.elasticsearch.ExceptionsHelper; |
| 33 | +import org.elasticsearch.common.collect.Tuple; |
33 | 34 | import org.elasticsearch.common.logging.Loggers; |
34 | 35 | import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; |
35 | 36 | import org.elasticsearch.common.lucene.store.InputStreamIndexInput; |
@@ -91,88 +92,162 @@ private static void deleteFileIfExists(Path stateLocation, Directory directory, |
91 | 92 | logger.trace("cleaned up {}", stateLocation.resolve(fileName)); |
92 | 93 | } |
93 | 94 |
|
94 | | - private void writeStateToFirstLocation(final T state, Path stateLocation, Directory stateDir, String fileName, String tmpFileName) |
95 | | - throws IOException { |
| 95 | + private static void deleteFileIgnoreExceptions(Path stateLocation, Directory directory, String fileName) { |
96 | 96 | try { |
97 | | - deleteFileIfExists(stateLocation, stateDir, tmpFileName); |
98 | | - try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { |
99 | | - CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); |
100 | | - out.writeInt(FORMAT.index()); |
101 | | - try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { |
102 | | - @Override |
103 | | - public void close() throws IOException { |
104 | | - // this is important since some of the XContentBuilders write bytes on close. |
105 | | - // in order to write the footer we need to prevent closing the actual index input. |
106 | | - } |
107 | | - })) { |
| 97 | + deleteFileIfExists(stateLocation, directory, fileName); |
| 98 | + } catch (IOException e) { |
| 99 | + logger.trace("clean up failed {}", stateLocation.resolve(fileName)); |
| 100 | + } |
| 101 | + } |
| 102 | + |
| 103 | + private static void performDirectoryCleanup(Path stateLocation, Directory stateDir, String tmpFileName) { |
| 104 | + deleteFileIgnoreExceptions(stateLocation, stateDir, tmpFileName); |
| 105 | + IOUtils.closeWhileHandlingException(stateDir); |
| 106 | + } |
108 | 107 |
|
109 | | - builder.startObject(); |
110 | | - { |
111 | | - toXContent(builder, state); |
| 108 | + private Directory writeStateToFirstLocation(final T state, Path stateLocation, String tmpFileName) |
| 109 | + throws WriteStateException { |
| 110 | + try { |
| 111 | + Directory stateDir = newDirectory(stateLocation); |
| 112 | + try { |
| 113 | + deleteFileIfExists(stateLocation, stateDir, tmpFileName); |
| 114 | + try (IndexOutput out = stateDir.createOutput(tmpFileName, IOContext.DEFAULT)) { |
| 115 | + CodecUtil.writeHeader(out, STATE_FILE_CODEC, STATE_FILE_VERSION); |
| 116 | + out.writeInt(FORMAT.index()); |
| 117 | + try (XContentBuilder builder = newXContentBuilder(FORMAT, new IndexOutputOutputStream(out) { |
| 118 | + @Override |
| 119 | + public void close() throws IOException { |
| 120 | + // this is important since some of the XContentBuilders write bytes on close. |
| 121 | + // in order to write the footer we need to prevent closing the actual index input. |
| 122 | + } |
| 123 | + })) { |
| 124 | + |
| 125 | + builder.startObject(); |
| 126 | + { |
| 127 | + toXContent(builder, state); |
| 128 | + } |
| 129 | + builder.endObject(); |
112 | 130 | } |
113 | | - builder.endObject(); |
| 131 | + CodecUtil.writeFooter(out); |
114 | 132 | } |
115 | | - CodecUtil.writeFooter(out); |
116 | | - } catch (IllegalStateException e) { |
117 | | - throw new IOException(e); |
118 | | - } |
119 | 133 |
|
120 | | - stateDir.sync(Collections.singleton(tmpFileName)); |
121 | | - stateDir.rename(tmpFileName, fileName); |
122 | | - stateDir.syncMetaData(); |
123 | | - logger.trace("written state to {}", stateLocation.resolve(fileName)); |
124 | | - } finally { |
125 | | - deleteFileIfExists(stateLocation, stateDir, tmpFileName); |
| 134 | + stateDir.sync(Collections.singleton(tmpFileName)); |
| 135 | + } catch (Exception e) { |
| 136 | + // perform clean up only in case of exception, we need to keep directory open and temporary file on disk |
| 137 | + // if everything is ok for the next algorithm steps |
| 138 | + performDirectoryCleanup(stateLocation, stateDir, tmpFileName); |
| 139 | + throw e; |
| 140 | + } |
| 141 | + return stateDir; |
| 142 | + } catch (Exception e) { |
| 143 | + throw new WriteStateException(false, "failed to write state to the first location tmp file", e); |
126 | 144 | } |
127 | 145 | } |
128 | 146 |
|
129 | | - private void copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String fileName, String tmpFileName) |
130 | | - throws IOException { |
131 | | - try (Directory extraStateDir = newDirectory(extraStateLocation)) { |
| 147 | + private Directory copyStateToExtraLocation(Directory srcStateDir, Path extraStateLocation, String tmpFileName) |
| 148 | + throws WriteStateException { |
| 149 | + try { |
| 150 | + Directory extraStateDir = newDirectory(extraStateLocation); |
132 | 151 | try { |
133 | 152 | deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); |
134 | | - extraStateDir.copyFrom(srcStateDir, fileName, tmpFileName, IOContext.DEFAULT); |
| 153 | + extraStateDir.copyFrom(srcStateDir, tmpFileName, tmpFileName, IOContext.DEFAULT); |
135 | 154 | extraStateDir.sync(Collections.singleton(tmpFileName)); |
136 | | - extraStateDir.rename(tmpFileName, fileName); |
137 | | - extraStateDir.syncMetaData(); |
138 | | - logger.trace("copied state to {}", extraStateLocation.resolve(fileName)); |
139 | | - } finally { |
140 | | - deleteFileIfExists(extraStateLocation, extraStateDir, tmpFileName); |
| 155 | + } catch (Exception e) { |
| 156 | + // perform clean up only in case of exception, we need to keep directory open and temporary file on disk |
| 157 | + // if everything is ok for the next algorithm steps |
| 158 | + performDirectoryCleanup(extraStateLocation, extraStateDir, tmpFileName); |
| 159 | + throw e; |
141 | 160 | } |
| 161 | + return extraStateDir; |
| 162 | + } catch (Exception e) { |
| 163 | + throw new WriteStateException(false, "failed to copy tmp state file to extra location", e); |
142 | 164 | } |
143 | 165 | } |
144 | 166 |
|
| 167 | + public void performRenames(String tmpFileName, String fileName, final List<Tuple<Path, Directory>> stateDirectories) throws |
| 168 | + WriteStateException { |
| 169 | + Directory firstStateDirectory = stateDirectories.get(0).v2(); |
| 170 | + try { |
| 171 | + firstStateDirectory.rename(tmpFileName, fileName); |
| 172 | + } catch (IOException e) { |
| 173 | + throw new WriteStateException(false, "failed to rename tmp file to final name in the first state location", e); |
| 174 | + } |
| 175 | + |
| 176 | + for (int i = 1; i < stateDirectories.size(); i++) { |
| 177 | + Directory extraStateDirectory = stateDirectories.get(i).v2(); |
| 178 | + try { |
| 179 | + extraStateDirectory.rename(tmpFileName, fileName); |
| 180 | + } catch (IOException e) { |
| 181 | + throw new WriteStateException(true, "failed to rename tmp file to final name in extra state location", |
| 182 | + e); |
| 183 | + } |
| 184 | + } |
| 185 | + } |
| 186 | + |
| 187 | + public void performStateDirectoriesFsync(List<Tuple<Path, Directory>> stateDirectories) throws WriteStateException { |
| 188 | + for (int i = 0; i < stateDirectories.size(); i++) { |
| 189 | + try { |
| 190 | + stateDirectories.get(i).v2().syncMetaData(); |
| 191 | + } catch (IOException e) { |
| 192 | + throw new WriteStateException(true, "meta data directory fsync has failed", e); |
| 193 | + } |
| 194 | + } |
| 195 | + } |
| 196 | + |
| 197 | + |
145 | 198 | /** |
146 | 199 | * Writes the given state to the given directories. The state is written to a |
147 | 200 | * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it |
148 | 201 | * doesn't exist. The state is serialized to a temporary file in that directory and is then atomically moved to |
149 | 202 | * it's target filename of the pattern {@code {prefix}{version}.st}. |
150 | | - * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return it. |
151 | | - * But if this method throws an exception, loadLatestState could return this state or some previous state. |
| 203 | + * If this method returns without exception there is a guarantee that state is persisted to the disk and loadLatestState will return |
| 204 | + * it. <br> |
| 205 | + * This method may throw an {@link WriteStateException} if some exception during writing state occurs. <br> |
| 206 | + * If {@link WriteStateException#isDirty()} returns false, there is a guarantee that loadLatestState will return old state. <br> |
| 207 | + * If {@link WriteStateException#isDirty()} returns true, loadLatestState could return new state or previous state. |
152 | 208 | * |
153 | 209 | * @param state the state object to write |
154 | 210 | * @param locations the locations where the state should be written to. |
155 | | - * @throws IOException if an IOException occurs |
| 211 | + * @throws WriteStateException if some exception during writing state occurs. |
156 | 212 | */ |
157 | | - public final void write(final T state, final Path... locations) throws IOException { |
| 213 | + |
| 214 | + public final void write(final T state, final Path... locations) throws WriteStateException { |
158 | 215 | if (locations == null) { |
159 | 216 | throw new IllegalArgumentException("Locations must not be null"); |
160 | 217 | } |
161 | 218 | if (locations.length <= 0) { |
162 | 219 | throw new IllegalArgumentException("One or more locations required"); |
163 | 220 | } |
164 | | - final long maxStateId = findMaxStateId(prefix, locations) + 1; |
| 221 | + |
| 222 | + long maxStateId; |
| 223 | + try { |
| 224 | + maxStateId = findMaxStateId(prefix, locations) + 1; |
| 225 | + } catch (Exception e) { |
| 226 | + throw new WriteStateException(false, "exception during looking up max state id", e); |
| 227 | + } |
165 | 228 | assert maxStateId >= 0 : "maxStateId must be positive but was: [" + maxStateId + "]"; |
166 | 229 |
|
167 | 230 | final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION; |
168 | 231 | final String tmpFileName = fileName + ".tmp"; |
169 | 232 | final Path firstStateLocation = locations[0].resolve(STATE_DIR_NAME); |
170 | | - try (Directory stateDir = newDirectory(firstStateLocation)) { |
171 | | - writeStateToFirstLocation(state, firstStateLocation, stateDir, fileName, tmpFileName); |
| 233 | + List<Tuple<Path, Directory>> directories = new ArrayList<>(); |
172 | 234 |
|
| 235 | + try { |
| 236 | + Directory firstStateDir = writeStateToFirstLocation(state, firstStateLocation, tmpFileName); |
| 237 | + directories.add(new Tuple<>(firstStateLocation, firstStateDir)); |
173 | 238 | for (int i = 1; i < locations.length; i++) { |
174 | 239 | final Path extraStateLocation = locations[i].resolve(STATE_DIR_NAME); |
175 | | - copyStateToExtraLocation(stateDir, extraStateLocation, fileName, tmpFileName); |
| 240 | + Directory extraStateDir = copyStateToExtraLocation(firstStateDir, extraStateLocation, tmpFileName); |
| 241 | + directories.add(new Tuple<>(extraStateLocation, extraStateDir)); |
| 242 | + } |
| 243 | + performRenames(tmpFileName, fileName, directories); |
| 244 | + performStateDirectoriesFsync(directories); |
| 245 | + } finally { |
| 246 | + //writeStateToFirstLocation and copyStateToExtraLocation perform clean up for themselves if they fail |
| 247 | + //we need to perform clean up for all data paths that were successfully opened and temporary file was created |
| 248 | + for (int i = 0; i < directories.size(); i++) { |
| 249 | + Tuple<Path, Directory> pathAndDirectory = directories.get(i); |
| 250 | + performDirectoryCleanup(pathAndDirectory.v1(), pathAndDirectory.v2(), tmpFileName); |
176 | 251 | } |
177 | 252 | } |
178 | 253 |
|
@@ -229,16 +304,18 @@ protected Directory newDirectory(Path dir) throws IOException { |
229 | 304 | return new SimpleFSDirectory(dir); |
230 | 305 | } |
231 | 306 |
|
232 | | - private void cleanupOldFiles(final String currentStateFile, Path[] locations) throws IOException { |
| 307 | + private void cleanupOldFiles(final String currentStateFile, Path[] locations) { |
233 | 308 | for (Path location : locations) { |
234 | 309 | logger.trace("cleanupOldFiles: cleaning up {}", location); |
235 | 310 | Path stateLocation = location.resolve(STATE_DIR_NAME); |
236 | 311 | try (Directory stateDir = newDirectory(stateLocation)) { |
237 | 312 | for (String file : stateDir.listAll()) { |
238 | 313 | if (file.startsWith(prefix) && file.equals(currentStateFile) == false) { |
239 | | - deleteFileIfExists(stateLocation, stateDir, file); |
| 314 | + deleteFileIgnoreExceptions(stateLocation, stateDir, file); |
240 | 315 | } |
241 | 316 | } |
| 317 | + } catch (Exception e) { |
| 318 | + logger.trace("clean up failed for state location {}", stateLocation); |
242 | 319 | } |
243 | 320 | } |
244 | 321 | } |
|
0 commit comments