Skip to content

Commit

Permalink
[SPARK-26389][SS] Add force delete temp checkpoint configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed Feb 4, 2019
1 parent 196ca0c commit 7c304f6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,13 @@ object SQLConf {
.stringConf
.createOptional

val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION =
buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation")
.doc("Temporary checkpoint locations deleted normally when the query didn't fail." +
" When true, it will be deleted even if the query failed.")
.booleanConf
.createWithDefault(false)

val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
.internal()
.doc("The minimum number of batches that must be retained and made recoverable.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ case object RECONFIGURING extends State
* and the results are committed transactionally to the given [[Sink]].
*
* @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
* errors
* errors. Checkpoint deletion can be forced with the appropriate
* Spark configuration.
*/
abstract class StreamExecution(
override val sparkSession: SparkSession,
Expand Down Expand Up @@ -92,6 +93,7 @@ abstract class StreamExecution(
fs.mkdirs(checkpointPath)
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
}
logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")

def logicalPlan: LogicalPlan

Expand Down Expand Up @@ -335,10 +337,13 @@ abstract class StreamExecution(
postEvent(
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))

// Delete the temp checkpoint only when the query didn't fail
if (deleteCheckpointOnStop && exception.isEmpty) {
// Delete the temp checkpoint when either force delete enabled or the query didn't fail
if (deleteCheckpointOnStop &&
(sparkSession.sessionState.conf.getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) ||
exception.isEmpty)) {
val checkpointPath = new Path(resolvedCheckpointRoot)
try {
logInfo(s"Deleting checkpoint $checkpointPath.")
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
fs.delete(checkpointPath, true)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
}
}.getOrElse {
if (useTempCheckpointLocation) {
// Delete the temp checkpoint when a query is being stopped without errors.
deleteCheckpointOnStop = true
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
logWarning("Temporary checkpoint location created which is deleted normally when" +
s" the query didn't fail: $tempDir. If it's required to delete it under any" +
s" circumstances please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
s" true. Important to know deleting temp checkpoint folder is best effort.")
tempDir
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,21 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}
}

test("configured checkpoint dir should not be deleted if a query is stopped without errors and" +
" force temp checkpoint deletion enabled") {
import testImplicits._
withTempDir { checkpointPath =>
withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath,
SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
val ds = MemoryStream[Int].toDS
val query = ds.writeStream.format("console").start()
assert(checkpointPath.exists())
query.stop()
assert(checkpointPath.exists())
}
}
}

test("temp checkpoint dir should be deleted if a query is stopped without errors") {
import testImplicits._
val query = MemoryStream[Int].toDS.writeStream.format("console").start()
Expand All @@ -627,6 +642,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
}

testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
testTempCheckpointWithFailedQuery(false)
}

testQuietly("temp checkpoint should be deleted if a query is stopped with an error and force" +
" temp checkpoint deletion enabled") {
withSQLConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
testTempCheckpointWithFailedQuery(true)
}
}

private def testTempCheckpointWithFailedQuery(checkpointMustBeDeleted: Boolean): Unit = {
import testImplicits._
val input = MemoryStream[Int]
val query = input.toDS.map(_ / 0).writeStream.format("console").start()
Expand All @@ -638,7 +664,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
intercept[StreamingQueryException] {
query.awaitTermination()
}
assert(fs.exists(checkpointDir))
if (!checkpointMustBeDeleted) {
assert(fs.exists(checkpointDir))
} else {
assert(!fs.exists(checkpointDir))
}
}

test("SPARK-20431: Specify a schema by using a DDL-formatted string") {
Expand Down

0 comments on commit 7c304f6

Please sign in to comment.