Commit df7b339

CheckpointFileManager

1 parent bd201bf commit df7b339

8 files changed: +697, -376 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -913,6 +913,13 @@ object SQLConf {
     .intConf
     .createWithDefault(100)
 
+  val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS =
+    buildConf("spark.sql.streaming.checkpointFileManagerClass")
+      .doc("The class used to write checkpoint files atomically. This class must be a subclass " +
+        "of the interface CheckpointFileManager.")
+      .internal()
+      .stringConf
+
   val NDV_MAX_ERROR =
     buildConf("spark.sql.statistics.ndv.maxError")
       .internal()
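Since this config is internal and has no default, a custom manager would presumably be wired in as below (a sketch, not part of this commit; `com.example.MyCheckpointFileManager` is a hypothetical class that must extend CheckpointFileManager and, per the factory in the new file, expose a (Path, Configuration) constructor). This assumes SQL confs are propagated into the Hadoop configuration the factory reads from:

    // Hypothetical: plug in a custom checkpoint file manager via the new SQL conf.
    spark.conf.set(
      "spark.sql.streaming.checkpointFileManagerClass",
      "com.example.MyCheckpointFileManager")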
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 374 additions & 0 deletions (new file)

@@ -0,0 +1,374 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.streaming

import java.io.{FileSystem => _, _}
import java.util.{EnumSet, UUID}

import scala.util.control.NonFatal

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.local.{LocalFs, RawLocalFs}
import org.apache.hadoop.fs.permission.FsPermission

import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
 * An interface to abstract out all operations related to streaming checkpoints. Most importantly,
 * the key operation this interface provides is `createAtomic(path, overwrite)`, which returns a
 * `CancellableFSDataOutputStream`. This method is used by [[HDFSMetadataLog]] and
 * [[org.apache.spark.sql.execution.streaming.state.StateStore StateStore]] implementations
 * to write a complete checkpoint file atomically (i.e. no partial file will be visible), with or
 * without overwrite.
 *
 * This higher-level interface above the Hadoop FileSystem is necessary because different
 * implementations of FileSystem/FileContext may need different combinations of operations to
 * provide the desired atomicity guarantees (e.g. write-to-temp-file-and-rename,
 * direct-write-and-cancel-on-failure), and this abstraction allows different implementations
 * while keeping the usage simple (`createAtomic` -> `close` or `cancel`).
 */
trait CheckpointFileManager {

  import org.apache.spark.sql.execution.streaming.CheckpointFileManager._

  /** List the files in a path that match a filter. */
  def list(path: Path, filter: PathFilter): Array[FileStatus]

  /** List all the files in a path. */
  def list(path: Path): Array[FileStatus] = {
    list(path, new PathFilter { override def accept(path: Path): Boolean = true })
  }

  /** Make a directory at the given path, creating all its parent directories as needed. */
  def mkdirs(path: Path): Unit

  /** Whether the path exists. */
  def exists(path: Path): Boolean

  /** Create a file. */
  def create(path: Path, overwrite: Boolean): FSDataOutputStream

  /** Create a file and make its contents available atomically after the output stream is closed. */
  def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream

  /** Open a file for reading, or throw an exception if it does not exist. */
  def open(path: Path): FSDataInputStream

  /** Rename a file. */
  def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit

  /** Recursively delete a path if it exists. Should not throw an exception if the path doesn't exist. */
  def delete(path: Path): Unit

  /** Copy a local file to a remote file. */
  def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit

  /** Copy a remote file to a local file. */
  def copyToLocalFile(srcPath: Path, localDestFile: File): Unit

  /** Whether the default file system this implementation operates on is the local file system. */
  def isLocal: Boolean
}
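To make the close-or-cancel protocol described in the scaladoc concrete, here is a minimal usage sketch (not part of this commit; the checkpoint paths are hypothetical):

    import scala.util.control.NonFatal
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager

    val fm = CheckpointFileManager.create(new Path("/tmp/ckpt"), new Configuration())
    val out = fm.createAtomic(new Path("/tmp/ckpt/offsets/0"), overwrite = false)
    try {
      out.write("v1".getBytes("UTF-8")) // buffered in a hidden temp file
      out.close()                       // rename publishes the complete file atomically
    } catch {
      case NonFatal(e) =>
        out.cancel()                    // deletes the temp file; no partial file is visible
        throw e
    }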
object CheckpointFileManager extends Logging {

  /**
   * An interface to add the cancel() operation to [[FSDataOutputStream]]. This is used
   * mainly by `CheckpointFileManager.createAtomic` to write a file atomically.
   *
   * @see [[CheckpointFileManager]].
   */
  abstract class CancellableFSDataOutputStream(protected val underlyingStream: OutputStream)
    extends FSDataOutputStream(underlyingStream, null) {
    /** Cancel the `underlyingStream` and ensure that the output file is not generated. */
    def cancel(): Unit
  }
  /**
   * An implementation of [[CancellableFSDataOutputStream]] that writes a file atomically by
   * writing to a temporary file and then renaming it.
   */
  sealed class RenameBasedFSDataOutputStream(
      fm: CheckpointFileManager,
      finalPath: Path,
      tempPath: Path,
      overwrite: Boolean)
    extends CancellableFSDataOutputStream(fm.create(tempPath, overwrite)) {

    def this(fm: CheckpointFileManager, path: Path, overwrite: Boolean) = {
      this(fm, path, generateTempPath(path), overwrite)
    }

    logInfo(s"Writing atomically to $finalPath using temp file $tempPath")
    @volatile private var terminated = false

    override def close(): Unit = synchronized {
      try {
        if (terminated) return
        super.close()
        fm.rename(tempPath, finalPath, overwrite)
        logInfo(s"Renamed temp file $tempPath to $finalPath")
      } finally {
        terminated = true
      }
    }

    override def cancel(): Unit = synchronized {
      try {
        if (terminated) return
        underlyingStream.close()
        fm.delete(tempPath)
      } catch {
        case NonFatal(e) =>
          logWarning(s"Error cancelling write to $finalPath", e)
      } finally {
        terminated = true
      }
    }
  }
  /** Create an instance of [[CheckpointFileManager]] based on the path and configuration. */
  def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = {
    val fileManagerClass = hadoopConf.get(
      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
    if (fileManagerClass != null) {
      return Utils.classForName(fileManagerClass)
        .getConstructor(classOf[Path], classOf[Configuration])
        .newInstance(path, hadoopConf)
        .asInstanceOf[CheckpointFileManager]
    }
    try {
      // Try to create a manager based on `FileContext` because HDFS's `FileContext.rename()`
      // gives atomic renames, which is what we rely on for the default implementation of
      // `CheckpointFileManager.createAtomic`.
      new FileContextBasedCheckpointFileManager(path, hadoopConf)
    } catch {
      case e: UnsupportedFileSystemException =>
        logWarning(
          "Could not use FileContext API for managing metadata log files at path " +
            s"$path. Using FileSystem API instead for managing log files. The log may be " +
            s"inconsistent under failures.")
        new FileSystemBasedCheckpointFileManager(path, hadoopConf)
    }
  }

  private def generateTempPath(path: Path): Path = {
    val tc = org.apache.spark.TaskContext.get
    val tid = if (tc != null) ".TID" + tc.taskAttemptId else ""
    new Path(path.getParent, s".${path.getName}.${UUID.randomUUID}${tid}.tmp")
  }
}
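For illustration: with generateTempPath, an atomic write to a (hypothetical) final path /ckpt/offsets/10 issued from a task with attempt ID 5 goes through a hidden temp file such as /ckpt/offsets/.10.<random-UUID>.TID5.tmp; outside of a task, the .TID suffix is omitted.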
/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileSystem]] API. */
class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  extends CheckpointFileManager with Logging {

  import CheckpointFileManager._

  protected val fs = path.getFileSystem(hadoopConf)

  fs.setVerifyChecksum(false)
  fs.setWriteChecksum(false)

  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    fs.listStatus(path, filter)
  }

  override def mkdirs(path: Path): Unit = {
    fs.mkdirs(path, FsPermission.getDirDefault)
  }

  override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
    fs.create(path, overwrite)
  }

  override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
    new RenameBasedFSDataOutputStream(this, path, overwrite)
  }

  override def open(path: Path): FSDataInputStream = {
    fs.open(path)
  }

  override def exists(path: Path): Boolean = {
    fs.exists(path)
  }

  override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = {
    if (!overwrite && fs.exists(dstPath)) {
      throw new FileAlreadyExistsException(
        s"Failed to rename $srcPath to $dstPath as destination already exists")
    }

    // FileSystem.rename() returning false is ambiguous. If overwriting is allowed,
    // retry once after deleting the destination; otherwise surface the original failure.
    def deleteAndRename(prevException: Exception): Unit = {
      if (overwrite) {
        try {
          if (!fs.delete(dstPath, true)) {
            logWarning(s"Failed to delete $dstPath before second attempt to rename")
          }
          if (!fs.rename(srcPath, dstPath)) {
            val msg = s"Failed to rename temp file $srcPath to $dstPath as second attempt to " +
              s"rename (after delete) returned false"
            logWarning(msg)
            val e = new IOException(msg)
            e.addSuppressed(prevException)
            throw e
          }
        } catch {
          case NonFatal(e) =>
            logError(s"Failed to write atomically to $dstPath", e)
            if (prevException != null) e.addSuppressed(prevException)
            throw e
        }
      } else {
        throw prevException
      }
    }

    try {
      if (!fs.rename(srcPath, dstPath)) {
        val msg = s"Failed to rename temp file $srcPath to $dstPath as rename returned false"
        logWarning(msg)
        deleteAndRename(new IOException(msg))
      }
    } catch {
      case fe: FileAlreadyExistsException =>
        logWarning(s"Failed to rename temp file $srcPath to $dstPath because file exists", fe)
        deleteAndRename(fe)
    }
  }

  override def delete(path: Path): Unit = {
    try {
      fs.delete(path, true)
    } catch {
      case e: FileNotFoundException =>
        logInfo(s"Failed to delete $path as it does not exist")
        // ignore if file has already been deleted
    }
  }

  override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = {
    fs.copyFromLocalFile(new Path(localSrcFile.getAbsoluteFile.toURI), destPath)
  }

  override def copyToLocalFile(srcPath: Path, localDestFile: File): Unit = {
    fs.copyToLocalFile(srcPath, new Path(localDestFile.getAbsoluteFile.toURI))
  }

  override def isLocal: Boolean = fs match {
    case _: LocalFileSystem | _: RawLocalFileSystem => true
    case _ => false
  }
}
/** An implementation of [[CheckpointFileManager]] using Hadoop's [[FileContext]] API. */
class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
  extends CheckpointFileManager with Logging {

  import CheckpointFileManager._

  private val fc = if (path.toUri.getScheme == null) {
    FileContext.getFileContext(hadoopConf)
  } else {
    FileContext.getFileContext(path.toUri, hadoopConf)
  }

  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
    fc.util.listStatus(path, filter)
  }

  override def mkdirs(path: Path): Unit = {
    fc.mkdir(path, FsPermission.getDirDefault, true)
  }

  override def create(path: Path, overwrite: Boolean): FSDataOutputStream = {
    import CreateFlag._
    val flags = if (overwrite) EnumSet.of(CREATE, OVERWRITE) else EnumSet.of(CREATE)
    fc.create(path, flags)
  }

  override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
    new RenameBasedFSDataOutputStream(this, path, overwrite)
  }

  override def open(path: Path): FSDataInputStream = {
    fc.open(path)
  }

  override def exists(path: Path): Boolean = {
    fc.util.exists(path)
  }

  override def rename(srcPath: Path, dstPath: Path, overwrite: Boolean): Unit = {
    import Options.Rename._
    fc.rename(srcPath, dstPath, if (overwrite) OVERWRITE else NONE)
  }

  override def delete(path: Path): Unit = {
    try {
      fc.delete(path, true)
    } catch {
      case e: FileNotFoundException =>
        // ignore if file has already been deleted
    }
  }

  override def copyFromLocalFile(localSrcFile: File, destPath: Path): Unit = {
    val localFc = FileContext.getLocalFSFileContext
    var in: InputStream = null
    var out: OutputStream = null
    try {
      in = localFc.open(new Path(localSrcFile.getAbsoluteFile.toURI))
      out = fc.create(destPath, EnumSet.of(CreateFlag.CREATE))
      IOUtils.copyLarge(in, out)
    } finally {
      if (in != null) in.close()
      if (out != null) out.close()
    }
  }

  override def copyToLocalFile(srcPath: Path, localDstFile: File): Unit = {
    val localFc = FileContext.getLocalFSFileContext
    var in: InputStream = null
    var out: OutputStream = null
    try {
      in = fc.open(srcPath)
      out = localFc.create(
        new Path(localDstFile.getAbsoluteFile.toURI), EnumSet.of(CreateFlag.CREATE))
      IOUtils.copyLarge(in, out)
    } finally {
      if (in != null) in.close()
      if (out != null) out.close()
    }
  }

  override def isLocal: Boolean = fc.getDefaultFileSystem match {
    case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
    case _ => false
  }
}
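Putting the pieces together, a custom implementation registered through spark.sql.streaming.checkpointFileManagerClass might look like the following sketch (hypothetical, not part of this commit; the reflective factory requires exactly this constructor signature):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.streaming.CheckpointFileManager._
    import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager

    // Hypothetical: reuse the FileSystem-based manager but customize atomic writes,
    // e.g. for an object store that offers atomic single-object puts.
    class MyCheckpointFileManager(path: Path, hadoopConf: Configuration)
      extends FileSystemBasedCheckpointFileManager(path, hadoopConf) {

      override def createAtomic(path: Path, overwrite: Boolean): CancellableFSDataOutputStream = {
        // Fall back to the default write-temp-file-and-rename behavior.
        super.createAtomic(path, overwrite)
      }
    }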
