[SPARK-26389][SS] Add force delete temp checkpoint configuration #23732

Closed
wants to merge 3 commits into from
@@ -907,6 +907,12 @@ object SQLConf {
       .stringConf
       .createOptional

+  val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION =
+    buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation")
Member

Shall we follow the naming convention of the other boolean confs?

spark.sql.streaming.forceDeleteTempCheckpointLocation.enabled?

Member

cc @Ngone51, could you submit a PR to make this change?

Member

Submitted #26981

+      .doc("When true, enable temporary checkpoint locations force delete.")
Contributor

nit: I'd like this to be a complete, more descriptive sentence like the other configurations, especially since this feature is only documented here. One example: "When true, Spark always deletes temporary checkpoint locations for successful queries."

Member

Er, @HeartSaVioR, that is the behavior before this PR.
The benefit of this configuration is that the locations are also deleted when an exception occurs.

Contributor

@HeartSaVioR, Feb 8, 2019

Ah yes, my bad, I meant failed queries. Or maybe better: regardless of the query's result, success or failure. Anyway, this option is only documented here, so I just wanted to make sure there's a clear description.

+      .booleanConf
+      .createWithDefault(false)
+
   val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
     .internal()
     .doc("The minimum number of batches that must be retained and made recoverable.")
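
For readers who want to try the option, here is a minimal sketch. It assumes a Spark build that contains this change is on the classpath, and it uses the configuration key exactly as it appears in this diff (the review thread above discusses renaming it, so the final key may differ). The object name, the `local[2]` master, the rate source, and the five-second run time are illustrative choices, not part of the PR.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the object name and run time are arbitrary.
object ForceDeleteTempCheckpointExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("force-delete-temp-checkpoint-example")
      // Key as defined in this diff; treat the exact name as an assumption.
      .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true")
      .getOrCreate()

    // No checkpointLocation is set, so the console sink falls back to a
    // temporary checkpoint directory (the case this PR is about).
    val query = spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .start()

    query.awaitTermination(5000L) // let the query run for about five seconds
    query.stop()
    spark.stop()
  }
}
```

With the option left at its default of `false`, the temporary directory is still removed after a clean stop; the option only matters when the query terminates with an exception.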
@@ -55,7 +55,8 @@ case object RECONFIGURING extends State
  * and the results are committed transactionally to the given [[Sink]].
  *
  * @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
- *                               errors
+ *                               errors. Checkpoint deletion can be forced with the appropriate
+ *                               Spark configuration.
  */
 abstract class StreamExecution(
     override val sparkSession: SparkSession,
@@ -92,6 +93,7 @@ abstract class StreamExecution(
     fs.mkdirs(checkpointPath)
     checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
   }
+  logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")

   def logicalPlan: LogicalPlan

@@ -335,10 +337,13 @@ abstract class StreamExecution(
         postEvent(
           new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))

-        // Delete the temp checkpoint only when the query didn't fail
-        if (deleteCheckpointOnStop && exception.isEmpty) {
+        // Delete the temp checkpoint when either force delete enabled or the query didn't fail
+        if (deleteCheckpointOnStop &&
+            (sparkSession.sessionState.conf
+              .getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || exception.isEmpty)) {
           val checkpointPath = new Path(resolvedCheckpointRoot)
           try {
+            logInfo(s"Deleting checkpoint $checkpointPath.")
             val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
             fs.delete(checkpointPath, true)
           } catch {
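
The new condition is easier to review as a truth table. The sketch below restates it as a pure function with no Spark dependency. `TempCheckpointCleanup`, `shouldDelete`, and the parameter names are hypothetical and only mirror the expression in the hunk above; they are not part of Spark.

```scala
// Hypothetical helper that mirrors the condition in the hunk above.
object TempCheckpointCleanup {

  /**
   * @param deleteCheckpointOnStop true only when a temporary checkpoint location is in use
   * @param forceDelete            value of the new force-delete configuration
   * @param queryFailed            true when the query terminated with an exception
   */
  def shouldDelete(
      deleteCheckpointOnStop: Boolean,
      forceDelete: Boolean,
      queryFailed: Boolean): Boolean =
    deleteCheckpointOnStop && (forceDelete || !queryFailed)

  def main(args: Array[String]): Unit = {
    // Print every combination of the three inputs.
    for {
      onStop <- Seq(true, false)
      force <- Seq(true, false)
      failed <- Seq(true, false)
    } {
      val delete = shouldDelete(onStop, force, failed)
      println(s"deleteCheckpointOnStop=$onStop force=$force failed=$failed -> delete=$delete")
    }
  }
}
```

The only row that changes relative to the old condition is a temporary checkpoint with a failed query and force delete enabled. A user-configured checkpoint, where `deleteCheckpointOnStop` is false, is never deleted regardless of the flag.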
@@ -221,9 +221,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
         }
       }.getOrElse {
         if (useTempCheckpointLocation) {
-          // Delete the temp checkpoint when a query is being stopped without errors.
           deleteCheckpointOnStop = true
-          Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+          val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+          logWarning("Temporary checkpoint location created which is deleted normally when" +
+            s" the query didn't fail: $tempDir. If it's required to delete it under any" +
+            s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
+            s" true. Important to know deleting temp checkpoint folder is best effort.")
+          tempDir
         } else {
           throw new AnalysisException(
             "checkpointLocation must be specified either " +
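
The fallback in the StreamingQueryManager hunk can be sketched without Spark as follows. This is a simplified model under stated assumptions: the code that looks up a configured location is outside the hunk, so it is represented here by a single `Option[String]`; `Files.createTempDirectory` stands in for Spark's `Utils.createTempDir`; `IllegalArgumentException` stands in for `AnalysisException`; and `CheckpointResolution`, `Resolved`, and `resolve` are hypothetical names.

```scala
import java.nio.file.Files

// Hypothetical, Spark-free model of the checkpoint location fallback.
object CheckpointResolution {

  /** The chosen location and whether it should be cleaned up when the query stops. */
  final case class Resolved(path: String, deleteOnStop: Boolean)

  def resolve(configured: Option[String], useTempCheckpointLocation: Boolean): Resolved =
    configured
      // A configured location is kept after the query stops.
      .map(path => Resolved(path, deleteOnStop = false))
      .getOrElse {
        if (useTempCheckpointLocation) {
          // Only a temporary location is marked for deletion on stop.
          val tempDir = Files.createTempDirectory("temporary").toFile.getCanonicalPath
          Resolved(tempDir, deleteOnStop = true)
        } else {
          throw new IllegalArgumentException("checkpointLocation must be specified")
        }
      }

  def main(args: Array[String]): Unit = {
    println(resolve(Some("/data/checkpoints/query1"), useTempCheckpointLocation = true))
    println(resolve(None, useTempCheckpointLocation = true))
  }
}
```

Because the delete-on-stop flag is set only on the temporary-directory branch, the force-delete option cannot remove a configured checkpoint directory, which is what the first new test in the suite checks.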
@@ -614,6 +614,21 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
   }

+  test("configured checkpoint dir should not be deleted if a query is stopped without errors and" +
+    " force temp checkpoint deletion enabled") {
+    import testImplicits._
+    withTempDir { checkpointPath =>
+      withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath,
+        SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+        val ds = MemoryStream[Int].toDS
+        val query = ds.writeStream.format("console").start()
+        assert(checkpointPath.exists())
+        query.stop()
+        assert(checkpointPath.exists())
+      }
+    }
+  }
+
   test("temp checkpoint dir should be deleted if a query is stopped without errors") {
     import testImplicits._
     val query = MemoryStream[Int].toDS.writeStream.format("console").start()
@@ -627,6 +642,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
   }

   testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
+    testTempCheckpointWithFailedQuery(false)
+  }
+
+  testQuietly("temp checkpoint should be deleted if a query is stopped with an error and force" +
+    " temp checkpoint deletion enabled") {
+    withSQLConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
+      testTempCheckpointWithFailedQuery(true)
+    }
+  }
+
+  private def testTempCheckpointWithFailedQuery(checkpointMustBeDeleted: Boolean): Unit = {
     import testImplicits._
     val input = MemoryStream[Int]
     val query = input.toDS.map(_ / 0).writeStream.format("console").start()
@@ -638,7 +664,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     intercept[StreamingQueryException] {
       query.awaitTermination()
     }
-    assert(fs.exists(checkpointDir))
+    if (!checkpointMustBeDeleted) {
+      assert(fs.exists(checkpointDir))
+    } else {
+      assert(!fs.exists(checkpointDir))
+    }
   }

   test("SPARK-20431: Specify a schema by using a DDL-formatted string") {