diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index bfdeecedd94a..33bb228dd961 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -50,6 +50,9 @@ object CommandUtils extends Logging {
       catalog.alterTableStats(table.identifier, Some(newStats))
     } else if (table.stats.nonEmpty) {
       catalog.alterTableStats(table.identifier, None)
+    } else {
+      // In other cases, we still need to invalidate the table relation cache.
+      catalog.refreshTable(table.identifier)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2bb121b27e7d..cc12b18d77af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -191,6 +191,37 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
       assert(e.message.contains("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables"))
     }
   }
+
+  test("SPARK-25403 refresh the table after inserting data") {
+    withTable("t") {
+      val catalog = spark.sessionState.catalog
+      val table = QualifiedTableName(catalog.getCurrentDatabase, "t")
+      sql("CREATE TABLE t (a INT) USING parquet")
+      sql("INSERT INTO TABLE t VALUES (1)")
+      assert(catalog.getCachedTable(table) === null, "Table relation should be invalidated.")
+      assert(spark.table("t").count() === 1)
+      assert(catalog.getCachedTable(table) !== null, "Table relation should be cached.")
+    }
+  }
+
+  test("SPARK-19784 refresh the table after altering the table location") {
+    withTable("t") {
+      withTempDir { dir =>
+        val catalog = spark.sessionState.catalog
+        val table = QualifiedTableName(catalog.getCurrentDatabase, "t")
+        val p1 = s"${dir.getCanonicalPath}/p1"
+        val p2 = s"${dir.getCanonicalPath}/p2"
+        sql(s"CREATE TABLE t (a INT) USING parquet LOCATION '$p1'")
+        sql("INSERT INTO TABLE t VALUES (1)")
+        assert(catalog.getCachedTable(table) === null, "Table relation should be invalidated.")
+        spark.range(5).toDF("a").write.parquet(p2)
+        spark.sql(s"ALTER TABLE t SET LOCATION '$p2'")
+        assert(catalog.getCachedTable(table) === null, "Table relation should be invalidated.")
+        assert(spark.table("t").count() === 5)
+        assert(catalog.getCachedTable(table) !== null, "Table relation should be cached.")
+      }
+    }
+  }
 }
 
 abstract class DDLSuite extends QueryTest with SQLTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
index 5f3705d07bca..0bdaa0c23c53 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala
@@ -440,7 +440,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
   def checkCached(tableIdentifier: TableIdentifier): Unit = {
     // Converted test_parquet should be cached.
     getCachedDataSourceTable(tableIdentifier) match {
-      case null => fail("Converted test_parquet should be cached in the cache.")
+      case null => fail(s"Converted ${tableIdentifier.table} should be cached in the cache.")
       case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
       case other => fail(
@@ -480,7 +480,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
         |INSERT INTO TABLE test_insert_parquet
         |select a, b from jt
       """.stripMargin)
-    checkCached(tableIdentifier)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     // Make sure we can read the data.
     checkAnswer(
       sql("select * from test_insert_parquet"),
@@ -512,14 +512,16 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
         |PARTITION (`date`='2015-04-01')
         |select a, b from jt
       """.stripMargin)
-    checkCached(tableIdentifier)
+    // Right now, insert into a partitioned data source Parquet table. We refreshed the table.
+    // So, we expect it is not cached.
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
        |PARTITION (`date`='2015-04-02')
        |select a, b from jt
      """.stripMargin)
-    checkCached(tableIdentifier)
+    assert(getCachedDataSourceTable(tableIdentifier) === null)
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
index 3c545c577f16..990d9425fb7f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -274,8 +274,8 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
         val orcPartitionedTable = TableIdentifier("dummy_orc_partitioned", Some("default"))
         if (conversion == "true") {
-          // if converted, it's cached as a datasource table.
-          checkCached(orcPartitionedTable)
+          // if converted, we refresh the cached relation.
+          assert(getCachedDataSourceTable(orcPartitionedTable) === null)
         } else {
           // otherwise, not cached.
           assert(getCachedDataSourceTable(orcPartitionedTable) === null)