Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ object UnsafeRowUtils {
* @return None if all the checks pass. An error message if the row is not matched with the schema
*/
def validateStructuralIntegrityWithReason(
row: UnsafeRow, expectedSchema: StructType): Option[String] = {
row: UnsafeRow, expectedSchema: StructType): Option[String] = {
validateStructuralIntegrityWithReasonImpl(row, expectedSchema).map {
errorMessage => s"Error message is: $errorMessage, " +
s"UnsafeRow status: ${getStructuralIntegrityStatus(row, expectedSchema)}"
Expand Down Expand Up @@ -177,7 +177,8 @@ object UnsafeRowUtils {
}

def getStructuralIntegrityStatus(row: UnsafeRow, expectedSchema: StructType): String = {
val fieldStatusArr = expectedSchema.fields.zipWithIndex.map {
val minLength = Math.min(row.numFields(), expectedSchema.fields.length)
val fieldStatusArr = expectedSchema.fields.take(minLength).zipWithIndex.map {
case (field, index) =>
val offsetAndSizeStr = if (!UnsafeRow.isFixedLength(field.dataType)) {
val (offset, size) = getOffsetAndSize(row, index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,12 @@ object StateStoreProvider {
if (conf.formatValidationEnabled) {
val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
}
if (conf.formatValidationCheckValue) {
val validationError =
UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }

if (conf.formatValidationCheckValue) {
val validationError =
UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,31 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
}

test("SPARK-42572: StateStoreProvider.validateStateRowFormat shouldn't check" +
" value row format when SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED is false") {
// By default, when there is an invalid pair of value row and value schema, it should throw
val keyRow = dataToKeyRow("key", 1)
val valueRow = dataToValueRow(2)
val e = intercept[InvalidUnsafeRowException] {
// Here valueRow doesn't match with prefixKeySchema
StateStoreProvider.validateStateRowFormat(
keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
}
assert(e.getMessage.contains("The streaming query failed by state format invalidation"))

// When sqlConf.stateStoreFormatValidationEnabled is set to false and
// StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
// don't check value row
val sqlConf = getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get)
sqlConf.setConf(SQLConf.STATE_STORE_FORMAT_VALIDATION_ENABLED, false)
val storeConf = new StateStoreConf(sqlConf,
Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "true"))
// Shouldn't throw
StateStoreProvider.validateStateRowFormat(
keyRow, keySchema, valueRow, keySchema, storeConf)
}

/** Return a new provider with a random id */
def newStoreProvider(): ProviderClass

Expand Down