diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index c331f1724854..dfcdbd6eca29 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2553,6 +2553,15 @@ object SQLConf { .timeConf(TimeUnit.SECONDS) .createWithDefault(300L) + val STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT = + buildConf("spark.sql.streaming.stateStore.maintenanceForceShutdownTimeout") + .internal() + .doc("Timeout in seconds to wait for tasks to respond to cancellation after " + + "force shutdown is initiated. This applies after the graceful shutdown timeout " + + "has been exceeded.") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(60L) + val STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT = buildConf("spark.sql.streaming.stateStore.maintenanceProcessingTimeout") .internal() @@ -6921,6 +6930,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def stateStoreMaintenanceShutdownTimeout: Long = getConf(STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT) + def stateStoreMaintenanceForceShutdownTimeout: Long = + getConf(STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT) + def stateStoreMaintenanceProcessingTimeout: Long = getConf(STATE_STORE_MAINTENANCE_PROCESSING_TIMEOUT) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index bd6b4bede84b..53053fc9e091 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -1130,7 +1130,8 @@ object StateStore extends Logging { */ class MaintenanceThreadPool( numThreads: Int, - shutdownTimeout: Long) { + shutdownTimeout: Long, + forceShutdownTimeout: Long) { private val threadPool = ThreadUtils.newDaemonFixedThreadPool( numThreads, "state-store-maintenance-thread") @@ -1151,7 +1152,7 @@ object StateStore extends Logging { threadPool.shutdownNow() // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled - if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { + if (!threadPool.awaitTermination(forceShutdownTimeout, TimeUnit.SECONDS)) { logError("MaintenanceThreadPool did not terminate") } } @@ -1416,6 +1417,7 @@ object StateStore extends Logging { private def startMaintenanceIfNeeded(storeConf: StateStoreConf): Unit = { val numMaintenanceThreads = storeConf.numStateStoreMaintenanceThreads val maintenanceShutdownTimeout = storeConf.stateStoreMaintenanceShutdownTimeout + val maintenanceForceShutdownTimeout = storeConf.stateStoreMaintenanceForceShutdownTimeout loadedProviders.synchronized { if (SparkEnv.get != null && !isMaintenanceRunning && !storeConf.unloadOnCommit) { maintenanceTask = new MaintenanceTask( @@ -1423,7 +1425,7 @@ object StateStore extends Logging { task = { doMaintenance(storeConf) } ) maintenanceThreadPool = new MaintenanceThreadPool(numMaintenanceThreads, - maintenanceShutdownTimeout) + maintenanceShutdownTimeout, maintenanceForceShutdownTimeout) logInfo("State Store maintenance task started") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala index 3991f8d93f2c..f3bbc0ea2406 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala @@ -40,6 +40,12 @@ class StateStoreConf( */ val stateStoreMaintenanceShutdownTimeout: Long = sqlConf.stateStoreMaintenanceShutdownTimeout + /** + * Timeout to wait for tasks to respond to cancellation after force shutdown is initiated + */ + val stateStoreMaintenanceForceShutdownTimeout: Long = + sqlConf.stateStoreMaintenanceForceShutdownTimeout + val stateStoreMaintenanceProcessingTimeout: Long = sqlConf.stateStoreMaintenanceProcessingTimeout /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala index 6948aedd5640..7446390e8d06 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala @@ -127,6 +127,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { SQLConf.STATE_STORE_FORCE_SNAPSHOT_UPLOAD_ON_LAG.key -> "false", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_COORDINATOR_MULTIPLIER_FOR_MIN_VERSION_DIFF_TO_LOG.key -> "2", SQLConf.STATE_STORE_COORDINATOR_SNAPSHOT_LAG_REPORT_INTERVAL.key -> "0" @@ -433,6 +434,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName, RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", @@ -468,6 +470,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { SQLConf.SHUFFLE_PARTITIONS.key -> "3", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName, RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", @@ -511,6 +514,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName, @@ -578,6 +582,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName, RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "false", @@ -621,6 +626,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext { SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName, @@ -666,6 +672,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest { SQLConf.SHUFFLE_PARTITIONS.key -> "3", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "2", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName, RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", @@ -787,6 +794,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest { SQLConf.SHUFFLE_PARTITIONS.key -> "3", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBSkipMaintenanceOnCertainPartitionsProvider].getName, @@ -866,6 +874,7 @@ class StateStoreCoordinatorStreamingSuite extends StreamTest { SQLConf.SHUFFLE_PARTITIONS.key -> "2", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_PROVIDER_CLASS.key -> providerClassName, RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", @@ -941,6 +950,7 @@ object StateStoreCoordinatorSuite { SQLConf.SHUFFLE_PARTITIONS.key -> "5", SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled" -> "true", SQLConf.STATE_STORE_COORDINATOR_REPORT_SNAPSHOT_UPLOAD_LAG.key -> "true", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala index 0b51a07837c2..58d951500c8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreInstanceMetricSuite.scala @@ -75,6 +75,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3" ) { @@ -139,6 +140,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "3" ) { @@ -214,6 +216,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10", SQLConf.SHUFFLE_PARTITIONS.key -> "3" @@ -291,6 +294,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "10" ) { @@ -365,6 +369,7 @@ class StateStoreInstanceMetricSuite extends StreamTest with AlsoTestWithRocksDBF SQLConf.STREAMING_MAINTENANCE_INTERVAL.key -> "100", SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10", SQLConf.STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT.key -> "3", + SQLConf.STATE_STORE_MAINTENANCE_FORCE_SHUTDOWN_TIMEOUT.key -> "5", SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1", SQLConf.STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT.key -> "4", SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3"