Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2553,6 +2553,15 @@ object SQLConf {
.timeConf(TimeUnit.SECONDS)
.createWithDefault(300L)

// Internal conf for the second phase of state store maintenance pool shutdown: how long,
// in seconds, to wait for tasks to respond to cancellation once a force shutdown has been
// initiated (i.e. after the graceful shutdown timeout has already been exceeded).
// Parsed as a time value in seconds; defaults to 60.
val STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT =
buildConf("spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout")
.internal()
.doc("Timeout in seconds to wait for tasks to respond to cancellation after " +
"force shutdown is initiated. This applies after the graceful shutdown timeout " +
"has been exceeded.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(60L)

val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT =
buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout")
.internal()
Expand Down Expand Up @@ -6921,6 +6930,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

/** Returns the configured value of `STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT`. */
def stateStoreMaintenanceShutdownTimeout: Long = {
  getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT)
}

/**
 * Returns the configured value of `STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT`,
 * i.e. the number of seconds to wait for maintenance tasks to respond to cancellation
 * after a force shutdown is initiated.
 */
def stateStoreMaintenanceForceShutdownTimeout: Long = {
  getConf(STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT)
}

/** Returns the configured value of `STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT`. */
def stateStoreMaintenanceProcessingTimeout: Long = {
  getConf(STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,8 @@ object StateStore extends Logging {
*/
class MaintenanceThreadPool(
numThreads: Int,
shutdownTimeout: Long) {
shutdownTimeout: Long,
forceShutdownTimeout: Long) {
private val threadPool = ThreadUtils.newDaemonFixedThreadPool(
numThreads, "state-store-maintenance-thread")

Expand All @@ -1151,7 +1152,7 @@ object StateStore extends Logging {
threadPool.shutdownNow() // Cancel currently executing tasks

// Wait a while for tasks to respond to being cancelled
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
if (!threadPool.awaitTermination(forceShutdownTimeout, TimeUnit.SECONDS)) {
logError("MaintenanceThreadPool did not terminate")
}
}
Expand Down Expand Up @@ -1416,14 +1417,15 @@ object StateStore extends Logging {
private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = {
val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads
val maintenanceShutdownTimeout = storeConf.stateStoreMaintenanceShutdownTimeout
val maintenanceForceShutdownTimeout = storeConf.stateStoreMaintenanceForceShutdownTimeout
loadedProviders.synchronized {
if (SparkEnv.get != null && !isMaintenanceRunning && !storeConf.unloadOnCommit) {
maintenanceTask = new MaintenanceTask(
storeConf.maintenanceInterval,
task = { doMaintenance(storeConf) }
)
maintenanceThreadPool = new MaintenanceThreadPool(numMaintenanceThreads,
maintenanceShutdownTimeout)
maintenanceShutdownTimeout, maintenanceForceShutdownTimeout)
logInfo("State Store maintenance task started")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class StateStoreConf(
*/
val stateStoreMaintenanceShutdownTimeout: Long = sqlConf.stateStoreMaintenanceShutdownTimeout

/**
 * Timeout, in seconds, to wait for maintenance tasks to respond to cancellation after a
 * force shutdown is initiated. Read once from
 * `sqlConf.stateStoreMaintenanceForceShutdownTimeout` when this conf object is constructed.
 */
val stateStoreMaintenanceForceShutdownTimeout: Long =
sqlConf.stateStoreMaintenanceForceShutdownTimeout

/**
 * Value of `sqlConf.stateStoreMaintenanceProcessingTimeout`, read once when this conf
 * object is constructed.
 * NOTE(review): the time unit is not visible from here; confirm against the conf definition.
 */
val stateStoreMaintenanceProcessingTimeout: Long = sqlConf.stateStoreMaintenanceProcessingTimeout

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2",
SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0"
Expand Down Expand Up @@ -433,6 +434,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
Expand Down Expand Up @@ -468,6 +470,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
Expand Down Expand Up @@ -511,6 +514,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
Expand Down Expand Up @@ -578,6 +582,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false",
Expand Down Expand Up @@ -621,6 +626,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
Expand Down Expand Up @@ -666,6 +672,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
Expand Down Expand Up @@ -787,6 +794,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
SQLConf.SHUFFLE_PARTITIONS.key -> "3",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName,
Expand Down Expand Up @@ -866,6 +874,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest {
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName,
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
Expand Down Expand Up @@ -941,6 +950,7 @@ object StateStoreCoordinatorSuite {
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true",
SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
) {
Expand Down Expand Up @@ -139,6 +140,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3"
) {
Expand Down Expand Up @@ -214,6 +216,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10",
SQLConf.SHUFFLE_PARTITIONS.key -> "3"
Expand Down Expand Up @@ -291,6 +294,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10"
) {
Expand Down Expand Up @@ -365,6 +369,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF
SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100",
SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10",
SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3",
SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5",
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "4",
SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3"
Expand Down