@@ -50,6 +50,9 @@ object CommandUtils extends Logging {
      catalog.alterTableStats(table.identifier, Some(newStats))
    } else if (table.stats.nonEmpty) {
      catalog.alterTableStats(table.identifier, None)
    } else {
      // In other cases, we still need to invalidate the table relation cache.

Contributor:
to confirm: does catalog.alterTableStats refresh the relation cache?

Member Author:
Yes.

/**
 * Alter Spark's statistics of an existing metastore table identified by the provided table
 * identifier.
 */
def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
  val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(identifier.table)
  val tableIdentifier = TableIdentifier(table, Some(db))
  requireDbExists(db)
  requireTableExists(tableIdentifier)
  externalCatalog.alterTableStats(db, table, newStats)
  // Invalidate the table relation cache
  refreshTable(identifier)
}
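
For context, a rough sketch (simplified; the real SessionCatalog.refreshTable also handles temporary views) of what that final refreshTable call amounts to:

// Simplified sketch, not the exact Spark source: refreshing a metastore table
// drops its entry from the per-session table relation cache, so the next lookup
// has to rebuild the relation (schema, file index, stats) from the external catalog.
def refreshTable(name: TableIdentifier): Unit = {
  val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
  val table = formatTableName(name.table)
  // QualifiedTableName(db, table) is the same key that getCachedTable uses in the new tests.
  tableRelationCache.invalidate(QualifiedTableName(db, table))
}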

Member:

Could you explain why we need to refresh the table while updating stats, please? For instance, we can end up doing the same work twice. See:

  1. InsertIntoHiveTable:
    sparkSession.sessionState.catalog.refreshTable(table.identifier)
    CommandUtils.updateTableStats(sparkSession, table)
  2. LoadDataCommand:
    catalog.refreshTable(targetTable.identifier)
    CommandUtils.updateTableStats(sparkSession, targetTable)
  3. AlterTableDropPartitionCommand:
    sparkSession.catalog.refreshTable(table.identifier.quotedString)
    CommandUtils.updateTableStats(sparkSession, table)

Member Author:

Not all commands have refresh-table logic. For example, AlterTableSetLocationCommand ends with:

CommandUtils.updateTableStats(sparkSession, table)
Seq.empty[Row]
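
An abridged sketch (assumed shape, not the exact Spark source, and ignoring the partition-spec branch) of why that matters: the command rewrites the storage location and updates stats, but never issues its own refreshTable, so the stale cached relation pointing at the old location would survive unless updateTableStats invalidates it.

// Abridged, hypothetical sketch of AlterTableSetLocationCommand.run
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  val table = catalog.getTableMetadata(tableName)
  // Point the table at the new location in the metastore...
  catalog.alterTable(table.withNewStorage(locationUri = Some(CatalogUtils.stringToURI(location))))
  // ...then update stats; note there is no explicit catalog.refreshTable here.
  CommandUtils.updateTableStats(sparkSession, table)
  Seq.empty[Row]
}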

      catalog.refreshTable(table.identifier)
    }
  }
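
Putting the pieces together, a rough sketch (the details of the auto-size branch are assumptions) of the resulting CommandUtils.updateTableStats control flow: every branch now ends up invalidating the cached table relation, either through alterTableStats (which calls refreshTable internally, as quoted above) or through the new explicit refreshTable call.

def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
  val catalog = sparkSession.sessionState.catalog
  if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
    // Recompute the size and write it back; alterTableStats refreshes the relation cache.
    val newSize = CommandUtils.calculateTotalSize(sparkSession, table)
    catalog.alterTableStats(table.identifier, Some(CatalogStatistics(sizeInBytes = newSize)))
  } else if (table.stats.nonEmpty) {
    // Drop the now-stale stats; alterTableStats refreshes the relation cache.
    catalog.alterTableStats(table.identifier, None)
  } else {
    // In other cases, we still need to invalidate the table relation cache.
    catalog.refreshTable(table.identifier)
  }
}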

@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -191,6 +191,37 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
assert(e.message.contains("ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables"))
}
}

test("SPARK-25403 refresh the table after inserting data") {
withTable("t") {
val catalog = spark.sessionState.catalog
val table = QualifiedTableName(catalog.getCurrentDatabase, "t")
sql("CREATE TABLE t (a INT) USING parquet")
sql("INSERT INTO TABLE t VALUES (1)")
assert(catalog.getCachedTable(table) === null, "Table relation should be invalidated.")
assert(spark.table("t").count() === 1)
assert(catalog.getCachedTable(table) !== null, "Table relation should be cached.")
}
}

test("SPARK-19784 refresh the table after altering the table location") {
withTable("t") {
withTempDir { dir =>
val catalog = spark.sessionState.catalog
val table = QualifiedTableName(catalog.getCurrentDatabase, "t")
val p1 = s"${dir.getCanonicalPath}/p1"
val p2 = s"${dir.getCanonicalPath}/p2"
sql(s"CREATE TABLE t (a INT) USING parquet LOCATION '$p1'")
sql("INSERT INTO TABLE t VALUES (1)")
assert(catalog.getCachedTable(table) === null, "Table relation should be invalidated.")
spark.range(5).toDF("a").write.parquet(p2)
spark.sql(s"ALTER TABLE t SET LOCATION '$p2'")
assert(catalog.getCachedTable(table) === null, "Table relation should be invalidated.")
assert(spark.table("t").count() === 5)
assert(catalog.getCachedTable(table) !== null, "Table relation should be cached.")
}
}
}
}

abstract class DDLSuite extends QueryTest with SQLTestUtils {
@@ -440,7 +440,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
def checkCached(tableIdentifier: TableIdentifier): Unit = {
// Converted test_parquet should be cached.
getCachedDataSourceTable(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case null => fail(s"Converted ${tableIdentifier.table} should be cached in the cache.")
case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK
case other =>
fail(
@@ -480,7 +480,7 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
|INSERT INTO TABLE test_insert_parquet
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifier)
assert(getCachedDataSourceTable(tableIdentifier) === null)
// Make sure we can read the data.
checkAnswer(
sql("select * from test_insert_parquet"),
@@ -512,14 +512,16 @@ class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
|PARTITION (`date`='2015-04-01')
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifier)
// Inserting into a partitioned data source Parquet table now refreshes the table,
// so we expect it is no longer cached.
assert(getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (`date`='2015-04-02')
|select a, b from jt
""".stripMargin)
checkCached(tableIdentifier)
assert(getCachedDataSourceTable(tableIdentifier) === null)

// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
@@ -274,8 +274,8 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {

val orcPartitionedTable = TableIdentifier("dummy_orc_partitioned", Some("default"))
if (conversion == "true") {
// if converted, it's cached as a datasource table.
checkCached(orcPartitionedTable)
// if converted, we refresh the cached relation.
assert(getCachedDataSourceTable(orcPartitionedTable) === null)
} else {
// otherwise, not cached.
assert(getCachedDataSourceTable(orcPartitionedTable) === null)