Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 160147f

Browse files
committedApr 18, 2025·
Add basic integration test
1 parent e576332 commit 160147f

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed
 

‎sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,48 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
223223
}
224224
}
225225

226+
testWithColumnFamilies("SPARK-51823: unload state stores on commit",
227+
TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
228+
withTempDir { dir =>
229+
withSQLConf(
230+
(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName),
231+
(SQLConf.CHECKPOINT_LOCATION.key -> dir.getCanonicalPath),
232+
(SQLConf.SHUFFLE_PARTITIONS.key -> "1"),
233+
(SQLConf.STATE_STORE_UNLOAD_ON_COMMIT.key -> "true")) {
234+
val inputData = MemoryStream[Int]
235+
236+
val query = inputData.toDS().toDF("value")
237+
.select($"value")
238+
.groupBy($"value")
239+
.agg(count("*"))
240+
.writeStream
241+
.format("console")
242+
.outputMode("complete")
243+
.start()
244+
try {
245+
inputData.addData(1, 2)
246+
inputData.addData(2, 3)
247+
query.processAllAvailable()
248+
249+
// StateStore should be unloaded, so its tmp dir shouldn't exist
250+
for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
251+
assert(!file.getName().startsWith("StateStore"))
252+
}
253+
254+
inputData.addData(3, 4)
255+
inputData.addData(4, 5)
256+
query.processAllAvailable()
257+
258+
for (file <- new File(Utils.getLocalDir(sparkConf)).listFiles()) {
259+
assert(!file.getName().startsWith("StateStore"))
260+
}
261+
} finally {
262+
query.stop()
263+
}
264+
}
265+
}
266+
}
267+
226268
testWithChangelogCheckpointingEnabled(
227269
"Streaming aggregation RocksDB State Store backward compatibility.") {
228270
val checkpointDir = Utils.createTempDir().getCanonicalFile

0 commit comments

Comments
 (0)
Please sign in to comment.