Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class StateSchemaCompatibilityChecker(
}

private def schemasCompatible(storedSchema: StructType, schema: StructType): Boolean =
DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)
DataType.equalsIgnoreNameAndCompatibleNullability(schema, storedSchema)

// Visible for testing
private[sql] def readSchemaFile(): (StructType, StructType) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
private val valueSchema65535Bytes = new StructType()
.add(StructField("v" * (65535 - 87), IntegerType, nullable = true))

// Checks on adding/removing (nested) field.

test("adding field to key should fail") {
val fieldAddedKeySchema = keySchema.add(StructField("newKey", IntegerType))
verifyException(keySchema, valueSchema, fieldAddedKeySchema, valueSchema)
Expand Down Expand Up @@ -107,6 +109,8 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
verifyException(keySchema, valueSchema, keySchema, newValueSchema)
}

// Checks on changing type of (nested) field.

test("changing the type of field in key should fail") {
val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = TimestampType)))
verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema)
Expand All @@ -129,28 +133,59 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
verifyException(keySchema, valueSchema, keySchema, newValueSchema)
}

test("changing the nullability of nullable to non-nullable in key should fail") {
// Checks on changing nullability of (nested) field.
// Note that these tests have different format of the test name compared to others, since it was
// misleading to understand the assignment as the opposite way.

test("storing non-nullable column into nullable column in key should be allowed") {
val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = false)))
verifyException(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema)
verifySuccess(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema)
}

test("changing the nullability of nullable to non-nullable in value should fail") {
test("storing non-nullable column into nullable column in value schema should be allowed") {
val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable = false)))
verifyException(keySchema, valueSchema, keySchema, nonNullChangedValueSchema)
verifySuccess(keySchema, valueSchema, keySchema, nonNullChangedValueSchema)
}

test("changing the nullability of nullable to nonnullable in nested field in key should fail") {
test("storing non-nullable into nullable in nested field in key should be allowed") {
val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
val newKeySchema = applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema)
verifyException(keySchema, valueSchema, newKeySchema, valueSchema)
verifySuccess(keySchema, valueSchema, newKeySchema, valueSchema)
}

test("changing the nullability of nullable to nonnullable in nested field in value should fail") {
test("storing non-nullable into nullable in nested field in value should be allowed") {
val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
val newValueSchema = applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema)
verifyException(keySchema, valueSchema, keySchema, newValueSchema)
verifySuccess(keySchema, valueSchema, keySchema, newValueSchema)
}

test("storing nullable column into non-nullable column in key should fail") {
val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = false)))
verifyException(nonNullChangedKeySchema, valueSchema, keySchema, valueSchema)
}

test("storing nullable column into non-nullable column in value schema should fail") {
val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable = false)))
verifyException(keySchema, nonNullChangedValueSchema, keySchema, valueSchema)
}

test("storing nullable column into non-nullable column in nested field in key should fail") {
val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
val newKeySchema = applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema)
verifyException(newKeySchema, valueSchema, keySchema, valueSchema)
}

test("storing nullable column into non-nullable column in nested field in value should fail") {
val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable = false)))
val newValueSchema = applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema)
verifyException(keySchema, newValueSchema, keySchema, valueSchema)
}

// Checks on changing name of (nested) field.
// Changing the name is allowed since it may be possible Spark can make relevant changes from
// operators/functions by chance. This opens a risk that end users swap two fields having same
// data type, but there is no way to address both.

test("changing the name of field in key should be allowed") {
val newName: StructField => StructField = f => f.copy(name = f.name + "_new")
val fieldNameChangedKeySchema = StructType(keySchema.map(newName))
Expand Down