Skip to content

Commit a4bc442

Browse files
liancheng authored and marmbrus committed
[SPARK-1669][SQL] Made cacheTable idempotent
JIRA issue: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669)

Caching the same table multiple times should end up with only 1 in-memory columnar representation of this table.

Before:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))))
```

After:

```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))
```

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1183 from liancheng/spark-1669 and squashes the following commits:

68f8a20 [Cheng Lian] Removed an unused import
51bae90 [Cheng Lian] Made cacheTable idempotent
1 parent 853a2b9 commit a4bc442

File tree

2 files changed

+29
-4
lines changed

2 files changed

+29
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
187187
/** Caches the specified table in-memory. */
188188
def cacheTable(tableName: String): Unit = {
189189
val currentTable = catalog.lookupRelation(None, tableName)
190-
val useCompression =
191-
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
192-
val asInMemoryRelation =
193-
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
190+
val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match {
191+
case _: InMemoryRelation =>
192+
currentTable.logicalPlan
193+
194+
case _ =>
195+
val useCompression =
196+
sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
197+
InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
198+
}
194199

195200
catalog.registerTable(None, tableName, asInMemoryRelation)
196201
}

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
2021
import org.apache.spark.sql.catalyst.expressions._
22+
import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
2123
import org.apache.spark.sql.test._
2224

2325
/* Implicits */
@@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest {
405407
clear()
406408
}
407409

410+
test("SPARK-1669: cacheTable should be idempotent") {
411+
assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
412+
413+
cacheTable("testData")
414+
EliminateAnalysisOperators(table("testData").logicalPlan) match {
415+
case _: InMemoryRelation =>
416+
case _ =>
417+
fail("testData should be cached")
418+
}
419+
420+
cacheTable("testData")
421+
EliminateAnalysisOperators(table("testData").logicalPlan) match {
422+
case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
423+
fail("cacheTable is not idempotent")
424+
425+
case _ =>
426+
}
427+
}
408428
}

0 commit comments

Comments
 (0)