Commit c6d3f37 (1 parent: 3f5a209)
[SPARK-35240][SS] Use CheckpointFileManager for checkpoint file manipulation
### What changes were proposed in this pull request?

This patch changes a few places that use the `FileSystem` API to manipulate checkpoint files so that they go through `CheckpointFileManager` instead.

### Why are the changes needed?

`CheckpointFileManager` is designed to handle checkpoint file manipulation. However, a few places still obtain a `FileSystem` from checkpoint files/paths. We should use `CheckpointFileManager` to manipulate checkpoint files. For example, we may want to keep checkpoint files in a dedicated storage system. If all checkpoint file manipulation is performed through `CheckpointFileManager`, we only need to implement `CheckpointFileManager` for that storage system, and don't need to implement the `FileSystem` API for it.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #32361 from viirya/checkpoint-manager.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
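To make the motivation concrete, here is a minimal sketch of a pluggable manager. Spark resolves the implementation class through the existing `spark.sql.streaming.checkpointFileManagerClass` configuration and instantiates it with a `(Path, Configuration)` constructor; the `AuditingCheckpointFileManager` name and its simple delegation below are hypothetical, not part of this PR:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path, PathFilter}
import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, FileContextBasedCheckpointFileManager}
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream

// Hypothetical manager: delegates to the built-in FileContext-based
// implementation, but could equally talk to a custom storage system
// directly, with no Hadoop FileSystem implementation required.
class AuditingCheckpointFileManager(path: Path, hadoopConf: Configuration)
    extends CheckpointFileManager {

  private val delegate = new FileContextBasedCheckpointFileManager(path, hadoopConf)

  override def createAtomic(
      path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
    println(s"checkpoint write: $path")  // placeholder for real auditing
    delegate.createAtomic(path, overwriteIfPossible)
  }
  override def open(path: Path): FSDataInputStream = delegate.open(path)
  override def list(path: Path, filter: PathFilter): Array[FileStatus] =
    delegate.list(path, filter)
  override def mkdirs(path: Path): Unit = delegate.mkdirs(path)
  override def exists(path: Path): Boolean = delegate.exists(path)
  override def delete(path: Path): Unit = delegate.delete(path)
  override def isLocal: Boolean = delegate.isLocal
  override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory()
}
```

Because every checkpoint operation funnels through the trait after this patch, pointing `spark.sql.streaming.checkpointFileManagerClass` at such a class changes checkpoint storage behavior everywhere without touching the `FileSystem` API.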
File tree: 4 files changed (+32, -11 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala

Lines changed: 18 additions & 0 deletions
```diff
@@ -83,6 +83,12 @@ trait CheckpointFileManager {
 
   /** Is the default file system this implementation is operating on the local file system. */
   def isLocal: Boolean
+
+  /**
+   * Creates the checkpoint path if it does not exist, and returns the qualified
+   * checkpoint path.
+   */
+  def createCheckpointDirectory(): Path
 }
 
 object CheckpointFileManager extends Logging {
@@ -285,6 +291,12 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     case _: LocalFileSystem | _: RawLocalFileSystem => true
     case _ => false
   }
+
+  override def createCheckpointDirectory(): Path = {
+    val qualifiedPath = fs.makeQualified(path)
+    fs.mkdirs(qualifiedPath, FsPermission.getDirDefault)
+    qualifiedPath
+  }
 }
 
 
@@ -351,6 +363,12 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
     case _ => false
   }
 
+  override def createCheckpointDirectory(): Path = {
+    val qualifiedPath = fc.makeQualified(path)
+    fc.mkdir(qualifiedPath, FsPermission.getDirDefault, true)
+    qualifiedPath
+  }
+
   private def mayRemoveCrcFile(path: Path): Unit = {
     try {
       val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
```
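A small usage sketch of the new method, assuming the default manager resolution and a made-up local checkpoint location:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

val hadoopConf = new Configuration()
val root = new Path("/tmp/streaming-query-checkpoint")  // hypothetical location

// create() prefers the FileContext-based manager and falls back to the
// FileSystem-based one if the scheme does not support FileContext.
val fm = CheckpointFileManager.create(root, hadoopConf)

// Creates the directory with default permissions if it is missing and
// returns the qualified path, e.g. file:/tmp/streaming-query-checkpoint.
val qualified: Path = fm.createCheckpointDirectory()
```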

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala

Lines changed: 5 additions & 6 deletions
```diff
@@ -89,11 +89,12 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
             s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
       }
     }
+    val fileManager = CheckpointFileManager.create(new Path(checkpointLocation), s.hadoopConf)
+
     // If offsets have already been created, we trying to resume a query.
     if (!s.recoverFromCheckpointLocation) {
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(s.hadoopConf)
-      if (fs.exists(checkpointPath)) {
+      if (fileManager.exists(checkpointPath)) {
         throw new AnalysisException(
           s"This query does not support recovering from checkpoint location. " +
             s"Delete $checkpointPath to start over.")
@@ -102,7 +103,6 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
 
     val resolvedCheckpointRoot = {
       val checkpointPath = new Path(checkpointLocation)
-      val fs = checkpointPath.getFileSystem(s.hadoopConf)
       if (conf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
         && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
         // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
@@ -112,7 +112,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
             new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
         val legacyCheckpointDirExists =
           try {
-            fs.exists(new Path(legacyCheckpointDir))
+            fileManager.exists(new Path(legacyCheckpointDir))
           } catch {
             case NonFatal(e) =>
               // We may not have access to this directory. Don't fail the query if that happens.
@@ -139,8 +139,7 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
             .stripMargin)
         }
       }
-      val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      fs.mkdirs(checkpointDir)
+      val checkpointDir = fileManager.createCheckpointDirectory()
      checkpointDir.toString
    }
    logInfo(s"Checkpoint root $checkpointLocation resolved to $resolvedCheckpointRoot.")
```
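The `legacyCheckpointDir` expression kept in the third hunk reconstructs the Spark 2.4-era path, which was escaped three times. A standalone sketch of why nesting `Path(...).toUri.toString` compounds the escaping (the example path is made up; expected values shown in comments):

```scala
import org.apache.hadoop.fs.Path

// Each round trip escapes once more: the space becomes %20, then the
// '%' itself is escaped to %25 on every subsequent pass.
val checkpointPath = new Path("/tmp/chk dir")
val once   = checkpointPath.toUri.toString          // /tmp/chk%20dir
val twice  = new Path(once).toUri.toString          // /tmp/chk%2520dir
val thrice = new Path(twice).toUri.toString         // /tmp/chk%252520dir
```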

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -237,6 +237,10 @@ abstract class StreamExecution(
   protected def checkpointFile(name: String): String =
     new Path(new Path(resolvedCheckpointRoot), name).toString
 
+  /** All checkpoint file operations should be performed through `CheckpointFileManager`. */
+  private val fileManager = CheckpointFileManager.create(new Path(resolvedCheckpointRoot),
+    sparkSession.sessionState.newHadoopConf)
+
   /**
    * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
@@ -355,8 +359,7 @@ abstract class StreamExecution(
       val checkpointPath = new Path(resolvedCheckpointRoot)
       try {
         logInfo(s"Deleting checkpoint $checkpointPath.")
-        val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-        fs.delete(checkpointPath, true)
+        fileManager.delete(checkpointPath)
       } catch {
         case NonFatal(e) =>
           // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
```
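The same best-effort pattern, pulled out into a hypothetical standalone helper (the name and `println` reporting are illustrative):

```scala
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Deleting a temporary checkpoint is best effort, so non-fatal
// failures are reported and swallowed rather than rethrown.
def deleteCheckpointQuietly(root: String, hadoopConf: Configuration): Unit = {
  val checkpointPath = new Path(root)
  val fileManager = CheckpointFileManager.create(checkpointPath, hadoopConf)
  try {
    fileManager.delete(checkpointPath)  // recursive delete through the manager
  } catch {
    case NonFatal(e) =>
      println(s"Failed to delete checkpoint $checkpointPath: $e")  // log and move on
  }
}
```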

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -49,11 +49,12 @@ object StreamMetadata extends Logging {
 
   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
-    val fs = metadataFile.getFileSystem(hadoopConf)
-    if (fs.exists(metadataFile)) {
+    val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
+
+    if (fileManager.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
-        input = fs.open(metadataFile)
+        input = fileManager.open(metadataFile)
         val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
         val metadata = Serialization.read[StreamMetadata](reader)
         Some(metadata)
```
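The resulting read pattern, isolated as a hypothetical helper: the real method deserializes `StreamMetadata` JSON with json4s `Serialization.read`, so the plain-text payload here is only illustrative:

```scala
import java.nio.charset.StandardCharsets
import scala.io.Source
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Resolve a manager for the parent directory, test existence, open the
// file through the manager, and always close the stream.
def readText(metadataFile: Path, hadoopConf: Configuration): Option[String] = {
  val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
  if (!fileManager.exists(metadataFile)) {
    None
  } else {
    val input = fileManager.open(metadataFile)
    try {
      Some(Source.fromInputStream(input, StandardCharsets.UTF_8.name()).mkString)
    } finally {
      input.close()
    }
  }
}
```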
