Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
siying committed Sep 16, 2024
1 parent 5b3e326 commit 3f5509e
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
* plan incrementally. Possibly preserving state in between each execution.
* @param currentCheckpointUniqueId checkpoint ID for the latest committed version. It is
* operatorID -> array of checkpointIDs. Array index n
* represents checkpoint ID for the nth shuffle partition.
*/
class IncrementalExecution(
sparkSession: SparkSession,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,6 @@ class MicroBatchExecution(
private def updateCheckpointId(
execCtx: MicroBatchExecutionContext,
latestExecPlan: SparkPlan): Unit = {
// This function cannot handle MBP now.
latestExecPlan.collect {
case e: StateStoreSaveExec =>
assert(e.stateInfo.isDefined)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ class RocksDB(
try {
if (loadedVersion != version ||
(checkpointFormatVersion >= 2 && checkpointUniqueId.isDefined &&
(!loadedCheckpointId.isDefined || checkpointUniqueId.get != loadedCheckpointId.get))) {
(loadedCheckpointId.isEmpty || checkpointUniqueId.get != loadedCheckpointId.get))) {
closeDB(ignoreException = false)
// deep copy is needed to avoid race condition
// between maintenance and task threads
Expand Down Expand Up @@ -791,6 +791,10 @@ class RocksDB(
acquire(RollbackStore)
numKeysOnWritingVersion = numKeysOnLoadedVersion
loadedVersion = -1L
LastCommitBasedCheckpointId = None
lastCommittedCheckpointId = None
loadedCheckpointId = None
sessionCheckpointId = None
changelogWriter.foreach(_.abort())
// Make sure changelogWriter gets recreated next time.
changelogWriter = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,10 @@ private[sql] class RocksDBStateStoreProvider
if (version < 0) {
throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
}
rocksDB.load(version, uniqueId, true)
rocksDB.load(
version,
if (storeConf.stateStoreCheckpointFormatVersion >= 2) uniqueId else None,
true)
new RocksDBStateStore(version)
}
catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
.agg(count("*"))
.as[(Int, Long)]

// Run the stream with changelog checkpointing disabled.
testStream(aggregated, Update)(
StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
AddData(inputData, 3),
Expand Down

0 comments on commit 3f5509e

Please sign in to comment.