diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md index 002058b69bf30..d52b2e095fc76 100644 --- a/docs/ss-migration-guide.md +++ b/docs/ss-migration-guide.md @@ -26,10 +26,14 @@ Note that this migration guide describes the items specific to Structured Stream Many items of SQL migration can be applied when migrating Structured Streaming to higher versions. Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html). +## Upgrading from Structured Streaming 3.0 to 3.1 + +- In Spark 3.0 and earlier, Spark only prints a warning message for queries containing a stateful operation that can emit rows older than the current watermark plus the allowed late record delay. Such rows are "late rows" in downstream stateful operations and can be discarded. Since Spark 3.1, Spark checks such queries for this possible correctness issue and throws an AnalysisException by default. Users who understand the risk of the correctness issue and still decide to run the query can disable the check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to `false`. + ## Upgrading from Structured Streaming 2.4 to 3.0 - In Spark 3.0, Structured Streaming forces the source schema into nullable when file-based datasources such as text, json, csv, parquet and orc are used via `spark.readStream(...)`. Previously, it respected the nullability in source schema; however, it caused issues tricky to debug with NPE. To restore the previous behavior, set `spark.sql.streaming.fileSource.schema.forceNullable` to `false`. - Spark 3.0 fixes the correctness issue on Stream-stream outer join, which changes the schema of state. (See [SPARK-26154](https://issues.apache.org/jira/browse/SPARK-26154) for more details). If you start your query from checkpoint constructed from Spark 2.x which uses stream-stream outer join, Spark 3.0 fails the query. To recalculate outputs, discard the checkpoint and replay previous inputs. -- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`. \ No newline at end of file +- In Spark 3.0, the deprecated class `org.apache.spark.sql.streaming.ProcessingTime` has been removed. Use `org.apache.spark.sql.streaming.Trigger.ProcessingTime` instead. Likewise, `org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger` has been removed in favor of `Trigger.Continuous`, and `org.apache.spark.sql.execution.streaming.OneTimeTrigger` has been hidden in favor of `Trigger.Once`.
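To make the new 3.1 behavior concrete, here is a minimal usage sketch. It is not part of the patch: the `SparkSession` named `spark`, the built-in `rate` source, and the query shape are all illustrative assumptions of a plan that chains two stateful operators and now trips the check.

```scala
import org.apache.spark.sql.functions.{col, window}

// A windowed aggregation in Append mode feeding a second stateful operator
// (dropDuplicates). The aggregation can emit rows older than the global
// watermark plus the allowed late record delay; the downstream operator
// treats them as "late rows" and may silently discard them.
val counts = spark.readStream
  .format("rate")
  .load()
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window(col("timestamp"), "5 seconds"))
  .count()
  .dropDuplicates("window")

// Since Spark 3.1, starting this query in Append mode throws an
// AnalysisException by default. Users who accept the risk can opt out
// before starting the query:
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")
```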
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 809323455652e..814ea8c9768ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode /** @@ -40,10 +41,15 @@ object UnsupportedOperationChecker extends Logging { } } + /** + * Checks for a possible correctness issue in chained stateful operators. The behavior is + * controlled by the SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`. + * When the config is enabled, an analysis exception is thrown on detection. Otherwise, Spark + * just prints a warning message. + */ def checkStreamingQueryGlobalWatermarkLimit( plan: LogicalPlan, - outputMode: OutputMode, - failWhenDetected: Boolean): Unit = { + outputMode: OutputMode): Unit = { def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match { case s: Aggregate if s.isStreaming && outputMode == InternalOutputModes.Append => true @@ -62,6 +68,8 @@ object UnsupportedOperationChecker extends Logging { case _ => false } + val failWhenDetected = SQLConf.get.statefulOperatorCorrectnessCheckEnabled + try { plan.foreach { subPlan => if (isStatefulOperation(subPlan)) { @@ -73,7 +81,10 @@ object UnsupportedOperationChecker extends Logging { "The query contains stateful operation which can emit rows older than " + "the current watermark plus allowed late record delay, which are \"late rows\"" + " in downstream stateful operations and these rows can be discarded. " + - "Please refer the programming guide doc for more details." + "Please refer to the programming guide doc for more details. If you understand " + + "the possible risk of a correctness issue and still need to run the query, " + + "you can disable this check by setting the config " + + "`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false." throwError(errorMsg)(plan) } } @@ -388,7 +399,7 @@ object UnsupportedOperationChecker extends Logging { checkUnsupportedExpressions(subPlan) } - checkStreamingQueryGlobalWatermarkLimit(plan, outputMode, failWhenDetected = false) + checkStreamingQueryGlobalWatermarkLimit(plan, outputMode) } def checkForContinuous(plan: LogicalPlan, outputMode: OutputMode): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8825f4f96378d..42307af2e9955 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1382,6 +1382,21 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED = + buildConf("spark.sql.streaming.statefulOperator.checkCorrectness.enabled") + .internal() + .doc("When true, the stateful operators of a streaming query will be checked for a possible " + + "correctness issue due to the global watermark.
The correctness issue arises from queries " + "containing a stateful operation which can emit rows older than the current watermark " + "plus the allowed late record delay; such rows are \"late rows\" in downstream stateful " + "operations and can be discarded. Please refer to the programming guide doc for " + "more details. Once the issue is detected, Spark throws an analysis exception. " + "When this config is disabled, Spark just prints a warning message for users. " + "Prior to Spark 3.1.0, the behavior was equivalent to disabling this config.") + .version("3.1.0") + .booleanConf + .createWithDefault(true) + val VARIABLE_SUBSTITUTE_ENABLED = buildConf("spark.sql.variable.substitute") .doc("This enables substitution using syntax like `${var}`, `${system:var}`, " + @@ -3028,6 +3043,9 @@ class SQLConf extends Serializable with Logging { def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) + def statefulOperatorCorrectnessCheckEnabled: Boolean = + getConf(STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED) + def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index b9943a9744985..21dde3ca8ca51 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{IntegerType, LongType, MetadataBuilder} import org.apache.spark.unsafe.types.CalendarInterval @@ -36,7 +37,7 @@ import org.apache.spark.unsafe.types.CalendarInterval /** A dummy command for testing unsupported operations. */ case class DummyCommand() extends Command -class UnsupportedOperationsSuite extends SparkFunSuite { +class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper { val attribute = AttributeReference("a", IntegerType, nullable = true)() val watermarkMetadata = new MetadataBuilder() @@ -218,6 +219,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { expectedMsgs = Seq("flatMapGroupsWithState in append mode", "update")) // FlatMapGroupsWithState(Append) in streaming with aggregation + // Only supported when `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` is disabled.
for (outputMode <- Seq(Append, Update, Complete)) { assertSupportedInStreamingPlan( "flatMapGroupsWithState - flatMapGroupsWithState(Append) " + @@ -228,7 +230,8 @@ class UnsupportedOperationsSuite extends SparkFunSuite { FlatMapGroupsWithState( null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, streamRelation)), - outputMode = outputMode) + outputMode = outputMode, + SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") } for (outputMode <- Seq(Append, Update)) { @@ -268,6 +271,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite { } // multiple FlatMapGroupsWithStates + // Only supported when `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` is disabled. assertSupportedInStreamingPlan( "flatMapGroupsWithState - multiple flatMapGroupsWithStates on streaming relation and all are " + "in append mode", FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, FlatMapGroupsWithState(null, att, att, Seq(att), Seq(att), att, null, Append, isMapGroupsWithState = false, null, streamRelation)), - outputMode = Append) + outputMode = Append, + SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") assertNotSupportedInStreamingPlan( "flatMapGroupsWithState - multiple flatMapGroupsWithStates on s streaming relation but some" + @@ -995,9 +1000,12 @@ class UnsupportedOperationsSuite extends SparkFunSuite { def assertSupportedInStreamingPlan( name: String, plan: LogicalPlan, - outputMode: OutputMode): Unit = { + outputMode: OutputMode, + configs: (String, String)*): Unit = { test(s"streaming plan - $name: supported") { - UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode) + withSQLConf(configs: _*) { + UnsupportedOperationChecker.checkForStreaming(wrapInStreaming(plan), outputMode) + } } } @@ -1070,14 +1078,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite { expectFailure: Boolean): Unit = { test(s"Global watermark limit - $testNamePostfix") { if (expectFailure) { - val e = intercept[AnalysisException] { - UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( - wrapInStreaming(plan), outputMode, failWhenDetected = true) + withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "true") { + val e = intercept[AnalysisException] { + UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( + wrapInStreaming(plan), outputMode) + } + assert(e.message.contains("Detected pattern of possible 'correctness' issue")) } - assert(e.message.contains("Detected pattern of possible 'correctness' issue")) } else { - UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( - wrapInStreaming(plan), outputMode, failWhenDetected = true) + withSQLConf(SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") { + UnsupportedOperationChecker.checkStreamingQueryGlobalWatermarkLimit( + wrapInStreaming(plan), outputMode) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index 2efd715b7731c..f97c9386f9488 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -1324,7 +1324,9 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest { def testWithAllStateVersions(name: String)(func: => Unit): Unit =
{ for (version <- FlatMapGroupsWithStateExecHelper.supportedVersions) { test(s"$name - state format version $version") { - withSQLConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString) { + withSQLConf( + SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key -> version.toString, + SQLConf.STATEFUL_OPERATOR_CHECK_CORRECTNESS_ENABLED.key -> "false") { func } }
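As a follow-up sketch (again illustrative, not from the patch; it reuses the hypothetical `counts` plan from the earlier example), the new default surfaces as an `AnalysisException` when the streaming query is started, and the message names the config that disables the check:

```scala
import org.apache.spark.sql.AnalysisException

// With the check enabled (the 3.1 default), analysis fails at start(),
// before any data is processed, rather than warning at runtime.
try {
  counts.writeStream
    .format("console")
    .outputMode("append")
    .start()
} catch {
  case e: AnalysisException =>
    // The error message points users at the escape hatch.
    assert(e.getMessage.contains(
      "spark.sql.streaming.statefulOperator.checkCorrectness.enabled"))
}
```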