@@ -462,17 +462,17 @@ class SessionCatalog(
}
}

// TODO: It's strange that we have both refresh and invalidate here.

/**
* Refresh the cache entry for a metastore table, if any.
*/
def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }

/**
* Invalidate the cache entry for a metastore table, if any.
*/
def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
def refreshTable(name: TableIdentifier): Unit = {
// Go through temporary tables and invalidate them.
@gatorsmile (Member) commented on Jul 1, 2016:

In the test case in HiveMetadataCacheSuite.scala, users may refresh the base table with spark.catalog.refreshTable("view_table"). Normally they do not specify the current database name, so the database name is empty and the table is treated as a temporary table. This comment might need a correction.

// If the database is defined, this is definitely not a temp table.
// If the database is not defined, there is a good chance this is a temp table.
if (name.database.isEmpty) {
tempTables.get(name.table).foreach(_.refresh())
}
}

/**
* Drop all existing temporary tables.
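As a side note on the reviewer's point above, here is a minimal spark-shell style usage sketch (the path and table/view names are hypothetical) of how the empty-database heuristic behaves when spark.catalog.refreshTable is called:

// Hypothetical names and path, for illustration only.
spark.range(100).write.parquet("/tmp/refresh_demo")
spark.read.parquet("/tmp/refresh_demo").createOrReplaceTempView("view_refresh")

// Unqualified name: name.database is empty, so the temp-view branch above runs
// and the cached file listing behind the view is refreshed.
spark.catalog.refreshTable("view_refresh")

// Qualified name: name.database is defined, so it is never treated as a temp
// view; refreshing is handled by the (metastore) catalog path instead.
spark.catalog.refreshTable("default.some_table")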
@@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
s"Reference '$name' is ambiguous, could be: $referenceNames.")
}
}

/**
* Refreshes (or invalidates) any metadata/data cached in the plan recursively.
*/
def refresh(): Unit = children.foreach(_.refresh())
}

/**
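A rough sketch, using a hypothetical leaf node, of why operators that hold cached state need to override this new hook: the default refresh() only recurses into children, so it is a no-op on a leaf (the real override for file-based relations is in LogicalRelation further down in this diff).

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Hypothetical leaf node that caches a file listing; clearing it on refresh()
// forces the listing to be recomputed the next time the plan is executed.
case class CachedListingNode(
    output: Seq[Attribute],
    cachedFiles: scala.collection.mutable.Buffer[String]) extends LeafNode {
  override def refresh(): Unit = cachedFiles.clear()
}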
@@ -206,7 +206,7 @@ case class DropTableCommand(
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
catalog.invalidateTable(tableName)
catalog.refreshTable(tableName)
catalog.dropTable(tableName, ifExists)
}
Seq.empty[Row]
@@ -172,7 +172,7 @@ case class AlterTableRenameCommand(
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
catalog.invalidateTable(oldName)
catalog.refreshTable(oldName)
catalog.renameTable(oldName, newName)
if (wasCached) {
sparkSession.catalog.cacheTable(newName.unquotedString)
@@ -373,7 +373,7 @@ case class TruncateTableCommand(
}
// After deleting the data, invalidate the table to make sure we don't keep around a stale
// file relation in the metastore cache.
spark.sessionState.invalidateTable(tableName.unquotedString)
spark.sessionState.refreshTable(tableName.unquotedString)
// Also try to drop the contents of the table from the columnar cache
try {
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
@@ -85,5 +85,10 @@ case class LogicalRelation(
expectedOutputAttributes,
metastoreTableIdentifier).asInstanceOf[this.type]

override def refresh(): Unit = relation match {
case fs: HadoopFsRelation => fs.refresh()
case _ => // Do nothing.
}

override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
}
@@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {

def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

def invalidateTable(tableName: String): Unit = {
catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
def refreshTable(tableName: String): Unit = {
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}

def addJar(path: String): Unit = {
@@ -59,8 +59,8 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
}
}

ignore("SPARK-16337 temporary view refresh") {
withTempPath { (location: File) =>
test("SPARK-16337 temporary view refresh") {
withTempTable("view_refresh") { withTempPath { (location: File) =>
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.parquet(location.getAbsolutePath)
@@ -77,12 +77,12 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
sql("select count(*) from view_refresh").first()
}
assert(e.getMessage.contains("FileNotFoundException"))
assert(e.getMessage.contains("refresh()"))
assert(e.getMessage.contains("REFRESH"))

// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
val newCount = sql("select count(*) from view_refresh").first().getLong(0)
assert(newCount > 0 && newCount < 100)
}
}}
A contributor commented:
This style is pretty weird...

}
}
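To illustrate the style comment above, a minimal alternative sketch of the same test skeleton (body elided) using conventional nesting instead of combining the closing braces as `}}`:

test("SPARK-16337 temporary view refresh") {
  withTempTable("view_refresh") {
    withTempPath { (location: File) =>
      // ... same test body as above ...
    }
  }
}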
@@ -147,10 +147,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
// it is better here to invalidate the cache to avoid confusing warning logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
// data source table.).
invalidateTable(tableIdent)
}

def invalidateTable(tableIdent: TableIdentifier): Unit = {
cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
}

@@ -90,13 +90,10 @@ private[sql] class HiveSessionCatalog(
val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables

override def refreshTable(name: TableIdentifier): Unit = {
super.refreshTable(name)
metastoreCatalog.refreshTable(name)
}

override def invalidateTable(name: TableIdentifier): Unit = {
metastoreCatalog.invalidateTable(name)
}

def invalidateCache(): Unit = {
metastoreCatalog.cachedDataSourceTables.invalidateAll()
}
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils

/**
* Test suite for metadata cache related behavior.
*/
class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("SPARK-16337 temporary view refresh") {
withTempTable("view_refresh") {
withTable("view_table") {
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.saveAsTable("view_table")

// Read the table in
spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)

// Delete a file using the Hadoop file system interface since the path returned by
// inputFiles is not recognizable by Java IO.
val p = new Path(spark.table("view_table").inputFiles.head)
assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false))

// Read it again and now we should see a FileNotFoundException
val e = intercept[SparkException] {
sql("select count(*) from view_refresh").first()
}
assert(e.getMessage.contains("FileNotFoundException"))
assert(e.getMessage.contains("REFRESH"))

// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
val newCount = sql("select count(*) from view_refresh").first().getLong(0)
assert(newCount > 0 && newCount < 100)
}
}
}
}
@@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)

// Discard the cached relation.
sessionState.invalidateTable("jsonTable")
sessionState.refreshTable("jsonTable")

checkAnswer(
sql("SELECT * FROM jsonTable"),
sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)

sessionState.invalidateTable("jsonTable")
sessionState.refreshTable("jsonTable")
val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)

assert(expectedSchema === table("jsonTable").schema)
@@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
""".stripMargin)

// Discard the cached relation.
sessionState.invalidateTable("ctasJsonTable")
sessionState.refreshTable("ctasJsonTable")

// Schema should not be changed.
assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
(6 to 10).map(i => Row(i, s"str$i")))

sessionState.invalidateTable("savedJsonTable")
sessionState.refreshTable("savedJsonTable")

checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
options = Map("path" -> tempDir.getCanonicalPath),
isExternal = false)

sessionState.invalidateTable("wide_schema")
sessionState.refreshTable("wide_schema")

val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
@@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv

sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)

sessionState.invalidateTable(tableName)
sessionState.refreshTable(tableName)
val actualSchema = table(tableName).schema
assert(schema === actualSchema)

@@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv

withTable(tableName) {
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
sessionState.invalidateTable(tableName)
sessionState.refreshTable(tableName)
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)

@@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.bucketBy(8, "d", "b")
.sortBy("c")
.saveAsTable(tableName)
sessionState.invalidateTable(tableName)
sessionState.refreshTable(tableName)
val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
@@ -487,7 +487,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
sessionState.invalidateTable("test_insert_parquet")
sessionState.refreshTable("test_insert_parquet")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
sql(
"""
@@ -500,7 +500,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("select * from test_insert_parquet"),
sql("select a, b from jt").collect())
// Invalidate the cache.
sessionState.invalidateTable("test_insert_parquet")
sessionState.refreshTable("test_insert_parquet")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)

// Create a partitioned table.
@@ -550,7 +550,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
|select b, '2015-04-02', a FROM jt
""".stripMargin).collect())

sessionState.invalidateTable("test_parquet_partitioned_cache_test")
sessionState.refreshTable("test_parquet_partitioned_cache_test")
assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)

dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")