Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,15 @@ case class CatalogTable(
*/
def partitionSchema: StructType = {
val partitionFields = schema.takeRight(partitionColumnNames.length)
assert(partitionFields.map(_.name) == partitionColumnNames)
val actualPartitionColumnNames = partitionFields.map(_.name)

assert(actualPartitionColumnNames == partitionColumnNames,
"Corrupted table metadata detected for table " + identifier.quotedString + ". " +
"The partition column names in the table schema " +
"do not match the declared partition columns. " +
"Table schema columns: [" + schema.fieldNames.mkString(", ") + "] " +
"Declared partition columns: [" + partitionColumnNames.mkString(", ") + "]. " +
"This indicates corrupted table metadata that needs to be repaired.")

StructType(partitionFields)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1951,4 +1951,79 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
assert(catalog.getCachedTable(qualifiedName2) != null)
}
}

test("CatalogTable partitionSchema provides detailed error for corrupted metadata") {
// Test case 1: Partition columns don't match schema (wrong names)
val corruptedTable1 = CatalogTable(
identifier = TableIdentifier("corrupted_table1", Some("test_db")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = StructType(Seq(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("month", IntegerType)
)),
partitionColumnNames = Seq("year", "day") // "day" doesn't exist in schema
)

val exception1 = intercept[AssertionError] {
corruptedTable1.partitionSchema
}

val expectedMessage1 = "assertion failed: Corrupted table metadata detected " +
"for table `test_db`.`corrupted_table1`. " +
"The partition column names in the table schema " +
"do not match the declared partition columns. " +
"Table schema columns: [id, name, year, month] " +
"Declared partition columns: [year, day]. " +
"This indicates corrupted table metadata that needs to be repaired."
assert(exception1.getMessage === expectedMessage1)

// Test case 2: Partition columns are not at the end of schema
val corruptedTable2 = CatalogTable(
identifier = TableIdentifier("corrupted_table2", Some("test_db")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = StructType(Seq(
StructField("year", IntegerType),
StructField("id", IntegerType),
StructField("name", StringType),
StructField("month", IntegerType)
)),
partitionColumnNames = Seq("year", "month")
)

val exception2 = intercept[AssertionError] {
corruptedTable2.partitionSchema
}

val expectedMessage2 = "assertion failed: Corrupted table metadata detected " +
"for table `test_db`.`corrupted_table2`. " +
"The partition column names in the table schema " +
"do not match the declared partition columns. " +
"Table schema columns: [year, id, name, month] " +
"Declared partition columns: [year, month]. " +
"This indicates corrupted table metadata that needs to be repaired."
assert(exception2.getMessage === expectedMessage2)

// Test case 3: Valid table should work without error
val validTable = CatalogTable(
identifier = TableIdentifier("valid_table", Some("test_db")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = StructType(Seq(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("month", IntegerType)
)),
partitionColumnNames = Seq("year", "month") // Matches schema
)

// This should not throw an exception
val partitionSchema = validTable.partitionSchema
assert(partitionSchema.fieldNames.toSeq == Seq("year", "month"))
assert(partitionSchema.fields.length == 2)
}
}