diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6ab3a615e6cc..f7487ec6c9bc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -126,6 +126,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val CACHE_STORAGE_LEVEL = buildConf("spark.sql.inMemoryColumnarStorage.level")
+    .internal()
+    .doc("Configures the storage level of the cached tables.")
+    .stringConf
+    .createWithDefault("MEMORY_AND_DISK")
+
   val COLUMN_BATCH_SIZE = buildConf("spark.sql.inMemoryColumnarStorage.batchSize")
     .internal()
     .doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
@@ -930,6 +936,8 @@ class SQLConf extends Serializable with Logging {
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
+  def cacheStorageLevel: String = getConf(CACHE_STORAGE_LEVEL)
+
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   def targetPostShuffleInputSize: Long =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 0ea806d6cb50..406683187ee5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
 case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
@@ -88,8 +87,8 @@ class CacheManager extends Logging {
    */
   def cacheQuery(
       query: Dataset[_],
-      tableName: Option[String] = None,
-      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
+      tableName: Option[String],
+      storageLevel: StorageLevel): Unit = writeLock {
     val planToCache = query.logicalPlan
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
@@ -106,6 +105,11 @@
     }
   }
 
+  def cacheQuery(query: Dataset[_], tableName: Option[String] = None): Unit = writeLock {
+    cacheQuery(query, tableName, StorageLevel.fromString(
+      query.sparkSession.sessionState.conf.cacheStorageLevel))
+  }
+
   /**
    * Un-cache all the cache entries that refer to the given plan.
    */
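
A minimal end-to-end sketch of how the new setting would be exercised, assuming Dataset.cache() resolves to the new two-argument cacheQuery overload added above. The object name and example data are invented for illustration; the config key, its MEMORY_AND_DISK default, and the StorageLevel.fromString names come from the patch and the existing StorageLevel API.

import org.apache.spark.sql.SparkSession

object CacheLevelExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-storage-level-demo")
      .getOrCreate()

    // Internal setting introduced by this patch; any name accepted by
    // StorageLevel.fromString works (e.g. MEMORY_ONLY, DISK_ONLY).
    // Without it, the default remains MEMORY_AND_DISK.
    spark.conf.set("spark.sql.inMemoryColumnarStorage.level", "MEMORY_ONLY")

    val df = spark.range(0, 1000).toDF("id")  // made-up example data
    df.cache()   // cached at the configured level rather than a hard-coded one
    df.count()   // an action materializes the cache

    spark.stop()
  }
}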