diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala index 20625e10f321..0c8cabb75ed6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala @@ -72,7 +72,7 @@ class StateSchemaCompatibilityChecker( } private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean = - DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema) + DataType.equalsIgnoreNameAndCompatibleNullability(schema, storedSchema) // Visible for testing private[sql] def readSchemaFile(): (StructType, StructType) = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala index a9cc90ca45ce..153934135933 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala @@ -63,6 +63,8 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { private val valueSchema65535Bytes = new StructType() .add(StructField("v" * (65535 - 87), IntegerType, nullable = true)) + // Checks on adding/removing (nested) field. + test("adding field to key should fail") { val fieldAddedKeySchema = keySchema.add(StructField("newKey", IntegerType)) verifyException(keySchema, valueSchema, fieldAddedKeySchema, valueSchema) @@ -107,6 +109,8 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { verifyException(keySchema, valueSchema, keySchema, newValueSchema) } + // Checks on changing type of (nested) field. + test("changing the type of field in key should fail") { val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = TimestampType))) verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema) @@ -129,28 +133,59 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession { verifyException(keySchema, valueSchema, keySchema, newValueSchema) } - test("changing the nullability of nullable to non-nullable in key should fail") { + // Checks on changing nullability of (nested) field. + // Note that these tests have different format of the test name compared to others, since it was + // misleading to understand the assignment as the opposite way. + + test("storing non-nullable column into nullable column in key should be allowed") { val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = false))) - verifyException(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema) + verifySuccess(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema) } - test("changing the nullability of nullable to non-nullable in value should fail") { + test("storing non-nullable column into nullable column in value schema should be allowed") { val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable = false))) - verifyException(keySchema, valueSchema, keySchema, nonNullChangedValueSchema) + verifySuccess(keySchema, valueSchema, keySchema, nonNullChangedValueSchema) } - test("changing the nullability of nullable to nonnullable in nested field in key should fail") { + test("storing non-nullable into nullable in nested field in key should be allowed") { val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false))) val newKeySchema = applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema) - verifyException(keySchema, valueSchema, newKeySchema, valueSchema) + verifySuccess(keySchema, valueSchema, newKeySchema, valueSchema) } - test("changing the nullability of nullable to nonnullable in nested field in value should fail") { + test("storing non-nullable into nullable in nested field in value should be allowed") { val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false))) val newValueSchema = applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema) - verifyException(keySchema, valueSchema, keySchema, newValueSchema) + verifySuccess(keySchema, valueSchema, keySchema, newValueSchema) + } + + test("storing nullable column into non-nullable column in key should fail") { + val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = false))) + verifyException(nonNullChangedKeySchema, valueSchema, keySchema, valueSchema) + } + + test("storing nullable column into non-nullable column in value schema should fail") { + val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable = false))) + verifyException(keySchema, nonNullChangedValueSchema, keySchema, valueSchema) + } + + test("storing nullable column into non-nullable column in nested field in key should fail") { + val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false))) + val newKeySchema = applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema) + verifyException(newKeySchema, valueSchema, keySchema, valueSchema) } + test("storing nullable column into non-nullable column in nested field in value should fail") { + val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false))) + val newValueSchema = applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema) + verifyException(keySchema, newValueSchema, keySchema, valueSchema) + } + + // Checks on changing name of (nested) field. + // Changing the name is allowed since it may be possible Spark can make relevant changes from + // operators/functions by chance. This opens a risk that end users swap two fields having same + // data type, but there is no way to address both. + test("changing the name of field in key should be allowed") { val newName: StructField => StructField = f => f.copy(name = f.name + "_new") val fieldNameChangedKeySchema = StructType(keySchema.map(newName))