diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e44b01bba99c..bfba5d06760b 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -387,6 +387,15 @@ def foreachPartition(self, f):
         """
         self.rdd.foreachPartition(f)

+    @since(2.0)
+    def refresh(self):
+        """Refreshes the metadata and data cached in Spark for data associated with this DataFrame.
+
+        An example use case is to invalidate the file system metadata cached by Spark, when the
+        underlying files have been updated by an external process.
+        """
+        self._jdf.refresh()
+
     @since(1.3)
     def cache(self):
         """ Persists with the default storage level (C{MEMORY_ONLY}).
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c620d36e567..f97e0fd646ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -462,17 +462,15 @@ class SessionCatalog(
     }
   }

-  // TODO: It's strange that we have both refresh and invalidate here.
-
   /**
    * Refresh the cache entry for a metastore table, if any.
    */
-  def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
-
-  /**
-   * Invalidate the cache entry for a metastore table, if any.
-   */
-  def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
+  def refreshTable(name: TableIdentifier): Unit = {
+    // Go through temporary tables and invalidate them.
+    if (name.database.isEmpty) {
+      tempTables.get(name.table).foreach(_.refresh())
+    }
+  }

   /**
    * Drop all existing temporary tables.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4984f235b412..c3826092b79c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
         s"Reference '$name' is ambiguous, could be: $referenceNames.")
     }
   }
+
+  /**
+   * Invalidates any metadata cached in the plan recursively.
+   */
+  def refresh(): Unit = children.foreach(_.refresh())
 }

 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e64669a19c7f..25d7f763d0b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2306,6 +2306,19 @@ class Dataset[T] private[sql](
    */
   def distinct(): Dataset[T] = dropDuplicates()

+  /**
+   * Refreshes the metadata and data cached in Spark for data associated with this Dataset.
+   * An example use case is to invalidate the file system metadata cached by Spark, when the
+   * underlying files have been updated by an external process.
+   *
+   * @group action
+   * @since 2.0.0
+   */
+  def refresh(): Unit = {
+    unpersist(false)
+    logicalPlan.refresh()
+  }
+
   /**
    * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
    *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc00912bf9f5..226f61ef404a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -206,7 +206,7 @@ case class DropTableCommand(
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
-      catalog.invalidateTable(tableName)
+      catalog.refreshTable(tableName)
       catalog.dropTable(tableName, ifExists)
     }
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 687d69aa5f26..5c815df0deb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -172,7 +172,7 @@ case class AlterTableRenameCommand(
     }
     // Invalidate the table last, otherwise uncaching the table would load the logical plan
     // back into the hive metastore cache
-    catalog.invalidateTable(oldName)
+    catalog.refreshTable(oldName)
     catalog.renameTable(oldName, newName)
     if (wasCached) {
       sparkSession.catalog.cacheTable(newName.unquotedString)
@@ -373,7 +373,7 @@ case class TruncateTableCommand(
     }
     // After deleting the data, invalidate the table to make sure we don't keep around a stale
     // file relation in the metastore cache.
-    spark.sessionState.invalidateTable(tableName.unquotedString)
+    spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
       spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 1443057d5c2f..9485ce64ef0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -117,7 +117,19 @@ class FileScanRDD(
         currentFile = files.next()
         logInfo(s"Reading File $currentFile")
         InputFileNameHolder.setInputFileName(currentFile.filePath)
-        currentIterator = readFunction(currentFile)
+
+        try {
+          currentIterator = readFunction(currentFile)
+        } catch {
+          case e: java.io.FileNotFoundException =>
+            throw new java.io.FileNotFoundException(
+              e.getMessage + "\n" +
+                "It is possible the underlying files have been updated. " +
+                "You can explicitly invalidate the cache in Spark by " +
+                "running 'REFRESH TABLE tableName' command in SQL or " +
+                "by calling the refresh() function on a Dataset/DataFrame."
+            )
+        }
         hasNext
       } else {
         currentFile = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 39c8606fd14b..90711f2b1dde 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -85,5 +85,10 @@ case class LogicalRelation(
       expectedOutputAttributes,
       metastoreTableIdentifier).asInstanceOf[this.type]

+  override def refresh(): Unit = relation match {
+    case fs: HadoopFsRelation => fs.refresh()
+    case _ =>  // Do nothing.
+  }
+
   override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 5f5cf5c6d30c..01cc13f9df88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {

   def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

-  def invalidateTable(tableName: String): Unit = {
-    catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+  def refreshTable(tableName: String): Unit = {
+    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }

   def addJar(path: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 000000000000..5bcfd9db2a8c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+    assert(dir.isDirectory)
+    val oneFile = dir.listFiles().find { file =>
+      !file.getName.startsWith("_") && !file.getName.startsWith(".")
+    }
+    assert(oneFile.isDefined)
+    oneFile.foreach(_.delete())
+  }
+
+  test("Dataset.refresh()") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+        .write.parquet(location.getAbsolutePath)
+
+      // Read the directory in
+      val df = spark.read.parquet(location.getAbsolutePath)
+      assert(df.count() == 100)
+
+      // Delete a file
+      deleteOneFileInDirectory(location)
+
+      // Read it again and now we should see a FileNotFoundException
+      val e = intercept[SparkException] {
+        df.count()
+      }
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("refresh()"))
+
+      // Refresh and we should be able to read it again.
+      df.refresh()
+      assert(df.count() > 0 && df.count() < 100)
+    }
+  }
+
+  test("temporary view refresh") {
+    withTempPath { (location: File) =>
+      // Create a Parquet directory
+      spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+        .write.parquet(location.getAbsolutePath)
+
+      // Read the directory in
+      spark.read.parquet(location.getAbsolutePath).createOrReplaceTempView("view_refresh")
+      assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+      // Delete a file
+      deleteOneFileInDirectory(location)
+
+      // Read it again and now we should see a FileNotFoundException
+      val e = intercept[SparkException] {
+        sql("select count(*) from view_refresh").first()
+      }
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("refresh()"))
+
+      // Refresh and we should be able to read it again.
+      spark.catalog.refreshTable("view_refresh")
+      val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+      assert(newCount > 0 && newCount < 100)
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2e0b5d59b578..9d7fa0b728bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -139,18 +139,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   }

   def refreshTable(tableIdent: TableIdentifier): Unit = {
-    // refreshTable does not eagerly reload the cache. It just invalidate the cache.
-    // Next time when we use the table, it will be populated in the cache.
-    // Since we also cache ParquetRelations converted from Hive Parquet tables and
-    // adding converted ParquetRelations into the cache is not defined in the load function
-    // of the cache (instead, we add the cache entry in convertToParquetRelation),
-    // it is better at here to invalidate the cache to avoid confusing waring logs from the
-    // cache loader (e.g. cannot find data source provider, which is only defined for
-    // data source table.).
-    invalidateTable(tableIdent)
-  }
-
-  def invalidateTable(tableIdent: TableIdentifier): Unit = {
     cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
   }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 2589b9d4a028..ea818b5ebca7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -90,13 +90,10 @@ private[sql] class HiveSessionCatalog(
   val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables

   override def refreshTable(name: TableIdentifier): Unit = {
+    super.refreshTable(name)
     metastoreCatalog.refreshTable(name)
   }

-  override def invalidateTable(name: TableIdentifier): Unit = {
-    metastoreCatalog.invalidateTable(name)
-  }
-
   def invalidateCache(): Unit = {
     metastoreCatalog.cachedDataSourceTables.invalidateAll()
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b028d49aff58..12d250d4fb60 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)

           // Discard the cached relation.
-          sessionState.invalidateTable("jsonTable")
+          sessionState.refreshTable("jsonTable")

           checkAnswer(
             sql("SELECT * FROM jsonTable"),
             sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)

-          sessionState.invalidateTable("jsonTable")
+          sessionState.refreshTable("jsonTable")
           val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
           assert(expectedSchema === table("jsonTable").schema)

@@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           """.stripMargin)

         // Discard the cached relation.
-        sessionState.invalidateTable("ctasJsonTable")
+        sessionState.refreshTable("ctasJsonTable")

         // Schema should not be changed.
         assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
           (6 to 10).map(i => Row(i, s"str$i")))

-        sessionState.invalidateTable("savedJsonTable")
+        sessionState.refreshTable("savedJsonTable")

         checkAnswer(
           sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         options = Map("path" -> tempDir.getCanonicalPath),
         isExternal = false)

-      sessionState.invalidateTable("wide_schema")
+      sessionState.refreshTable("wide_schema")

       val actualSchema = table("wide_schema").schema
       assert(schema === actualSchema)
@@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv

       sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)

-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val actualSchema = table(tableName).schema
       assert(schema === actualSchema)

@@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv

     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)

@@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .bucketBy(8, "d", "b")
         .sortBy("c")
         .saveAsTable(tableName)
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index fe7253d7354d..a78a28536cc3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -462,7 +462,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       checkCached(tableIdentifier)
       // For insert into non-partitioned table, we will do the conversion,
       // so the converted test_insert_parquet should be cached.
-      sessionState.invalidateTable("test_insert_parquet")
+      sessionState.refreshTable("test_insert_parquet")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
       sql(
         """
@@ -475,7 +475,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
         sql("select * from test_insert_parquet"),
         sql("select a, b from jt").collect())
       // Invalidate the cache.
-      sessionState.invalidateTable("test_insert_parquet")
+      sessionState.refreshTable("test_insert_parquet")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)

       // Create a partitioned table.
@@ -525,7 +525,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           |select b, '2015-04-02', a FROM jt
         """.stripMargin).collect())

-      sessionState.invalidateTable("test_parquet_partitioned_cache_test")
+      sessionState.refreshTable("test_parquet_partitioned_cache_test")
       assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)

       dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
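Usage sketch (not part of the patch): a minimal Scala example of how the refresh path added above might be exercised, assuming a Spark 2.0 session named `spark` and a Parquet directory at `/tmp/example_data` (both placeholders). `Dataset.refresh()` is the method introduced in this patch; `spark.catalog.refreshTable` and `REFRESH TABLE` are the catalog and SQL entry points referenced by the new error message and the tests.

    // Read a Parquet directory; Spark caches the file listing for this Dataset.
    val df = spark.read.parquet("/tmp/example_data")  // hypothetical path
    df.count()

    // If an external process rewrites or deletes files under that path, a later scan
    // can fail with the FileNotFoundException message added in FileScanRDD above.
    // Refreshing drops the cached file metadata (and unpersists cached data), so the
    // next scan re-lists the files.
    df.refresh()
    df.count()

    // For a named table or temporary view, the same invalidation can be triggered
    // through the catalog API or SQL ("my_view" is a placeholder name).
    spark.catalog.refreshTable("my_view")
    spark.sql("REFRESH TABLE my_view")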