From 6b884fb2559019286ef1f07c3210ada63e964212 Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Wed, 24 Jan 2024 14:26:06 -0800
Subject: [PATCH 1/7] impl

---
 .../org/apache/spark/sql/delta/DeltaLog.scala | 72 +++++++++++++------
 .../sql/delta/sources/DeltaSQLConf.scala      | 12 ++++
 .../spark/sql/delta/DeltaLogSuite.scala       | 18 +++++
 3 files changed, 82 insertions(+), 20 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index e0456ea163d..fa8612defe7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.sources._
 import org.apache.spark.sql.delta.storage.LogStoreProvider
 import org.apache.spark.sql.delta.util.FileNames
-import com.google.common.cache.{CacheBuilder, RemovalNotification}
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 
@@ -615,21 +615,42 @@ object DeltaLog extends DeltaLogging {
    * We create only a single [[DeltaLog]] for any given `DeltaLogCacheKey` to avoid wasted work
    * in reconstructing the log.
    */
-  private val deltaLogCache = {
-    val builder = CacheBuilder.newBuilder()
-      .expireAfterAccess(60, TimeUnit.MINUTES)
-      .removalListener((removalNotification: RemovalNotification[DeltaLogCacheKey, DeltaLog]) => {
-        val log = removalNotification.getValue
-        // TODO: We should use ref-counting to uncache snapshots instead of a manual timed op
-        try log.unsafeVolatileSnapshot.uncache() catch {
-          case _: java.lang.NullPointerException =>
-            // Various layers will throw null pointer if the RDD is already gone.
-        }
-      })
-    sys.props.get("delta.log.cacheSize")
-      .flatMap(v => Try(v.toLong).toOption)
-      .foreach(builder.maximumSize)
-    builder.build[DeltaLogCacheKey, DeltaLog]()
+  type CacheKey = (Path, Map[String, String])
+  private[delta] def getOrCreateCache(conf: SQLConf):
+      Cache[CacheKey, DeltaLog] = synchronized {
+    deltaLogCache match {
+      case Some(c) => c
+      case None =>
+        val builder = createCacheBuilder(conf)
+          .removalListener(
+            (removalNotification: RemovalNotification[DeltaLogCacheKey, DeltaLog]) => {
+              val log = removalNotification.getValue
+              // TODO: We should use ref-counting to uncache snapshots instead of a manual timed op
+              try log.unsafeVolatileSnapshot.uncache() catch {
+                case _: java.lang.NullPointerException =>
+                  // Various layers will throw null pointer if the RDD is already gone.
+              }
+            })
+        deltaLogCache = Some(builder.build[CacheKey, DeltaLog]())
+        deltaLogCache.get
+    }
+  }
+
+  private var deltaLogCache: Option[Cache[CacheKey, DeltaLog]] = None
+
+  /**
+   * Helper to create delta log caches
+   */
+  private[delta] def createCacheBuilder(conf: SQLConf): CacheBuilder[AnyRef, AnyRef] = {
+    val cacheRetention = conf.getConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES)
+    val cacheSize = conf
+      .getConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE)
+      .max(sys.props.get("delta.log.cacheSize").map(_.toLong).getOrElse(0L))
+
+    CacheBuilder
+      .newBuilder()
+      .expireAfterAccess(cacheRetention, TimeUnit.MINUTES)
+      .maximumSize(cacheSize)
   }
 
@@ -787,7 +808,8 @@ object DeltaLog extends DeltaLogging {
     // - Different `authority` (e.g., different user tokens in the path)
    // - Different mount point.
     try {
-      deltaLogCache.get(path -> fileSystemOptions, () => {
+      getOrCreateCache(spark.sessionState.conf).get(
+        path -> fileSystemOptions, () => {
           createDeltaLog()
         }
       )
@@ -801,7 +823,7 @@
       if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
         // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached
         // `DeltaLog` has been stopped.
-        deltaLogCache.invalidate(path -> fileSystemOptions)
+        getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
         getDeltaLogFromCache()
       } else {
         deltaLog
@@ -819,6 +841,7 @@
     // scalastyle:on deltahadoopconfiguration
     val path = fs.makeQualified(rawPath)
 
+    val deltaLogCache = getOrCreateCache(spark.sessionState.conf)
     if (spark.sessionState.conf.getConf(
         DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
       // We rely on the fact that accessing the key set doesn't modify the entry access time. See
@@ -841,12 +864,21 @@
   }
 
   def clearCache(): Unit = {
-    deltaLogCache.invalidateAll()
+    deltaLogCache.foreach { cache =>
+      cache.invalidateAll()
+    }
+  }
+
+  /** Unset the caches. Exposing for testing */
+  private[delta] def unsetCache(): Unit = {
+    synchronized {
+      deltaLogCache = None
+    }
   }
 
   /** Return the number of cached `DeltaLog`s. Exposing for testing */
   private[delta] def cacheSize: Long = {
-    deltaLogCache.size()
+    deltaLogCache.map(_.size()).getOrElse(0L)
   }
 
   /**
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index bba5463b3bd..c72e3138a08 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1607,6 +1607,18 @@ trait DeltaSQLConfBase {
     )
     .createWithDefault(4)
 
+  val DELTA_LOG_CACHE_SIZE = buildConf("delta.log.cacheSize")
+    .internal()
+    .doc("The maximum number of DeltaLog instances to cache in memory.")
+    .longConf
+    .createWithDefault(10000)
+
+  val DELTA_LOG_CACHE_RETENTION_MINUTES = buildConf("delta.log.cacheRetentionMinutes")
+    .internal()
+    .doc("The retention duration of DeltaLog instances in the cache.")
+    .timeConf(TimeUnit.MINUTES)
+    .createWithDefault(60)
+
   //////////////////
   // Delta Sharing
   //////////////////
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 20aaee6011d..f1f47c16e3e 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -643,4 +643,22 @@ class DeltaLogSuite extends QueryTest
       assert(e.getMessage.contains("FAILFAST"))
     }
   }
+
+  test("DeltaLog cache size should honor config limit") {
+    def assertCacheSize(expected: Long): Unit = {
+      for (_ <- 1 to 5) {
+        withTempDir(dir => {
+          val path = dir.getCanonicalPath
+          spark.range(10).write.format("delta").mode("append").save(path)
+        })
+      }
+      assert(DeltaLog.cacheSize === expected)
+    }
+    DeltaLog.unsetCache()
+    spark.sessionState.conf.setConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE, 2L)
+    assertCacheSize(2)
+    DeltaLog.unsetCache()
+    System.getProperties.setProperty("delta.log.cacheSize", "3")
+    assertCacheSize(3)
+  }
 }

From b9c1802c3c4c492f96d7ee18c853933fd2a6b60a Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Thu, 15 Feb 2024 10:16:23 -0800
Subject: [PATCH 2/7] update

---
 .../org/apache/spark/sql/delta/DeltaLog.scala | 10 ++++-----
 .../spark/sql/delta/DeltaLogSuite.scala       | 22 ++++++++++++++-----
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index fa8612defe7..52b172853d1 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -641,7 +641,7 @@ object DeltaLog extends DeltaLogging {
   /**
    * Helper to create delta log caches
    */
-  private[delta] def createCacheBuilder(conf: SQLConf): CacheBuilder[AnyRef, AnyRef] = {
+  private def createCacheBuilder(conf: SQLConf): CacheBuilder[AnyRef, AnyRef] = {
     val cacheRetention = conf.getConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES)
     val cacheSize = conf
       .getConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE)
@@ -808,8 +808,8 @@ object DeltaLog extends DeltaLogging {
     // - Different `authority` (e.g., different user tokens in the path)
     // - Different mount point.
     try {
-      getOrCreateCache(spark.sessionState.conf).get(
-        path -> fileSystemOptions, () => {
+      getOrCreateCache(spark.sessionState.conf)
+        .get(path -> fileSystemOptions, () => {
           createDeltaLog()
         }
       )
@@ -864,9 +864,7 @@ object DeltaLog extends DeltaLogging {
   }
 
   def clearCache(): Unit = {
-    deltaLogCache.foreach { cache =>
-      cache.invalidateAll()
-    }
+    deltaLogCache.foreach(_.invalidateAll())
   }
 
   /** Unset the caches. Exposing for testing */
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index f1f47c16e3e..94674786d4c 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -655,10 +655,22 @@ class DeltaLogSuite extends QueryTest
       assert(DeltaLog.cacheSize === expected)
     }
     DeltaLog.unsetCache()
-    spark.sessionState.conf.setConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE, 2L)
-    assertCacheSize(2)
-    DeltaLog.unsetCache()
-    System.getProperties.setProperty("delta.log.cacheSize", "3")
-    assertCacheSize(3)
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE.key -> "2") {
+      assertCacheSize(2)
+      DeltaLog.unsetCache()
+      // the larger of the SQL conf and the system property is adopted
+      try {
+        System.getProperties.setProperty("delta.log.cacheSize", "3")
+        assertCacheSize(3)
+      } finally {
+        System.getProperties.remove("delta.log.cacheSize")
+      }
+    }
+
+    // assert the time conf returns the correct value
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES.key -> "100") {
+      assert(spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES) === 100)
+    }
   }
 }

From e1ce6dc089bc3b6da27b2a90ef5d4098eff0dc56 Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Fri, 16 Feb 2024 10:45:43 -0800
Subject: [PATCH 3/7] test tweak heap size

---
 run-tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/run-tests.py b/run-tests.py
index 0fb48a18bf2..de5b1b23ade 100755
--- a/run-tests.py
+++ b/run-tests.py
@@ -80,7 +80,7 @@ def run_sbt_tests(root_dir, test_group, coverage, scala_version=None):
         # a GC that is optimized for larger multiprocessor machines with large memory
         cmd += ["-J-XX:+UseG1GC"]
         # 6x the default heap size (set in delta/built.sbt)
-        cmd += ["-J-Xmx6G"]
+        cmd += ["-J-Xmx12G"]
         run_cmd(cmd, stream_output=True)
 
 def run_python_tests(root_dir):

From 0363af222b4d02a874e17f06998727f5738acc7b Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Fri, 16 Feb 2024 12:55:02 -0800
Subject: [PATCH 4/7] Revert "test tweak heap size"

This reverts commit e1ce6dc089bc3b6da27b2a90ef5d4098eff0dc56.
---
 run-tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/run-tests.py b/run-tests.py
index de5b1b23ade..0fb48a18bf2 100755
--- a/run-tests.py
+++ b/run-tests.py
@@ -80,7 +80,7 @@ def run_sbt_tests(root_dir, test_group, coverage, scala_version=None):
         # a GC that is optimized for larger multiprocessor machines with large memory
         cmd += ["-J-XX:+UseG1GC"]
         # 6x the default heap size (set in delta/built.sbt)
-        cmd += ["-J-Xmx12G"]
+        cmd += ["-J-Xmx6G"]
         run_cmd(cmd, stream_output=True)
 
 def run_python_tests(root_dir):

From b33dbb1a93e0c1c53c3cb20597770a72f33a88ab Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Fri, 16 Feb 2024 12:59:38 -0800
Subject: [PATCH 5/7] fix UT

---
 python/delta/testing/utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/delta/testing/utils.py b/python/delta/testing/utils.py
index aefb2c54c59..ab5f433c38d 100644
--- a/python/delta/testing/utils.py
+++ b/python/delta/testing/utils.py
@@ -37,6 +37,7 @@ def conf(cls) -> SparkConf:
         _conf.set("spark.databricks.delta.snapshotPartitions", "2")
         _conf.set("spark.sql.shuffle.partitions", "5")
         _conf.set("delta.log.cacheSize", "3")
+        _conf.set("spark.databricks.delta.delta.log.cacheSize", "3")
         _conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
         _conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         _conf.set("spark.sql.catalog.spark_catalog",

From 735d7c829e854abcf047bae9c091b44894d1d0e7 Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Wed, 21 Feb 2024 13:45:19 -0800
Subject: [PATCH 6/7] tweak UT

---
 build.sbt                                                | 3 +++
 .../scala/org/apache/spark/sql/delta/DeltaLogSuite.scala | 6 +++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/build.sbt b/build.sbt
index 252f717fdcf..a9f01fa413b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -79,6 +79,7 @@ lazy val commonSettings = Seq(
     "-Dspark.databricks.delta.snapshotPartitions=2",
     "-Dspark.sql.shuffle.partitions=5",
     "-Ddelta.log.cacheSize=3",
+    "-Dspark.databricks.delta.delta.log.cacheSize=3",
     "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
     "-Xmx1024m"
   ),
@@ -144,6 +145,7 @@ lazy val spark = (project in file("spark"))
     "-Dspark.databricks.delta.snapshotPartitions=2",
     "-Dspark.sql.shuffle.partitions=5",
     "-Ddelta.log.cacheSize=3",
+    "-Dspark.databricks.delta.delta.log.cacheSize=3",
     "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
     "-Xmx1024m"
   ),
@@ -201,6 +203,7 @@ lazy val contribs = (project in file("contribs"))
     "-Dspark.databricks.delta.snapshotPartitions=2",
     "-Dspark.sql.shuffle.partitions=5",
     "-Ddelta.log.cacheSize=3",
+    "-Dspark.databricks.delta.delta.log.cacheSize=3",
     "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
     "-Xmx1024m"
   ),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 94674786d4c..0554610ccaf 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -646,7 +646,7 @@ class DeltaLogSuite extends QueryTest
 
   test("DeltaLog cache size should honor config limit") {
     def assertCacheSize(expected: Long): Unit = {
-      for (_ <- 1 to 5) {
+      for (_ <- 1 to 6) {
         withTempDir(dir => {
           val path = dir.getCanonicalPath
           spark.range(10).write.format("delta").mode("append").save(path)
@@ -655,12 +655,12 @@ class DeltaLogSuite extends QueryTest
       assert(DeltaLog.cacheSize === expected)
     }
     DeltaLog.unsetCache()
-    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE.key -> "2") {
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE.key -> "4") {
       assertCacheSize(2)
       DeltaLog.unsetCache()
       // the larger of the SQL conf and the system property is adopted
       try {
-        System.getProperties.setProperty("delta.log.cacheSize", "3")
+        System.getProperties.setProperty("delta.log.cacheSize", "5")
         assertCacheSize(3)
       } finally {
         System.getProperties.remove("delta.log.cacheSize")

From ef718e81b40a6406cbbffeef09cc87c768c92964 Mon Sep 17 00:00:00 2001
From: Fred Storage Liu
Date: Wed, 21 Feb 2024 14:13:05 -0800
Subject: [PATCH 7/7] tweak UT

---
 .../test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
index 0554610ccaf..3edbc6249b7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -656,12 +656,12 @@ class DeltaLogSuite extends QueryTest
     }
     DeltaLog.unsetCache()
     withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE.key -> "4") {
-      assertCacheSize(2)
+      assertCacheSize(4)
       DeltaLog.unsetCache()
       // the larger of the SQL conf and the system property is adopted
       try {
         System.getProperties.setProperty("delta.log.cacheSize", "5")
-        assertCacheSize(3)
+        assertCacheSize(5)
       } finally {
         System.getProperties.remove("delta.log.cacheSize")
       }
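
Notes (illustrative sketches, not part of the patch series):

Patch 1's core change is a lazily-built Guava cache whose size and retention are read from configuration at construction time, instead of the old eagerly-initialized `private val` with hard-coded settings. Below is a minimal, self-contained sketch of that pattern. `SketchConf` is a hypothetical stand-in for `SQLConf`/`DeltaSQLConf`, the key and value types are simplified (the real key is `(Path, Map[String, String])` and the value a `DeltaLog`), and the removal listener only prints where the real code uncaches the evicted log's snapshot:

    import java.util.concurrent.TimeUnit

    import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}

    object DeltaLogCacheSketch {
      // Hypothetical stand-ins for the real SQLConf-backed settings.
      type CacheKey = String
      final case class SketchConf(retentionMinutes: Long, maxSize: Long)

      private var cache: Option[Cache[CacheKey, String]] = None

      private def createCacheBuilder(conf: SketchConf): CacheBuilder[AnyRef, AnyRef] = {
        // Effective cap: the larger of the conf value and the legacy
        // "delta.log.cacheSize" JVM system property, as in the patch.
        val size = conf.maxSize.max(
          sys.props.get("delta.log.cacheSize").map(_.toLong).getOrElse(0L))
        CacheBuilder.newBuilder()
          .expireAfterAccess(conf.retentionMinutes, TimeUnit.MINUTES)
          .maximumSize(size)
      }

      // Build the cache on first use so it sees whatever conf is active then;
      // `synchronized` keeps two concurrent first callers from racing.
      def getOrCreateCache(conf: SketchConf): Cache[CacheKey, String] = synchronized {
        cache.getOrElse {
          val built = createCacheBuilder(conf)
            .removalListener((n: RemovalNotification[CacheKey, String]) =>
              println(s"evicted ${n.getKey}"))  // real code uncaches the snapshot here
            .build[CacheKey, String]()
          cache = Some(built)
          built
        }
      }

      // Dropping the Option lets tests rebuild the cache under new settings.
      def unsetCache(): Unit = synchronized { cache = None }
    }

Lazy construction is the point of the refactor: the old `private val deltaLogCache` was built during class initialization, before any session configuration could be consulted, so its size and retention could never honor SQL confs.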
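
The later test tweaks hinge on the size-resolution rule inside `createCacheBuilder`: the effective cap is the larger of the SQL conf and the legacy `delta.log.cacheSize` JVM system property. That is why patch 7 has to raise the expected sizes to 4 and 5 after patch 6 sets the conf to 4 and the property to 5. A tiny standalone illustration of just that rule:

    // `confValue` plays the role of DELTA_LOG_CACHE_SIZE and `sysProp` the
    // "delta.log.cacheSize" system property read via sys.props.
    def effectiveCacheSize(confValue: Long, sysProp: Option[String]): Long =
      confValue.max(sysProp.map(_.toLong).getOrElse(0L))

    assert(effectiveCacheSize(4L, None) == 4L)       // conf alone caps at 4
    assert(effectiveCacheSize(4L, Some("5")) == 5L)  // larger property wins
    assert(effectiveCacheSize(4L, Some("2")) == 4L)  // smaller property is ignored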
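
Finally, a hedged sketch of how a session would set the new knobs. The full key for the size conf, `spark.databricks.delta.delta.log.cacheSize` (note the doubled `delta`, from the prefix `DeltaSQLConf.buildConf` adds), appears verbatim in patch 5 and the build.sbt entries; the retention key below is derived the same way and is an assumption, not quoted from the patches:

    // Assumed usage; `spark` is an active SparkSession.
    spark.conf.set("spark.databricks.delta.delta.log.cacheSize", "500")
    // Assumed key, derived from DELTA_LOG_CACHE_RETENTION_MINUTES.
    spark.conf.set("spark.databricks.delta.delta.log.cacheRetentionMinutes", "30")

    // The legacy JVM system property is still honored and wins when larger:
    System.setProperty("delta.log.cacheSize", "1000")  // effective cap: 1000

Because the cache is built once on first use, values changed after the first Delta table access do not rebuild it; the tests work around this with the new `DeltaLog.unsetCache()`.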