@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
2828import org .apache .hadoop .fs .permission .FsPermission
2929
3030import org .apache .spark .internal .Logging
31+ import org .apache .spark .sql .execution .streaming .CheckpointFileManager .RenameHelperMethods
3132import org .apache .spark .sql .internal .SQLConf
3233import org .apache .spark .util .Utils
3334
@@ -49,6 +50,21 @@ trait CheckpointFileManager {
4950
5051 import org .apache .spark .sql .execution .streaming .CheckpointFileManager ._
5152
53+ /**
54+ * Create a file and make its contents available atomically after the output stream is closed.
55+ *
56+ * @param path Path to create
57+ * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
58+ * overwrite the file if it already exists. It should not throw
59+ * any exception if the file exists. However, if false, then the
60+ * implementation must not overwrite if the file already exists and
61+ * must throw `FileAlreadyExistsException` in that case.
62+ */
63+ def createAtomic (path : Path , overwriteIfPossible : Boolean ): CancellableFSDataOutputStream
64+
65+ /** Open a file for reading, or throw exception if it does not exist. */
66+ def open (path : Path ): FSDataInputStream
67+
5268 /** List the files in a path that match a filter. */
5369 def list (path : Path , filter : PathFilter ): Array [FileStatus ]
5470
@@ -63,33 +79,37 @@ trait CheckpointFileManager {
6379 /** Whether path exists */
6480 def exists (path : Path ): Boolean
6581
66- /** Create a file. */
67- def create (path : Path , overwrite : Boolean ): FSDataOutputStream
68-
69- /** Create a file and make its contents available atomically after the output stream is closed. */
70- def createAtomic (path : Path , overwrite : Boolean ): CancellableFSDataOutputStream
71-
72- /** Open a file for reading, or throw exception if it does not exist. */
73- def open (path : Path ): FSDataInputStream
74-
75- /** Rename a file. */
76- def rename (srcPath : Path , dstPath : Path , overwrite : Boolean ): Unit
77-
7882 /** Recursively delete a path if it exists. Should not throw exception if file doesn't exist. */
7983 def delete (path : Path ): Unit
8084
81- /** Copy a local file to a remote file. */
82- def copyFromLocalFile (localSrcFile : File , destPath : Path ): Unit
83-
84- /** Copy a remote file to the local file. */
85- def copyToLocalFile (srcPath : Path , localDestFile : File ): Unit
86-
8785 /** Is the default file system this implementation is operating on the local file system. */
8886 def isLocal : Boolean
8987}
9088
9189object CheckpointFileManager extends Logging {
9290
91+ /**
92+ * Additional methods in CheckpointFileManager implementations that allow
93+ * [[RenameBasedFSDataOutputStream ]] to get atomicity by write-to-temp-file-and-rename
94+ */
95+ sealed trait RenameHelperMethods { self => CheckpointFileManager
96+ /** Create a file with overwrite. */
97+ def create (path : Path ): FSDataOutputStream
98+
99+ /**
100+ * Rename a file.
101+ *
102+ * @param srcPath Source path to rename
103+ * @param dstPath Destination path to rename to
104+ * @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
105+ * overwrite the file if it already exists. It should not throw
106+ * any exception if the file exists. However, if false, then the
107+ * implementation must not overwrite if the file already exists and
108+ * must throw `FileAlreadyExistsException` in that case.
109+ */
110+ def rename (srcPath : Path , dstPath : Path , overwriteIfPossible : Boolean ): Unit
111+ }
112+
93113 /**
94114 * An interface to add the cancel() operation to [[FSDataOutputStream ]]. This is used
95115 * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
@@ -107,13 +127,13 @@ object CheckpointFileManager extends Logging {
107127 * to a temporary file and then renames.
108128 */
109129 sealed class RenameBasedFSDataOutputStream (
110- fm : CheckpointFileManager ,
130+ fm : CheckpointFileManager with RenameHelperMethods ,
111131 finalPath : Path ,
112132 tempPath : Path ,
113- overwrite : Boolean )
114- extends CancellableFSDataOutputStream (fm.create(tempPath, overwrite )) {
133+ overwriteIfPossible : Boolean )
134+ extends CancellableFSDataOutputStream (fm.create(tempPath)) {
115135
116- def this (fm : CheckpointFileManager , path : Path , overwrite : Boolean ) = {
136+ def this (fm : CheckpointFileManager with RenameHelperMethods , path : Path , overwrite : Boolean ) = {
117137 this (fm, path, generateTempPath(path), overwrite)
118138 }
119139
@@ -124,7 +144,7 @@ object CheckpointFileManager extends Logging {
124144 try {
125145 if (terminated) return
126146 super .close()
127- fm.rename(tempPath, finalPath, overwrite )
147+ fm.rename(tempPath, finalPath, overwriteIfPossible )
128148 logInfo(s " Renamed temp file $tempPath to $finalPath" )
129149 } finally {
130150 terminated = true
@@ -164,12 +184,12 @@ object CheckpointFileManager extends Logging {
164184 } catch {
165185 case e : UnsupportedFileSystemException =>
166186 logWarning(
167- " Could not use FileContext API for managing metadata log files at path " +
168- s " $path. Using FileSystem API instead for managing log files. The log may be " +
169- s " inconsistent under failures. " )
187+ " Could not use FileContext API for managing Structured Streaming checkpoint files at " +
188+ s " $path. Using FileSystem API instead for managing log files. If the implementation " +
189+ s " of FileSystem.rename() is not atomic, then the correctness and fault-tolerance of " +
190+ s " your Structured Streaming is not guaranteed. " )
170191 new FileSystemBasedCheckpointFileManager (path, hadoopConf)
171192 }
172- new FileSystemBasedCheckpointFileManager (path, hadoopConf)
173193 }
174194
175195 private def generateTempPath (path : Path ): Path = {
@@ -182,7 +202,7 @@ object CheckpointFileManager extends Logging {
182202
183203/** An implementation of [[CheckpointFileManager ]] using Hadoop's [[FileSystem ]] API. */
184204class FileSystemBasedCheckpointFileManager (path : Path , hadoopConf : Configuration )
185- extends CheckpointFileManager with Logging {
205+ extends CheckpointFileManager with RenameHelperMethods with Logging {
186206
187207 import CheckpointFileManager ._
188208
@@ -199,12 +219,13 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
199219 fs.mkdirs(path, FsPermission .getDirDefault)
200220 }
201221
202- override def create (path : Path , overwrite : Boolean ): FSDataOutputStream = {
203- fs.create(path, overwrite )
222+ override def create (path : Path ): FSDataOutputStream = {
223+ fs.create(path, true )
204224 }
205225
206- override def createAtomic (path : Path , overwrite : Boolean ): CancellableFSDataOutputStream = {
207- new RenameBasedFSDataOutputStream (this , path, overwrite)
226+ override def createAtomic (
227+ path : Path , overwriteIfPossible : Boolean ): CancellableFSDataOutputStream = {
228+ new RenameBasedFSDataOutputStream (this , path, overwriteIfPossible)
208229 }
209230
210231 override def open (path : Path ): FSDataInputStream = {
@@ -215,47 +236,33 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
215236 fs.exists(path)
216237 }
217238
218- override def rename (srcPath : Path , dstPath : Path , overwrite : Boolean ): Unit = {
219- if (! overwrite && fs.exists(dstPath)) {
239+ override def rename (srcPath : Path , dstPath : Path , overwriteIfPossible : Boolean ): Unit = {
240+ if (! overwriteIfPossible && fs.exists(dstPath)) {
220241 throw new FileAlreadyExistsException (
221242 s " Failed to rename $srcPath to $dstPath as destination already exists " )
222243 }
223244
224- def deleteAndRename (prevException : Exception ): Unit = {
225- if (overwrite) {
226- try {
227- if (fs.delete(dstPath, true )) {
228- logWarning(s " Failed to delete $dstPath before second attempt to rename " )
229- }
230- if (! fs.rename(srcPath, dstPath)) {
231- val msg = s " Failed to rename temp file $srcPath to $dstPath as second attempt to " +
232- s " rename (after delete) returned false "
233- logWarning(msg)
234- val e = new IOException (msg)
235- e.addSuppressed(prevException)
236- throw e
237- }
238- } catch {
239- case NonFatal (e) =>
240- logError(s " Failed to write atomically to $dstPath" , e)
241- if (prevException != null ) e.addSuppressed(prevException)
242- throw e
243- }
244- } else {
245- throw prevException
246- }
247- }
248-
249245 try {
250246 if (! fs.rename(srcPath, dstPath)) {
251- val msg = s " Failed to rename temp file $srcPath to $dstPath as rename returned false "
252- logWarning(msg)
253- deleteAndRename(new IOException (msg))
247+ if (fs.exists(dstPath) && ! overwriteIfPossible) {
248+ // Some implementations of FileSystem may not throw FileAlreadyExistsException but
249+ // only return false if file already exists. Explicitly throw the error.
250+ // Note that this is definitely not atomic, so this is only a best-effort attempt
251+ // to throw the most appropriate exception when rename returned false.
252+ throw new FileAlreadyExistsException (s " $dstPath already exists " )
253+ } else {
254+ val msg = s " Failed to rename temp file $srcPath to $dstPath as rename returned false "
255+ logWarning(msg)
256+ throw new IOException (msg)
257+ }
254258 }
255259 } catch {
256260 case fe : FileAlreadyExistsException =>
261+ // Some implementations of FileSystem can directly throw FileAlreadyExistsException if file
262+ // already exists. Ignore the error if overwriteIfPossible = true as it is expected to be
263+ // best effort.
257264 logWarning(s " Failed to rename temp file $srcPath to $dstPath because file exists " , fe)
258- deleteAndRename(fe)
265+ if ( ! overwriteIfPossible) throw fe
259266 }
260267 }
261268
@@ -269,14 +276,6 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
269276 }
270277 }
271278
272- override def copyFromLocalFile (localSrcFile : File , destPath : Path ): Unit = {
273- fs.copyFromLocalFile(new Path (localSrcFile.getAbsoluteFile.toURI), destPath)
274- }
275-
276- override def copyToLocalFile (srcPath : Path , localDestFile : File ): Unit = {
277- fs.copyToLocalFile(srcPath, new Path (localDestFile.getAbsoluteFile.toURI))
278- }
279-
280279 override def isLocal : Boolean = fs match {
281280 case _ : LocalFileSystem | _ : RawLocalFileSystem => true
282281 case _ => false
@@ -286,7 +285,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
286285
287286/** An implementation of [[CheckpointFileManager ]] using Hadoop's [[FileContext ]] API. */
288287class FileContextBasedCheckpointFileManager (path : Path , hadoopConf : Configuration )
289- extends CheckpointFileManager with Logging {
288+ extends CheckpointFileManager with RenameHelperMethods with Logging {
290289
291290 import CheckpointFileManager ._
292291
@@ -304,14 +303,14 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
304303 fc.mkdir(path, FsPermission .getDirDefault, true )
305304 }
306305
307- override def create (path : Path , overwrite : Boolean ): FSDataOutputStream = {
306+ override def create (path : Path ): FSDataOutputStream = {
308307 import CreateFlag ._
309- val flags = if (overwrite) EnumSet .of(CREATE , OVERWRITE ) else EnumSet .of(CREATE )
310- fc.create(path, flags)
308+ fc.create(path, EnumSet .of(CREATE , OVERWRITE ))
311309 }
312310
313- override def createAtomic (path : Path , overwrite : Boolean ): CancellableFSDataOutputStream = {
314- new RenameBasedFSDataOutputStream (this , path, overwrite)
311+ override def createAtomic (
312+ path : Path , overwriteIfPossible : Boolean ): CancellableFSDataOutputStream = {
313+ new RenameBasedFSDataOutputStream (this , path, overwriteIfPossible)
315314 }
316315
317316 override def open (path : Path ): FSDataInputStream = {
@@ -322,9 +321,9 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
322321 fc.util.exists(path)
323322 }
324323
325- override def rename (srcPath : Path , dstPath : Path , overwrite : Boolean ): Unit = {
324+ override def rename (srcPath : Path , dstPath : Path , overwriteIfPossible : Boolean ): Unit = {
326325 import Options .Rename ._
327- fc.rename(srcPath, dstPath, if (overwrite ) OVERWRITE else NONE )
326+ fc.rename(srcPath, dstPath, if (overwriteIfPossible ) OVERWRITE else NONE )
328327 }
329328
330329
@@ -337,35 +336,6 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
337336 }
338337 }
339338
340- override def copyFromLocalFile (localSrcFile : File , destPath : Path ): Unit = {
341- val localFc = FileContext .getLocalFSFileContext
342- var in : InputStream = null
343- var out : OutputStream = null
344- try {
345- in = localFc.open(new Path (localSrcFile.getAbsoluteFile.toURI))
346- out = fc.create(destPath, EnumSet .of(CreateFlag .CREATE ))
347- IOUtils .copyLarge(in, out)
348- } finally {
349- if (in != null ) in.close()
350- if (out != null ) out.close()
351- }
352- }
353-
354- override def copyToLocalFile (srcPath : Path , localDstFile : File ): Unit = {
355- val localFc = FileContext .getLocalFSFileContext
356- var in : InputStream = null
357- var out : OutputStream = null
358- try {
359- in = fc.open(srcPath)
360- out = localFc.create(
361- new Path (localDstFile.getAbsoluteFile.toURI), EnumSet .of(CreateFlag .CREATE ))
362- IOUtils .copyLarge(in, out)
363- } finally {
364- if (in != null ) in.close()
365- if (out != null ) out.close()
366- }
367- }
368-
369339 override def isLocal : Boolean = fc.getDefaultFileSystem match {
370340 case _ : LocalFs | _ : RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
371341 case _ => false
0 commit comments