[SPARK-50017] Support Avro encoding for TransformWithState operator #48401
connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
why do we need to move this file?
Because it's used in AvroOptions
Have we considered introducing a deprecated class under org.apache.spark.sql.avro that retains all the existing public methods, while moving their implementations into sql/core?
Sure, we can do this.
```scala
@deprecated("Use org.apache.spark.sql.core.avro.SchemaConverters instead", "4.0.0")
@Evolving
object DeprecatedSchemaConverters {
```
Let's keep the name `SchemaConverters` and not have `Deprecated` in the object name.
```
@@ -104,7 +106,10 @@ case class TransformWithStateExec(
   * @return a new instance of the driver processor handle
   */
  private def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = {
    val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode, keyEncoder)
```
nit: extra newline ?
```scala
def encodePrefixKeyForRangeScan(
    row: UnsafeRow,
    avroType: Schema
): Array[Byte] = {
```
nit: let's confirm the style here
```
@@ -91,7 +91,8 @@ case class CkptIdCollectingStateStoreWrapper(innerStore: StateStore) extends Sta
      valueSchema: StructType,
      keyStateEncoderSpec: KeyStateEncoderSpec,
      useMultipleValuesPerKey: Boolean = false,
-     isInternal: Boolean = false): Unit = {
+     isInternal: Boolean = false,
+     avroEnc: Option[AvroEncoder]): Unit = {
```
nit: let's use default args here as well?
```scala
StructField("key2", StringType, false),
StructField("ordering-2", IntegerType, false),
```
Can we add a test to verify the behavior if `-` is used within the state var names, since it's not supported in Avro?
lgtm with pending nits
Working through the PR, but some first comments to work on while I continue review
```
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}

+import org.apache.spark.SparkException
 import org.apache.spark.sql.avro.SchemaConverters
```
is this a stray change?
No, because we changed the directory of the file, we had to add imports.
```scala
.checkValue(v => Set("UnsafeRow", "Avro").contains(v),
  "Valid values are 'UnsafeRow' and 'Avro'")
```
nit: do we want to be case insensitive here?
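For illustration, a minimal sketch of what case-insensitive validation could look like. The `isValidEncoding` helper and the lowercase value set are hypothetical, not the actual SQLConf API:

```scala
import java.util.Locale

// Normalize the configured value so "avro", "AVRO" and "Avro" are all accepted.
val validValues = Set("unsaferow", "avro")

def isValidEncoding(v: String): Boolean =
  validValues.contains(v.toLowerCase(Locale.ROOT))
```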
```scala
case statefulOp: StatefulOperator =>
  statefulOp match {
    case op: TransformWithStateExec =>
```
nit: I assume you're doing this two-step matching because the avro serde will be added to other operators too in follow-ups?
Yeah, it will be.
```
  * Fetching the columnFamilySchemas from the StatefulProcessorHandle
  * after init is called.
  */
-private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
+def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
   val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
   closeProcessorHandle()
   columnFamilySchemas
```
should this be moved to a static method?
Good question - I guess if we can pass the stateful processor in, it can be.
Actually, I don't think you can make it static - we need the statefulProcessor that is passed into this particular instance of the TransformWithStateExec class.
```scala
op.copy(
  columnFamilySchemas = op.getColFamilySchemas()
)
```
This is a bit confusing IMHO. If you have the `getColFamilySchemas` method available as part of the class, why do you have to set it on the class with a copy? Two possible suggestions:

- Make `getColFamilySchemas` a static method. Not sure if that's possible, though, looking at the logic a bit more in `TransformWithStateExec`. It feels weird that you're opening and closing these handles just to get some of the information out.
- Add a comment here that this needs to be run on the driver, and also rename the method to `withColumnFamilySchemas`, which calls `copy` internally.
```scala
ttlValSchema,
Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
avroEnc = getAvroSerde(
  StructType(ttlKeySchema.drop(2)),
```
this drop(2) looks magical. Can you add a comment mentioning that these represent the null/positive/negative byte and the big endian representation?
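For illustration, a sketch of the general range-scan encoding technique being referenced. This is not Spark's exact byte layout; the single marker byte and the sign-bit flip here are simplified assumptions, but they show why marker and big-endian columns get prepended: so that unsigned lexicographic byte comparison matches numeric order.

```scala
import java.nio.ByteBuffer

// Prefix each long with a marker byte and write it big-endian with its sign
// bit flipped, so byte-wise comparison orders negatives before positives.
def encodeForRangeScan(v: Long): Array[Byte] = {
  val buf = ByteBuffer.allocate(9)                    // 1 marker byte + 8 value bytes
  buf.put(if (v < 0) 0x00.toByte else 0x01.toByte)    // null would get its own marker
  buf.putLong(v ^ Long.MinValue)                      // flip sign bit to preserve order
  buf.array()
}

// Unsigned lexicographic comparison of two equal-length byte arrays.
def byteCompare(a: Array[Byte], b: Array[Byte]): Int =
  a.zip(b).map { case (x, y) => (x & 0xff) - (y & 0xff) }.find(_ != 0).getOrElse(0)
```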
```scala
// This function creates the StateStoreColFamilySchema for
// the TTL secondary index.
// Because we want to encode fixed-length types as binary types
// if we are using Avro, we need to do some schema conversion to ensure
// we can use range scan
```
ditto on docs
Also please specify when this method should be used and not the one above
```scala
ttlValSchema,
Some(RangeKeyScanStateEncoderSpec(ttlKeySchema, Seq(0))),
avroEnc = getAvroSerde(
  StructType(ttlKeySchema.drop(2)),
```
ditto
```scala
))
}

// This function creates the StateStoreColFamilySchema for
```
ditto
```scala
valSchema,
Some(RangeKeyScanStateEncoderSpec(keySchema, Seq(0))),
avroEnc = getAvroSerde(
  StructType(avroKeySchema.drop(2)),
```
ditto on comment
I think we have an abstraction leak here. Ideally the Avro encoders should be created in the StateStore, not passed around in the XStateImpl classes. The serialization format should be the duty of the StateStore; giving the state impl classes knowledge of the serialization format seems like an abstraction leak.
```scala
if (!schemas.contains(stateName)) {
  None
} else {
  schemas(stateName).avroEnc
}
```
`schemas.get(stateName).map(_.avroEnc)`?
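A small stand-in example of the suggested lookup idiom; the `ColFamilySchema` case class and string payloads here are hypothetical. Note that because `avroEnc` is itself an `Option`, `flatMap` rather than `map` keeps the result a flat `Option[_]` (with `map` it would be `Option[Option[_]]`):

```scala
// Stand-in type for the real StateStoreColFamilySchema.
case class ColFamilySchema(avroEnc: Option[String])

val schemas = Map(
  "countState" -> ColFamilySchema(Some("encoder")),
  "ttlState"   -> ColFamilySchema(None))

// Missing key and present-key-without-encoder both collapse to None.
def avroEncFor(stateName: String): Option[String] =
  schemas.get(stateName).flatMap(_.avroEnc)
```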
```
@@ -343,7 +497,8 @@ class StatefulProcessorHandleImpl(
  * actually done. We need this class because we can only collect the schemas after
  * the StatefulProcessor is initialized.
  */
-class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any])
+class DriverStatefulProcessorHandleImpl(
+    timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any], initializeAvroEnc: Boolean)
```
nit: one line per parameter please
```
-    (f: StateStore => CompletionIterator[InternalRow, Iterator[InternalRow]]):
+    (f: StateStore =>
+      CompletionIterator[InternalRow, Iterator[InternalRow]]):
```
uber nit: change necessary?
also nit: should we use type aliasing to shorten `CompletionIterator[InternalRow, Iterator[InternalRow]]`? Like `type ResultIterator = CompletionIterator[InternalRow, Iterator[InternalRow]]`
```scala
def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = {
  val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(
    timeMode, keyEncoder, initializeAvroEnc = avroEncodingEnabled(stateStoreEncoding))
  driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
  statefulProcessor.setHandle(driverProcessorHandle)
  statefulProcessor.init(outputMode, timeMode)
  driverProcessorHandle
}

val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
```
nit: maybe if you add a `withColumnFamilySchema` method, you can remove the need for this duplication and can just call it below after creating the class
```
@@ -104,7 +106,9 @@ case class TransformWithStateExec(
   * @return a new instance of the driver processor handle
   */
  private def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = {
```
nit: should this method have an assertion that it is being called on the Driver?
The changes are SOOO much cleaner now, thank you. It can get even cleaner though:

- I feel like you can add a Serde interface for the StateEncoder code changes. That should simplify the code even further.
- Any reason we didn't just extend the suites with a different SQLConf to test out the different encoding type? I feel that would remove a ton of code changes as well.
```
@@ -563,13 +684,233 @@ class RangeKeyScanStateEncoder(
    writer.getRow()
  }

  def encodePrefixKeyForRangeScan(
```
Can you add a scaladoc please?
```scala
  out.toByteArray
}

def decodePrefixKeyForRangeScan(
```
ditto on scaladoc please
```
-    virtualColFamilyId: Option[Short] = None)
+    virtualColFamilyId: Option[Short] = None,
+    avroEnc: Option[AvroEncoder] = None)
   extends RocksDBKeyStateEncoderBase(useColumnFamilies, virtualColFamilyId) {
```
Instead of avroEnc, I would honestly introduce another interface:

```scala
trait Serde {
  def encodeToBytes(...)
  def decodeToUnsafeRow(...)
  def encodePrefixKeyForRangeScan(...)
  def decodePrefixKeyForRangeScan(...)
}
```

and move the logic in there so that you don't have to keep on doing `avroEnc.isDefined` checks for these. The logic seems pretty similar except for the input data. The AvroStateSerde (or whatever you want to name it) would have the `private lazy val remainingKeyAvroType = SchemaConverters.toAvroType(remainingKeySchema)`.
Spoke offline - it doesn't look like this simplifies things an awful lot - can be a follow-up.
```
-    virtualColFamilyId: Option[Short] = None)
+    virtualColFamilyId: Option[Short] = None,
+    avroEnc: Option[AvroEncoder] = None)
   extends RocksDBKeyStateEncoderBase(useColumnFamilies, virtualColFamilyId) {
```
ditto on the Serde.
```scala
    Some(newColFamilyId), avroEnc), RocksDBStateEncoder.getValueEncoder(valueSchema,
    useMultipleValuesPerKey, avroEnc)))
}
private def getAvroSerializer(schema: StructType): AvroSerializer = {
```
nit: blank line before the method please
```
@@ -74,10 +75,71 @@ private[sql] class RocksDBStateStoreProvider
    isInternal: Boolean = false): Unit = {
    verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
    val newColFamilyId = rocksDB.createColFamilyIfAbsent(colFamilyName)
    // Create cache key using store ID to avoid collisions
    val avroEncCacheKey = s"${stateStoreId.operatorId}_" +
```
Do we have the stream runId (maybe it's available in the HadoopConf)? We should add runId, otherwise there could be collisions
```scala
// Avro encoder that is used by the RocksDBStateStoreProvider and RocksDBStateEncoder
// in order to serialize from UnsafeRow to a byte array of Avro encoding.
```
Can you please turn this into a proper scaladoc?

```scala
/**
 * ...
 */
```
```scala
  TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
  val testSchema: StructType = StructType(
    Seq(
      StructField("ordering-1", LongType, false),
```
oh, why'd you have to change these? If these are not supported by Avro, do we have any check anywhere to disallow the usage of the Avro encoder?
Avro code would just throw an error, saying that there are invalid characters in the field name
```scala
def testWithEncodingTypes(testName: String, testTags: Tag*)
    (testBody: => Any): Unit = {
```
One parameter per line, like below, please.
Oh, forgot: we need to add the stream run id to the Avro encoder cache key, otherwise we may risk some unintended re-use of Avro encoders. We should also limit the size of that cache and add expiry to it.
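A sketch of what a bounded, access-ordered (LRU) cache along these lines could look like. The `BoundedCache` class and the key format in the usage below are illustrative assumptions; the real change would also fold the stream run id into the cache key and add time-based expiry:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// LinkedHashMap with accessOrder = true iterates least-recently-used first;
// removeEldestEntry evicts that entry once the cache exceeds maxEntries.
class BoundedCache[K, V](maxEntries: Int)
  extends JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {
  override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
    size() > maxEntries
}

// Keys combining a (hypothetical) runId and operatorId avoid cross-run reuse.
val cache = new BoundedCache[String, Int](2)
cache.put("run1_op0", 1)
cache.put("run1_op1", 2)
cache.put("run2_op0", 3) // evicts the least-recently-used entry, "run1_op0"
```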
LGTM!
What changes were proposed in this pull request?
Currently, we use the internal byte representation to store state for stateful streaming operators in the StateStore. This PR introduces Avro serialization and deserialization capabilities in the RocksDBStateEncoder so that we can instead use Avro encoding to store state. This is currently enabled for the TransformWithState operator via SQLConf to support all functionality supported by TWS.
Why are the changes needed?
UnsafeRow is an inherently unstable format that makes no guarantees of being backwards-compatible. Therefore, if the format changes between Spark releases, this could cause StateStore corruptions. Avro is more stable, and inherently enables schema evolution.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Amended and added to unit tests
Was this patch authored or co-authored using generative AI tooling?
No