diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ab886d5281774..9db3af1e9dd8b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -170,6 +170,13 @@ class SessionCatalog(
     tableRelationCache.invalidate(key)
   }
 
+  /** This method provides a way to invalidate the cached plan for the given table identifier. */
+  def invalidateCachedTable(name: TableIdentifier): Unit = {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+    invalidateCachedTable(QualifiedTableName(dbName, tableName))
+  }
+
   /** This method provides a way to invalidate all the cached plans. */
   def invalidateAllCachedTables(): Unit = {
     tableRelationCache.invalidateAll()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 87228312252ac..4512a3d03efc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -277,6 +277,7 @@ case class AlterTableSetPropertiesCommand(
       properties = table.properties ++ properties,
       comment = properties.get(TableCatalog.PROP_COMMENT).orElse(table.comment))
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
@@ -315,6 +316,7 @@ case class AlterTableUnsetPropertiesCommand(
     val newProperties = table.properties.filter { case (k, _) => !propKeys.contains(k) }
     val newTable = table.copy(properties = newProperties, comment = tableComment)
     catalog.alterTable(newTable)
+    catalog.invalidateCachedTable(tableName)
     Seq.empty[Row]
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index df96b0675cc2d..10d5d01bf9728 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -123,4 +123,28 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
       assert(msg.contains("cannot resolve '`c3`' given input columns"))
     }
   }
+
+  test("SPARK-37098: Alter table properties should invalidate cache") {
+    // Pin the compression codec in case the default changes in the future.
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      withTempPath { dir =>
+        withTable("t") {
+          sql(s"CREATE TABLE t (c int) STORED AS PARQUET LOCATION '${dir.getCanonicalPath}'")
+          // cache the table metadata
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t SET TBLPROPERTIES('parquet.compression'='gzip')")
+          sql("INSERT INTO TABLE t VALUES (1)")
+          val files1 = dir.listFiles().filter(_.getName.endsWith("gz.parquet"))
+          assert(files1.length == 1)
+
+          // cache the table metadata again
+          sql("SELECT * FROM t")
+          sql("ALTER TABLE t UNSET TBLPROPERTIES('parquet.compression')")
+          sql("INSERT INTO TABLE t VALUES (1)")
+          val files2 = dir.listFiles().filter(_.getName.endsWith("snappy.parquet"))
+          assert(files2.length == 1)
+        }
+      }
+    }
+  }
 }
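
For context, below is a minimal, self-contained sketch of the keyed-cache invalidation pattern this patch applies. It is not Spark's actual code: the names PlanCache, QualifiedName, and TablePlan are hypothetical stand-ins for SessionCatalog's tableRelationCache and its QualifiedTableName key. The point it illustrates is the bug the test exercises: ALTER TABLE mutates the catalog entry, so unless the matching cache key is dropped, later reads keep planning against stale metadata (e.g. the old parquet.compression codec).

// Illustrative only: hypothetical names, not Spark's API.
import scala.collection.concurrent.TrieMap

final case class QualifiedName(db: String, table: String)
final case class TablePlan(properties: Map[String, String])

class PlanCache {
  private val cache = TrieMap.empty[QualifiedName, TablePlan]

  // Return the cached plan, computing and caching it on a miss.
  def get(key: QualifiedName)(load: => TablePlan): TablePlan =
    cache.getOrElseUpdate(key, load)

  // Drop a single entry; this mirrors what the new
  // SessionCatalog.invalidateCachedTable(TableIdentifier) overload does
  // after resolving the identifier to a qualified key.
  def invalidate(key: QualifiedName): Unit = cache.remove(key)
}

object StaleCacheDemo extends App {
  val planCache = new PlanCache
  val key = QualifiedName("default", "t")

  // Simulated catalog state that ALTER TABLE ... SET TBLPROPERTIES mutates.
  var catalogProps = Map("parquet.compression" -> "snappy")

  // First read populates the cache (the test's first "SELECT * FROM t").
  planCache.get(key)(TablePlan(catalogProps))

  // ALTER TABLE changes the catalog entry...
  catalogProps = catalogProps.updated("parquet.compression", "gzip")

  // ...but without invalidation the cached plan still carries the old codec.
  assert(planCache.get(key)(TablePlan(catalogProps)).properties("parquet.compression") == "snappy")

  // Invalidating the key forces the next read to load fresh metadata,
  // which is what the DDL commands now do via invalidateCachedTable.
  planCache.invalidate(key)
  assert(planCache.get(key)(TablePlan(catalogProps)).properties("parquet.compression") == "gzip")
}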