diff --git a/build.sbt b/build.sbt
index 252f717fdcf..a9f01fa413b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -79,6 +79,7 @@ lazy val commonSettings = Seq(
     "-Dspark.databricks.delta.snapshotPartitions=2",
     "-Dspark.sql.shuffle.partitions=5",
     "-Ddelta.log.cacheSize=3",
+    "-Dspark.databricks.delta.delta.log.cacheSize=3",
     "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
     "-Xmx1024m"
   ),
@@ -144,6 +145,7 @@ lazy val spark = (project in file("spark"))
       "-Dspark.databricks.delta.snapshotPartitions=2",
       "-Dspark.sql.shuffle.partitions=5",
       "-Ddelta.log.cacheSize=3",
+      "-Dspark.databricks.delta.delta.log.cacheSize=3",
       "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
       "-Xmx1024m"
     ),
@@ -201,6 +203,7 @@ lazy val contribs = (project in file("contribs"))
       "-Dspark.databricks.delta.snapshotPartitions=2",
       "-Dspark.sql.shuffle.partitions=5",
       "-Ddelta.log.cacheSize=3",
+      "-Dspark.databricks.delta.delta.log.cacheSize=3",
       "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
       "-Xmx1024m"
     ),
diff --git a/python/delta/testing/utils.py b/python/delta/testing/utils.py
index aefb2c54c59..ab5f433c38d 100644
--- a/python/delta/testing/utils.py
+++ b/python/delta/testing/utils.py
@@ -37,6 +37,7 @@ def conf(cls) -> SparkConf:
         _conf.set("spark.databricks.delta.snapshotPartitions", "2")
         _conf.set("spark.sql.shuffle.partitions", "5")
         _conf.set("delta.log.cacheSize", "3")
+        _conf.set("spark.databricks.delta.delta.log.cacheSize", "3")
         _conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
         _conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         _conf.set("spark.sql.catalog.spark_catalog",
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index e0456ea163d..52b172853d1 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.sources._
 import org.apache.spark.sql.delta.storage.LogStoreProvider
 import org.apache.spark.sql.delta.util.FileNames
-import com.google.common.cache.{CacheBuilder, RemovalNotification}
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 
@@ -615,21 +615,42 @@ object DeltaLog extends DeltaLogging {
    * We create only a single [[DeltaLog]] for any given `DeltaLogCacheKey` to avoid wasted work
    * in reconstructing the log.
    */
-  private val deltaLogCache = {
-    val builder = CacheBuilder.newBuilder()
-      .expireAfterAccess(60, TimeUnit.MINUTES)
-      .removalListener((removalNotification: RemovalNotification[DeltaLogCacheKey, DeltaLog]) => {
-        val log = removalNotification.getValue
-        // TODO: We should use ref-counting to uncache snapshots instead of a manual timed op
-        try log.unsafeVolatileSnapshot.uncache() catch {
-          case _: java.lang.NullPointerException =>
-            // Various layers will throw null pointer if the RDD is already gone.
-        }
-      })
-    sys.props.get("delta.log.cacheSize")
-      .flatMap(v => Try(v.toLong).toOption)
-      .foreach(builder.maximumSize)
-    builder.build[DeltaLogCacheKey, DeltaLog]()
+  type CacheKey = (Path, Map[String, String])
+  private[delta] def getOrCreateCache(conf: SQLConf):
+      Cache[CacheKey, DeltaLog] = synchronized {
+    deltaLogCache match {
+      case Some(c) => c
+      case None =>
+        val builder = createCacheBuilder(conf)
+          .removalListener(
+            (removalNotification: RemovalNotification[DeltaLogCacheKey, DeltaLog]) => {
+              val log = removalNotification.getValue
+              // TODO: We should use ref-counting to uncache snapshots instead of a manual timed op
+              try log.unsafeVolatileSnapshot.uncache() catch {
+                case _: java.lang.NullPointerException =>
+                  // Various layers will throw null pointer if the RDD is already gone.
+              }
+            })
+        deltaLogCache = Some(builder.build[CacheKey, DeltaLog]())
+        deltaLogCache.get
+    }
+  }
+
+  private var deltaLogCache: Option[Cache[CacheKey, DeltaLog]] = None
+
+  /**
+   * Helper to create the delta log cache builder.
+   */
+  private def createCacheBuilder(conf: SQLConf): CacheBuilder[AnyRef, AnyRef] = {
+    val cacheRetention = conf.getConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES)
+    val cacheSize = conf
+      .getConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE)
+      .max(sys.props.get("delta.log.cacheSize").map(_.toLong).getOrElse(0L))
+
+    CacheBuilder
+      .newBuilder()
+      .expireAfterAccess(cacheRetention, TimeUnit.MINUTES)
+      .maximumSize(cacheSize)
   }
 
@@ -787,7 +808,8 @@ object DeltaLog extends DeltaLogging {
     // - Different `authority` (e.g., different user tokens in the path)
     // - Different mount point.
     try {
-      deltaLogCache.get(path -> fileSystemOptions, () => {
+      getOrCreateCache(spark.sessionState.conf)
+        .get(path -> fileSystemOptions, () => {
         createDeltaLog()
       }
      )
@@ -801,7 +823,7 @@ object DeltaLog extends DeltaLogging {
     if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
       // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached
       // `DeltaLog` has been stopped.
-      deltaLogCache.invalidate(path -> fileSystemOptions)
+      getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
       getDeltaLogFromCache()
     } else {
       deltaLog
@@ -819,6 +841,7 @@ object DeltaLog extends DeltaLogging {
     // scalastyle:on deltahadoopconfiguration
     val path = fs.makeQualified(rawPath)
 
+    val deltaLogCache = getOrCreateCache(spark.sessionState.conf)
     if (spark.sessionState.conf.getConf(
         DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
       // We rely on the fact that accessing the key set doesn't modify the entry access time. See
@@ -841,12 +864,19 @@ object DeltaLog extends DeltaLogging {
   }
 
   def clearCache(): Unit = {
-    deltaLogCache.invalidateAll()
+    deltaLogCache.foreach(_.invalidateAll())
+  }
+
+  /** Unset the cache. Exposing for testing */
+  private[delta] def unsetCache(): Unit = {
+    synchronized {
+      deltaLogCache = None
+    }
   }
 
   /** Return the number of cached `DeltaLog`s. Exposing for testing */
   private[delta] def cacheSize: Long = {
-    deltaLogCache.size()
+    deltaLogCache.map(_.size()).getOrElse(0L)
   }
 
   /**
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index bba5463b3bd..c72e3138a08 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1607,6 +1607,18 @@ trait DeltaSQLConfBase {
     )
     .createWithDefault(4)
 
+  val DELTA_LOG_CACHE_SIZE = buildConf("delta.log.cacheSize")
+    .internal()
+    .doc("The maximum number of DeltaLog instances to cache in memory.")
+    .longConf
+    .createWithDefault(10000)
+
+  val DELTA_LOG_CACHE_RETENTION_MINUTES = buildConf("delta.log.cacheRetentionMinutes")
+    .internal()
+    .doc("The retention duration of DeltaLog instances in the cache.")
+    .timeConf(TimeUnit.MINUTES)
+    .createWithDefault(60)
+
   //////////////////
   // Delta Sharing
   //////////////////
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 20aaee6011d..3edbc6249b7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -643,4 +643,34 @@ class DeltaLogSuite extends QueryTest
       assert(e.getMessage.contains("FAILFAST"))
     }
   }
+
+  test("DeltaLog cache size should honor config limit") {
+    def assertCacheSize(expected: Long): Unit = {
+      for (_ <- 1 to 6) {
+        withTempDir(dir => {
+          val path = dir.getCanonicalPath
+          spark.range(10).write.format("delta").mode("append").save(path)
+        })
+      }
+      assert(DeltaLog.cacheSize === expected)
+    }
+    DeltaLog.unsetCache()
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE.key -> "4") {
+      assertCacheSize(4)
+      DeltaLog.unsetCache()
+      // the larger of the SQL conf and the system property is adopted
+      try {
+        System.getProperties.setProperty("delta.log.cacheSize", "5")
+        assertCacheSize(5)
+      } finally {
+        System.getProperties.remove("delta.log.cacheSize")
+      }
+    }
+
+    // assert the time conf returns the correct value
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES.key -> "100") {
+      assert(spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES) === 100)
+    }
+  }
 }
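Usage note: the two confs added in DeltaSQLConf above are session-scoped SQL confs, so the DeltaLog cache bound no longer has to be fixed at JVM startup through the delta.log.cacheSize system property. Below is a minimal sketch, not part of the patch, of how they could be set from user code. It assumes the usual Delta Spark session setup and a placeholder table path; the full conf names follow the spark.databricks.delta. prefix that buildConf applies, as also seen in the build.sbt test options above.

import org.apache.spark.sql.SparkSession

// Illustrative sketch: cap the DeltaLog cache at 100 entries and expire entries
// 30 minutes after last access (the defaults above are 10000 entries / 60 minutes).
// The values are read when the cache is first created, so set them before the
// first Delta table is resolved in the session.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-log-cache-demo")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.databricks.delta.delta.log.cacheSize", "100")
  .config("spark.databricks.delta.delta.log.cacheRetentionMinutes", "30")
  .getOrCreate()

// Each distinct (path, file-system options) key holds one cached DeltaLog; once
// more than the configured number of tables has been touched, the least recently
// accessed entries are evicted and their cached snapshots uncached.
spark.range(10).write.format("delta").save("/tmp/delta/cache-demo-table")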