4 changes: 4 additions & 0 deletions docs/ss-migration-guide.md
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream
Many items of SQL migration can be applied when migrating Structured Streaming to higher versions.
Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html).

## Upgrading from Structured Streaming 3.2.1 to 3.2.2

- Since Spark 3.2.2 (and 3.3), all stateful operators require hash partitioning with exact grouping keys. In previous versions, all stateful operators except stream-stream join required only loose partitioning criteria, which opened the possibility of correctness issues. (See [SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for more details.) To ensure backward compatibility, the old behavior is retained when the query is restored from a checkpoint built by an older version. An example of an affected query is sketched below this excerpt.

## Upgrading from Structured Streaming 3.0 to 3.1

- In Spark 3.0 and before, for the queries that have stateful operation which can emit rows older than the current watermark plus allowed late record delay, which are "late rows" in downstream stateful operations and these rows can be discarded, Spark only prints a warning message. Since Spark 3.1, Spark will check for such queries with possible correctness issue and throw AnalysisException for it by default. For the users who understand the possible risk of correctness issue and still decide to run the query, please disable this check by setting the config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.
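As an illustration of the 3.2.2 note above, here is a minimal sketch of a streaming aggregation whose state is keyed by the grouping column; the rate source, bucket expression, and checkpoint path are placeholder assumptions. Restarting such a query from a checkpoint written by an older release keeps the legacy state distribution, as described in the note.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object StatefulAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("stateful-agg-partitioning")
      .getOrCreate()

    // State for this aggregation is keyed by `bucket`; each state store partition
    // only ever holds the keys that hash into it, so the physical partitioning of
    // the stateful operator must stay stable across restarts of the query.
    val counts = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()
      .withColumn("bucket", col("value") % 10)
      .groupBy("bucket")
      .count()

    counts.writeStream
      .outputMode("complete")
      .format("console")
      // Restarting from a checkpoint written by an older Spark release keeps the
      // legacy (looser) state distribution instead of the new strict one.
      .option("checkpointLocation", "/tmp/stateful-agg-checkpoint")
      .start()
      .awaitTermination()
  }
}
```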
@@ -89,11 +89,19 @@ case class ClusteredDistribution(

/**
* Represents data where tuples have been clustered according to the hash of the given
* `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
* [[HashPartitioning]] can satisfy this distribution.
* `expressions`. Since this distribution relies on [[HashPartitioning]] as the physical
* partitioning, only [[HashPartitioning]] (and HashPartitioning in [[PartitioningCollection]])
* can satisfy this distribution. When `requiredNumPartitions` is Some(1), [[SinglePartition]]
* is essentially the same as [[HashPartitioning]], so it can satisfy this distribution as well.
*
* This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
* number of partitions, this distribution strictly determines which partition the tuple should
* be placed in. This distribution is mainly used to represent the distribution requirement of
* stateful operators in Structured Streaming, but it can be used for other cases as well.
*
* NOTE: Each partition of a stateful operator initializes its own state store(s), which are
* independent of the state store(s) in other partitions. Since it is not possible to repartition
* the data in a state store, Spark must make sure the physical partitioning of the stateful
* operator is unchanged across Spark versions. Violating this requirement may lead to silent
* correctness issues.
*/
case class HashClusteredDistribution(
expressions: Seq[Expression],
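A small sketch against Catalyst's internal `Distribution`/`Partitioning` API, illustrating why only hash partitioning on exactly the required keys satisfies this distribution; the key names and the expression DSL are purely illustrative, and these are internal APIs rather than something user code should rely on:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.physical._

object HashClusteredDistributionSketch {
  def main(args: Array[String]): Unit = {
    // Two grouping keys, as a streaming aggregate might require.
    val keys = Seq('a.int, 'b.string)

    val strict = HashClusteredDistribution(keys)
    val loose = ClusteredDistribution(keys)

    // Hash partitioning on exactly the same keys satisfies both requirements.
    println(HashPartitioning(keys, 200).satisfies(strict)) // true
    println(HashPartitioning(keys, 200).satisfies(loose))  // true

    // Hash partitioning on a subset of the keys only satisfies the loose
    // requirement, which is the gap SPARK-38204 closes for stateful operators.
    println(HashPartitioning(keys.take(1), 200).satisfies(strict)) // false
    println(HashPartitioning(keys.take(1), 200).satisfies(loose))  // true
  }
}
```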
@@ -1656,6 +1656,23 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION =
buildConf("spark.sql.streaming.statefulOperator.useStrictDistribution")
.internal()
.doc("The purpose of this config is only compatibility; DO NOT MANUALLY CHANGE THIS!!! " +
"When true, the stateful operator for streaming query will use " +
"HashClusteredDistribution which guarantees stable state partitioning as long as " +
"the operator provides consistent grouping keys across the lifetime of query. " +
"When false, the stateful operator for streaming query will use ClusteredDistribution " +
"which is not sufficient to guarantee stable state partitioning despite the operator " +
"provides consistent grouping keys across the lifetime of query. " +
"This config will be set to true for new streaming queries to guarantee stable state " +
"partitioning, and set to false for existing streaming queries to not break queries " +
"which are restored from existing checkpoints. Please refer SPARK-38204 for details.")
.version("3.2.2")
.booleanConf
.createWithDefault(true)

val FILESTREAM_SINK_METADATA_IGNORED =
buildConf("spark.sql.streaming.fileStreamSink.ignoreMetadata")
.internal()
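The changes below reference a `StatefulOperatorPartitioning` helper whose implementation is not part of this excerpt. The following standalone sketch only illustrates the semantics implied by the config documentation above (strict `HashClusteredDistribution` when the flag is true, legacy `ClusteredDistribution` when it is false); it is an assumption-based simplification, not the actual helper:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution}
import org.apache.spark.sql.internal.SQLConf

object CompatibleDistributionSketch {
  // Strict distribution for queries started on this release, loose distribution for
  // queries restored from checkpoints written before the flag existed.
  def getCompatibleDistribution(
      expressions: Seq[Expression],
      numPartitions: Int,
      conf: SQLConf): Distribution = {
    if (conf.getConf(SQLConf.STATEFUL_OPERATOR_USE_STRICT_DISTRIBUTION)) {
      HashClusteredDistribution(expressions, Some(numPartitions))
    } else {
      ClusteredDistribution(expressions, Some(numPartitions))
    }
  }
}
```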
@@ -43,8 +43,28 @@ object AggUtils {
}
}

private def createStreamingAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
groupingExpressions: Seq[NamedExpression] = Nil,
aggregateExpressions: Seq[AggregateExpression] = Nil,
aggregateAttributes: Seq[Attribute] = Nil,
initialInputBufferOffset: Int = 0,
resultExpressions: Seq[NamedExpression] = Nil,
child: SparkPlan): SparkPlan = {
createAggregate(
requiredChildDistributionExpressions,
isStreaming = true,
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
initialInputBufferOffset = initialInputBufferOffset,
resultExpressions = resultExpressions,
child = child)
}

private def createAggregate(
requiredChildDistributionExpressions: Option[Seq[Expression]] = None,
isStreaming: Boolean = false,
groupingExpressions: Seq[NamedExpression] = Nil,
aggregateExpressions: Seq[AggregateExpression] = Nil,
aggregateAttributes: Seq[Attribute] = Nil,
@@ -56,6 +76,8 @@ object AggUtils {
if (useHash) {
HashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
numShufflePartitions = None,
groupingExpressions = groupingExpressions,
aggregateExpressions = mayRemoveAggFilters(aggregateExpressions),
aggregateAttributes = aggregateAttributes,
@@ -69,6 +91,8 @@
if (objectHashEnabled && useObjectHash) {
ObjectHashAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
numShufflePartitions = None,
groupingExpressions = groupingExpressions,
aggregateExpressions = mayRemoveAggFilters(aggregateExpressions),
aggregateAttributes = aggregateAttributes,
@@ -78,6 +102,8 @@
} else {
SortAggregateExec(
requiredChildDistributionExpressions = requiredChildDistributionExpressions,
isStreaming = isStreaming,
numShufflePartitions = None,
groupingExpressions = groupingExpressions,
aggregateExpressions = mayRemoveAggFilters(aggregateExpressions),
aggregateAttributes = aggregateAttributes,
@@ -286,7 +312,7 @@ object AggUtils {
val partialAggregate: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Partial))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
@@ -298,7 +324,7 @@
val partialMerged1: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
requiredChildDistributionExpressions =
Some(groupingAttributes),
groupingExpressions = groupingAttributes,
@@ -316,7 +342,7 @@
val partialMerged2: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
requiredChildDistributionExpressions =
Some(groupingAttributes),
groupingExpressions = groupingAttributes,
@@ -344,7 +370,7 @@
// projection:
val finalAggregateAttributes = finalAggregateExpressions.map(_.resultAttribute)

createAggregate(
createStreamingAggregate(
requiredChildDistributionExpressions = Some(groupingAttributes),
groupingExpressions = groupingAttributes,
aggregateExpressions = finalAggregateExpressions,
@@ -403,7 +429,7 @@
val partialAggregate: SparkPlan = {
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = Partial))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
createAggregate(
createStreamingAggregate(
groupingExpressions = groupingExpressions,
aggregateExpressions = aggregateExpressions,
aggregateAttributes = aggregateAttributes,
@@ -420,7 +446,8 @@
// this is to reduce amount of rows to shuffle
MergingSessionsExec(
requiredChildDistributionExpressions = None,
requiredChildDistributionOption = None,
isStreaming = true,
numShufflePartitions = None,
groupingExpressions = groupingAttributes,
sessionExpression = sessionExpression,
aggregateExpressions = aggregateExpressions,
@@ -443,8 +470,10 @@
val aggregateExpressions = functionsWithoutDistinct.map(_.copy(mode = PartialMerge))
val aggregateAttributes = aggregateExpressions.map(_.resultAttribute)
MergingSessionsExec(
requiredChildDistributionExpressions = None,
requiredChildDistributionOption = Some(restored.requiredChildDistribution),
requiredChildDistributionExpressions = Some(groupingWithoutSessionAttributes),
isStreaming = true,
// This will be replaced with the actual value in the state rule.
numShufflePartitions = None,
groupingExpressions = groupingAttributes,
sessionExpression = sessionExpression,
aggregateExpressions = aggregateExpressions,
@@ -472,8 +501,8 @@
// projection:
val finalAggregateAttributes = finalAggregateExpressions.map(_.resultAttribute)

createAggregate(
requiredChildDistributionExpressions = Some(groupingAttributes),
createStreamingAggregate(
requiredChildDistributionExpressions = Some(groupingWithoutSessionAttributes),
groupingExpressions = groupingAttributes,
aggregateExpressions = finalAggregateExpressions,
aggregateAttributes = finalAggregateAttributes,
Expand All @@ -487,10 +516,15 @@ object AggUtils {

private def mayAppendUpdatingSessionExec(
groupingExpressions: Seq[NamedExpression],
maybeChildPlan: SparkPlan): SparkPlan = {
maybeChildPlan: SparkPlan,
isStreaming: Boolean = false): SparkPlan = {
groupingExpressions.find(_.metadata.contains(SessionWindow.marker)) match {
case Some(sessionExpression) =>
UpdatingSessionsExec(
isStreaming = isStreaming,
// numShufflePartitions is set to None here, and will be replaced with the actual
// value in the state rule if the query is streaming.
numShufflePartitions = None,
groupingExpressions.map(_.toAttribute),
sessionExpression.toAttribute,
maybeChildPlan)
@@ -502,7 +536,8 @@
private def mayAppendMergingSessionExec(
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
partialAggregate: SparkPlan): SparkPlan = {
partialAggregate: SparkPlan,
isStreaming: Boolean = false): SparkPlan = {
groupingExpressions.find(_.metadata.contains(SessionWindow.marker)) match {
case Some(sessionExpression) =>
val aggExpressions = aggregateExpressions.map(_.copy(mode = PartialMerge))
@@ -515,7 +550,10 @@

MergingSessionsExec(
requiredChildDistributionExpressions = Some(groupingWithoutSessionsAttributes),
requiredChildDistributionOption = None,
isStreaming = isStreaming,
// numShufflePartitions is set to None here, and will be replaced with the actual
// value in the state rule if the query is streaming.
numShufflePartitions = None,
groupingExpressions = groupingAttributes,
sessionExpression = sessionExpression,
aggregateExpressions = aggExpressions,
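The session-window branches above (`MergingSessionsExec`, `UpdatingSessionsExec`) are exercised by streaming session-window aggregations such as the following minimal sketch; the rate source, grouping expression, gap duration, watermark, and checkpoint path are placeholder assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SessionWindowAggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("session-window-agg")
      .getOrCreate()

    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 5)
      .load()
      .select(col("timestamp"), (col("value") % 3).as("userId"))

    // Streaming session-window aggregation: the plan contains the session-related
    // nodes changed in this file, and its state is clustered by the grouping keys
    // without the session column.
    val sessions = events
      .withWatermark("timestamp", "10 seconds")
      .groupBy(col("userId"), session_window(col("timestamp"), "5 seconds"))
      .count()

    sessions.writeStream
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/session-window-checkpoint")
      .start()
      .awaitTermination()
  }
}
```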
@@ -21,12 +21,15 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, ExplainUtils, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning

/**
* Holds common logic for aggregate operators
*/
trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning {
def requiredChildDistributionExpressions: Option[Seq[Expression]]
def isStreaming: Boolean
def numShufflePartitions: Option[Int]
def groupingExpressions: Seq[NamedExpression]
def aggregateExpressions: Seq[AggregateExpression]
def aggregateAttributes: Seq[Attribute]
@@ -91,7 +94,20 @@ trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning
override def requiredChildDistribution: List[Distribution] = {
requiredChildDistributionExpressions match {
case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
case Some(exprs) => ClusteredDistribution(exprs) :: Nil
case Some(exprs) =>
if (isStreaming) {
numShufflePartitions match {
case Some(parts) =>
StatefulOperatorPartitioning.getCompatibleDistribution(
exprs, parts, conf) :: Nil

case _ =>
throw new IllegalStateException("Expected to set the number of partitions before " +
"constructing required child distribution!")
}
} else {
ClusteredDistribution(exprs) :: Nil
}
case None => UnspecifiedDistribution :: Nil
}
}
@@ -45,6 +45,8 @@ import org.apache.spark.util.Utils
*/
case class HashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
@@ -21,7 +21,6 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, MutableProjection, NamedExpression, SortOrder, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetrics

@@ -41,7 +40,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
*/
case class MergingSessionsExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
requiredChildDistributionOption: Option[Seq[Distribution]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
sessionExpression: NamedExpression,
aggregateExpressions: Seq[AggregateExpression],
@@ -59,17 +59,6 @@ case class MergingSessionsExec(

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def requiredChildDistribution: List[Distribution] = {
requiredChildDistributionExpressions match {
case Some(exprs) if exprs.isEmpty => AllTuples :: Nil
case Some(exprs) => ClusteredDistribution(exprs) :: Nil
case None => requiredChildDistributionOption match {
case Some(distributions) => distributions.toList
case None => UnspecifiedDistribution :: Nil
}
}
}

override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
Seq((keyWithoutSessionExpressions ++ Seq(sessionExpression)).map(SortOrder(_, Ascending)))
}
@@ -59,6 +59,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
*/
case class ObjectHashAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
@@ -30,6 +30,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
*/
case class SortAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.streaming.StatefulOperatorPartitioning

/**
* This node updates the session window spec of each input rows via analyzing neighbor rows and
@@ -35,6 +36,8 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
* Refer [[UpdatingSessionsIterator]] for more details.
*/
case class UpdatingSessionsExec(
isStreaming: Boolean,
numShufflePartitions: Option[Int],
groupingExpression: Seq[Attribute],
sessionExpression: Attribute,
child: SparkPlan) extends UnaryExecNode {
@@ -63,7 +66,20 @@ case class UpdatingSessionsExec(
if (groupingWithoutSessionExpression.isEmpty) {
AllTuples :: Nil
} else {
ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
if (isStreaming) {
numShufflePartitions match {
case Some(parts) =>
StatefulOperatorPartitioning.getCompatibleDistribution(
groupingWithoutSessionExpression, parts, conf) :: Nil

case _ =>
throw new IllegalStateException("Expected to set the number of partitions before " +
"constructing required child distribution!")
}

} else {
ClusteredDistribution(groupingWithoutSessionExpression) :: Nil
}
}
}

@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}
import org.apache.spark.sql.catalyst.plans.physical.Distribution
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
import org.apache.spark.sql.execution.streaming.state._
@@ -93,8 +93,10 @@ case class FlatMapGroupsWithStateExec(
* to have the same grouping so that the data are co-located on the same task.
*/
override def requiredChildDistribution: Seq[Distribution] = {
ClusteredDistribution(groupingAttributes, stateInfo.map(_.numPartitions)) ::
ClusteredDistribution(initialStateGroupAttrs, stateInfo.map(_.numPartitions)) ::
StatefulOperatorPartitioning.getCompatibleDistribution(
groupingAttributes, getStateInfo, conf) ::
StatefulOperatorPartitioning.getCompatibleDistribution(
initialStateGroupAttrs, getStateInfo, conf) ::
Nil
}

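`FlatMapGroupsWithStateExec` backs `mapGroupsWithState`/`flatMapGroupsWithState` queries such as the minimal sketch below; the rate source, grouping expression, and checkpoint path are placeholder assumptions. Each group's state row lives in the partition its grouping key hashes to, which is why the child distribution above must stay stable across restarts:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object FlatMapGroupsWithStateSketch {
  case class Event(userId: Long, timestamp: Timestamp)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("flat-map-groups-with-state")
      .getOrCreate()
    import spark.implicits._

    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 5)
      .load()
      .select((col("value") % 3).as("userId"), col("timestamp"))
      .as[Event]

    // Keep a running count per user in the state store. Each group's state row is
    // stored in the partition its key hashes to.
    val counts = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState[Long, (Long, Long)](
        OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
        (userId: Long, rows: Iterator[Event], state: GroupState[Long]) =>
          val newCount = state.getOption.getOrElse(0L) + rows.size
          state.update(newCount)
          Iterator((userId, newCount))
      }

    counts.writeStream
      .outputMode("update")
      .format("console")
      .option("checkpointLocation", "/tmp/fmgws-checkpoint")
      .start()
      .awaitTermination()
  }
}
```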