From bcd0881a7fca1a22016fbff0a09315a1fe8502c3 Mon Sep 17 00:00:00 2001
From: Ganesha S
Date: Sat, 25 Oct 2025 19:03:59 +0530
Subject: [PATCH] [SPARK-54029][SQL] Add detailed error message for table metadata corruption to ease debugging

---
 .../sql/catalyst/catalog/interface.scala      | 11 ++-
 .../catalog/SessionCatalogSuite.scala         | 75 +++++++++++++++++++
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d824e090b45d9..64d816587f4de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -451,7 +451,16 @@ case class CatalogTable(
    */
   def partitionSchema: StructType = {
     val partitionFields = schema.takeRight(partitionColumnNames.length)
-    assert(partitionFields.map(_.name) == partitionColumnNames)
+    val actualPartitionColumnNames = partitionFields.map(_.name)
+
+    // Partition columns must be the trailing schema fields, in the declared order.
+    assert(actualPartitionColumnNames == partitionColumnNames,
+      "Corrupted table metadata detected for table " + identifier.quotedString + ". " +
+      "The partition column names in the table schema " +
+      "do not match the declared partition columns. " +
+      "Table schema columns: [" + schema.fieldNames.mkString(", ") + "]. " +
+      "Declared partition columns: [" + partitionColumnNames.mkString(", ") + "]. " +
+      "This indicates corrupted table metadata that needs to be repaired.")
     StructType(partitionFields)
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index b9d956c3c2a5e..300e5a8653f1e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -1951,4 +1951,79 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
       assert(catalog.getCachedTable(qualifiedName2) != null)
     }
   }
+
+  test("CatalogTable partitionSchema provides detailed error for corrupted metadata") {
+    // Test case 1: Partition columns don't match schema (wrong names)
+    val corruptedTable1 = CatalogTable(
+      identifier = TableIdentifier("corrupted_table1", Some("test_db")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("name", StringType),
+        StructField("year", IntegerType),
+        StructField("month", IntegerType)
+      )),
+      partitionColumnNames = Seq("year", "day") // "day" doesn't exist in schema
+    )
+
+    val exception1 = intercept[AssertionError] {
+      corruptedTable1.partitionSchema
+    }
+
+    val expectedMessage1 = "assertion failed: Corrupted table metadata detected " +
+      "for table `test_db`.`corrupted_table1`. " +
+      "The partition column names in the table schema " +
+      "do not match the declared partition columns. " +
+      "Table schema columns: [id, name, year, month]. " +
+      "Declared partition columns: [year, day]. " +
+      "This indicates corrupted table metadata that needs to be repaired."
+    assert(exception1.getMessage === expectedMessage1)
+
+    // Test case 2: Partition columns are not at the end of schema
+    val corruptedTable2 = CatalogTable(
+      identifier = TableIdentifier("corrupted_table2", Some("test_db")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = StructType(Seq(
+        StructField("year", IntegerType),
+        StructField("id", IntegerType),
+        StructField("name", StringType),
+        StructField("month", IntegerType)
+      )),
+      partitionColumnNames = Seq("year", "month")
+    )
+
+    val exception2 = intercept[AssertionError] {
+      corruptedTable2.partitionSchema
+    }
+
+    val expectedMessage2 = "assertion failed: Corrupted table metadata detected " +
+      "for table `test_db`.`corrupted_table2`. " +
+      "The partition column names in the table schema " +
+      "do not match the declared partition columns. " +
+      "Table schema columns: [year, id, name, month]. " +
+      "Declared partition columns: [year, month]. " +
+      "This indicates corrupted table metadata that needs to be repaired."
+    assert(exception2.getMessage === expectedMessage2)
+
+    // Test case 3: Valid table should work without error
+    val validTable = CatalogTable(
+      identifier = TableIdentifier("valid_table", Some("test_db")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("name", StringType),
+        StructField("year", IntegerType),
+        StructField("month", IntegerType)
+      )),
+      partitionColumnNames = Seq("year", "month") // Matches schema
+    )
+
+    // This should not throw an exception
+    val partitionSchema = validTable.partitionSchema
+    assert(partitionSchema.fieldNames.toSeq == Seq("year", "month"))
+    assert(partitionSchema.fields.length == 2)
+  }
 }