@@ -105,61 +105,49 @@ private static void performDirectoryCleanup(Path stateLocation, Directory stateD
105105 IOUtils .closeWhileHandlingException (stateDir );
106106 }
107107
108- private Directory writeStateToFirstLocation (final T state , Path stateLocation , String tmpFileName )
108+ private void writeStateToFirstLocation (final T state , Path stateLocation , Directory stateDir , String tmpFileName )
109109 throws WriteStateException {
110110 try {
111- Directory stateDir = newDirectory (stateLocation );
112- try {
113- deleteFileIfExists (stateLocation , stateDir , tmpFileName );
114- try (IndexOutput out = stateDir .createOutput (tmpFileName , IOContext .DEFAULT )) {
115- CodecUtil .writeHeader (out , STATE_FILE_CODEC , STATE_FILE_VERSION );
116- out .writeInt (FORMAT .index ());
117- try (XContentBuilder builder = newXContentBuilder (FORMAT , new IndexOutputOutputStream (out ) {
118- @ Override
119- public void close () {
120- // this is important since some of the XContentBuilders write bytes on close.
121- // in order to write the footer we need to prevent closing the actual index input.
122- }
123- })) {
124-
125- builder .startObject ();
126- toXContent (builder , state );
127- builder .endObject ();
111+ deleteFileIfExists (stateLocation , stateDir , tmpFileName );
112+ try (IndexOutput out = stateDir .createOutput (tmpFileName , IOContext .DEFAULT )) {
113+ CodecUtil .writeHeader (out , STATE_FILE_CODEC , STATE_FILE_VERSION );
114+ out .writeInt (FORMAT .index ());
115+ try (XContentBuilder builder = newXContentBuilder (FORMAT , new IndexOutputOutputStream (out ) {
116+ @ Override
117+ public void close () {
118+ // this is important since some of the XContentBuilders write bytes on close.
119+ // in order to write the footer we need to prevent closing the actual index input.
128120 }
129- CodecUtil .writeFooter (out );
130- }
121+ })) {
131122
132- stateDir .sync (Collections .singleton (tmpFileName ));
133- } catch (Exception e ) {
134- // perform clean up only in case of exception, we need to keep directory open and temporary file on disk
135- // if everything is ok for the next algorithm steps
136- performDirectoryCleanup (stateLocation , stateDir , tmpFileName );
137- throw e ;
123+ builder .startObject ();
124+ toXContent (builder , state );
125+ builder .endObject ();
126+ }
127+ CodecUtil .writeFooter (out );
138128 }
139- return stateDir ;
129+
130+ stateDir .sync (Collections .singleton (tmpFileName ));
140131 } catch (Exception e ) {
141132 throw new WriteStateException (false , "failed to write state to the first location tmp file " +
142133 stateLocation .resolve (tmpFileName ), e );
143134 }
144135 }
145136
146- private Directory copyStateToExtraLocation ( Directory srcStateDir , Path extraStateLocation , String tmpFileName )
137+ private static void copyStateToExtraLocations ( List < Tuple < Path , Directory >> stateDirs , String tmpFileName )
147138 throws WriteStateException {
148- try {
149- Directory extraStateDir = newDirectory (extraStateLocation );
139+ Directory srcStateDir = stateDirs .get (0 ).v2 ();
140+ for (int i = 1 ; i < stateDirs .size (); i ++) {
141+ Tuple <Path , Directory > extraStatePathAndDir = stateDirs .get (i );
142+ Path extraStateLocation = extraStatePathAndDir .v1 ();
143+ Directory extraStateDir = extraStatePathAndDir .v2 ();
150144 try {
151145 deleteFileIfExists (extraStateLocation , extraStateDir , tmpFileName );
152146 extraStateDir .copyFrom (srcStateDir , tmpFileName , tmpFileName , IOContext .DEFAULT );
153147 extraStateDir .sync (Collections .singleton (tmpFileName ));
154148 } catch (Exception e ) {
155- // perform clean up only in case of exception, we need to keep directory open and temporary file on disk
156- // if everything is ok for the next algorithm steps
157- performDirectoryCleanup (extraStateLocation , extraStateDir , tmpFileName );
158- throw e ;
149+ throw new WriteStateException (false , "failed to copy tmp state file to extra location " + extraStateLocation , e );
159150 }
160- return extraStateDir ;
161- } catch (Exception e ) {
162- throw new WriteStateException (false , "failed to copy tmp state file to extra location " + extraStateLocation , e );
163151 }
164152 }
165153
@@ -194,7 +182,6 @@ private static void performStateDirectoriesFsync(List<Tuple<Path, Directory>> st
194182 }
195183 }
196184
197-
198185 /**
199186 * Writes the given state to the given directories. The state is written to a
200187 * state directory ({@value #STATE_DIR_NAME}) underneath each of the given file locations and is created if it
@@ -226,24 +213,24 @@ public final void write(final T state, final Path... locations) throws WriteStat
226213
227214 final String fileName = prefix + maxStateId + STATE_FILE_EXTENSION ;
228215 final String tmpFileName = fileName + ".tmp" ;
229- final Path firstStateLocation = locations [0 ].resolve (STATE_DIR_NAME );
230216 List <Tuple <Path , Directory >> directories = new ArrayList <>();
231217
232218 try {
233- Directory firstStateDir = writeStateToFirstLocation (state , firstStateLocation , tmpFileName );
234- directories .add (new Tuple <>(firstStateLocation , firstStateDir ));
235- for (int i = 1 ; i < locations .length ; i ++) {
236- final Path extraStateLocation = locations [i ].resolve (STATE_DIR_NAME );
237- Directory extraStateDir = copyStateToExtraLocation (firstStateDir , extraStateLocation , tmpFileName );
238- directories .add (new Tuple <>(extraStateLocation , extraStateDir ));
219+ for (Path location : locations ) {
220+ Path stateLocation = location .resolve (STATE_DIR_NAME );
221+ try {
222+ directories .add (new Tuple <>(location , newDirectory (stateLocation )));
223+ } catch (IOException e ) {
224+ throw new WriteStateException (false , "failed to open state directory " + stateLocation , e );
225+ }
239226 }
227+
228+ writeStateToFirstLocation (state , directories .get (0 ).v1 (), directories .get (0 ).v2 (), tmpFileName );
229+ copyStateToExtraLocations (directories , tmpFileName );
240230 performRenames (tmpFileName , fileName , directories );
241231 performStateDirectoriesFsync (directories );
242232 } finally {
243- //writeStateToFirstLocation and copyStateToExtraLocation perform clean up for themselves if they fail
244- //we need to perform clean up for all data paths that were successfully opened and temporary file was created
245- for (int i = 0 ; i < directories .size (); i ++) {
246- Tuple <Path , Directory > pathAndDirectory = directories .get (i );
233+ for (Tuple <Path , Directory > pathAndDirectory : directories ) {
247234 performDirectoryCleanup (pathAndDirectory .v1 (), pathAndDirectory .v2 (), tmpFileName );
248235 }
249236 }
0 commit comments