
Commit 3f0496b

lzlfred authored and vkorukanti committed

Make a Delta SQL conf for DeltaLog cache size

Closes #2568

GitOrigin-RevId: 2f5b0992afe7aba5586a5e0e083c782e8dab40e5

1 parent 210503a, commit 3f0496b
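
With this change, the DeltaLog cache's capacity and retention become tunable through Spark SQL confs rather than only the delta.log.cacheSize JVM system property. Below is a minimal sketch of setting them at session startup; note that both confs are declared .internal() in the diff, so the keys here are read off this commit rather than any documented public API:

import org.apache.spark.sql.SparkSession

// Illustrative only: the full key is the "spark.databricks.delta." prefix
// plus the name declared in DeltaSQLConf (build.sbt below uses the same key).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("deltalog-cache-conf-sketch")
  .config("spark.databricks.delta.delta.log.cacheSize", "1000")
  .config("spark.databricks.delta.delta.log.cacheRetentionMinutes", "120")
  .getOrCreate()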

File tree

5 files changed: +96 −20 lines changed


build.sbt

+3
@@ -79,6 +79,7 @@ lazy val commonSettings = Seq(
       "-Dspark.databricks.delta.snapshotPartitions=2",
       "-Dspark.sql.shuffle.partitions=5",
       "-Ddelta.log.cacheSize=3",
+      "-Dspark.databricks.delta.delta.log.cacheSize=3",
       "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
       "-Xmx1024m"
     ),
@@ -144,6 +145,7 @@ lazy val spark = (project in file("spark"))
       "-Dspark.databricks.delta.snapshotPartitions=2",
       "-Dspark.sql.shuffle.partitions=5",
       "-Ddelta.log.cacheSize=3",
+      "-Dspark.databricks.delta.delta.log.cacheSize=3",
       "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
       "-Xmx1024m"
     ),
@@ -201,6 +203,7 @@ lazy val contribs = (project in file("contribs"))
       "-Dspark.databricks.delta.snapshotPartitions=2",
       "-Dspark.sql.shuffle.partitions=5",
       "-Ddelta.log.cacheSize=3",
+      "-Dspark.databricks.delta.delta.log.cacheSize=3",
       "-Dspark.sql.sources.parallelPartitionDiscovery.parallelism=5",
       "-Xmx1024m"
     ),

python/delta/testing/utils.py

+1
@@ -37,6 +37,7 @@ def conf(cls) -> SparkConf:
         _conf.set("spark.databricks.delta.snapshotPartitions", "2")
         _conf.set("spark.sql.shuffle.partitions", "5")
         _conf.set("delta.log.cacheSize", "3")
+        _conf.set("spark.databricks.delta.delta.log.cacheSize", "3")
         _conf.set("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
         _conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         _conf.set("spark.sql.catalog.spark_catalog",

spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala

+50 −20
@@ -35,7 +35,7 @@ import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 import org.apache.spark.sql.delta.sources._
 import org.apache.spark.sql.delta.storage.LogStoreProvider
 import org.apache.spark.sql.delta.util.FileNames
-import com.google.common.cache.{CacheBuilder, RemovalNotification}
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -615,21 +615,42 @@ object DeltaLog extends DeltaLogging {
    * We create only a single [[DeltaLog]] for any given `DeltaLogCacheKey` to avoid wasted work
    * in reconstructing the log.
    */
-  private val deltaLogCache = {
-    val builder = CacheBuilder.newBuilder()
-      .expireAfterAccess(60, TimeUnit.MINUTES)
-      .removalListener((removalNotification: RemovalNotification[DeltaLogCacheKey, DeltaLog]) => {
-        val log = removalNotification.getValue
-        // TODO: We should use ref-counting to uncache snapshots instead of a manual timed op
-        try log.unsafeVolatileSnapshot.uncache() catch {
-          case _: java.lang.NullPointerException =>
-            // Various layers will throw null pointer if the RDD is already gone.
-        }
-      })
-    sys.props.get("delta.log.cacheSize")
-      .flatMap(v => Try(v.toLong).toOption)
-      .foreach(builder.maximumSize)
-    builder.build[DeltaLogCacheKey, DeltaLog]()
+  type CacheKey = (Path, Map[String, String])
+  private[delta] def getOrCreateCache(conf: SQLConf):
+      Cache[CacheKey, DeltaLog] = synchronized {
+    deltaLogCache match {
+      case Some(c) => c
+      case None =>
+        val builder = createCacheBuilder(conf)
+          .removalListener(
+            (removalNotification: RemovalNotification[DeltaLogCacheKey, DeltaLog]) => {
+              val log = removalNotification.getValue
+              // TODO: We should use ref-counting to uncache snapshots instead of a manual timed op
+              try log.unsafeVolatileSnapshot.uncache() catch {
+                case _: java.lang.NullPointerException =>
+                  // Various layers will throw null pointer if the RDD is already gone.
+              }
+            })
+        deltaLogCache = Some(builder.build[CacheKey, DeltaLog]())
+        deltaLogCache.get
+    }
+  }
+
+  private var deltaLogCache: Option[Cache[CacheKey, DeltaLog]] = None
+
+  /**
+   * Helper to create delta log caches
+   */
+  private def createCacheBuilder(conf: SQLConf): CacheBuilder[AnyRef, AnyRef] = {
+    val cacheRetention = conf.getConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES)
+    val cacheSize = conf
+      .getConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE)
+      .max(sys.props.get("delta.log.cacheSize").map(_.toLong).getOrElse(0L))
+
+    CacheBuilder
+      .newBuilder()
+      .expireAfterAccess(cacheRetention, TimeUnit.MINUTES)
+      .maximumSize(cacheSize)
   }
@@ -787,7 +808,8 @@ object DeltaLog extends DeltaLogging {
     // - Different `authority` (e.g., different user tokens in the path)
     // - Different mount point.
     try {
-      deltaLogCache.get(path -> fileSystemOptions, () => {
+      getOrCreateCache(spark.sessionState.conf)
+        .get(path -> fileSystemOptions, () => {
         createDeltaLog()
       }
      )
@@ -801,7 +823,7 @@ object DeltaLog extends DeltaLogging {
     if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
       // Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of the
       // cached `DeltaLog` has been stopped.
-      deltaLogCache.invalidate(path -> fileSystemOptions)
+      getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
       getDeltaLogFromCache()
     } else {
       deltaLog
@@ -819,6 +841,7 @@ object DeltaLog extends DeltaLogging {
     // scalastyle:on deltahadoopconfiguration
     val path = fs.makeQualified(rawPath)

+    val deltaLogCache = getOrCreateCache(spark.sessionState.conf)
     if (spark.sessionState.conf.getConf(
         DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
       // We rely on the fact that accessing the key set doesn't modify the entry access time. See
@@ -841,12 +864,19 @@ object DeltaLog extends DeltaLogging {
   }

   def clearCache(): Unit = {
-    deltaLogCache.invalidateAll()
+    deltaLogCache.foreach(_.invalidateAll())
+  }
+
+  /** Unset the caches. Exposing for testing */
+  private[delta] def unsetCache(): Unit = {
+    synchronized {
+      deltaLogCache = None
+    }
   }

   /** Return the number of cached `DeltaLog`s. Exposing for testing */
   private[delta] def cacheSize: Long = {
-    deltaLogCache.size()
+    deltaLogCache.map(_.size()).getOrElse(0L)
   }

   /**
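
The core of the change: an eagerly-initialized private val becomes a lazily-built singleton. getOrCreateCache reads the SQL confs the first time a DeltaLog is requested, sizes the Guava cache with the larger of the conf value and the legacy delta.log.cacheSize system property, and memoizes the result. A self-contained sketch of that pattern, with simplified key/value types standing in for the Delta ones:

import java.util.concurrent.TimeUnit
import com.google.common.cache.{Cache, CacheBuilder}

object LogCacheSketch {
  // None until first use; getOrCreate installs the cache exactly once.
  private var cache: Option[Cache[String, String]] = None

  def getOrCreate(confSize: Long, retentionMinutes: Long): Cache[String, String] =
    synchronized {
      cache.getOrElse {
        // Effective capacity: the larger of the SQL conf value and the
        // legacy "delta.log.cacheSize" JVM system property.
        val legacySize =
          sys.props.get("delta.log.cacheSize").map(_.toLong).getOrElse(0L)
        val built = CacheBuilder.newBuilder()
          .expireAfterAccess(retentionMinutes, TimeUnit.MINUTES)
          .maximumSize(confSize.max(legacySize))
          .build[String, String]()
        cache = Some(built)
        built
      }
    }
}

Because the confs are consulted only at construction time, changing them later does not resize an existing cache; that is why the commit also adds the unsetCache() test hook.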

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

+12
@@ -1618,6 +1618,18 @@ trait DeltaSQLConfBase {
     )
     .createWithDefault(4)

+  val DELTA_LOG_CACHE_SIZE = buildConf("delta.log.cacheSize")
+    .internal()
+    .doc("The maximum number of DeltaLog instances to cache in memory.")
+    .longConf
+    .createWithDefault(10000)
+
+  val DELTA_LOG_CACHE_RETENTION_MINUTES = buildConf("delta.log.cacheRetentionMinutes")
+    .internal()
+    .doc("The retention duration of DeltaLog instances in the cache.")
+    .timeConf(TimeUnit.MINUTES)
+    .createWithDefault(60)
+
   //////////////////
   // Delta Sharing
   //////////////////
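
For reference, the DeltaLog changes above read these entries as typed values; a timeConf declared in TimeUnit.MINUTES comes back as a plain Long count of minutes. A minimal sketch, assuming an active SparkSession named spark:

// Typed reads, mirroring the getConf calls in createCacheBuilder.
val maxEntries: Long =
  spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE)
val retentionMinutes: Long =
  spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES)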

spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala

+30
@@ -643,4 +643,34 @@ class DeltaLogSuite extends QueryTest
       assert(e.getMessage.contains("FAILFAST"))
     }
   }
+
+  test("DeltaLog cache size should honor config limit") {
+    def assertCacheSize(expected: Long): Unit = {
+      for (_ <- 1 to 6) {
+        withTempDir(dir => {
+          val path = dir.getCanonicalPath
+          spark.range(10).write.format("delta").mode("append").save(path)
+        })
+      }
+      assert(DeltaLog.cacheSize === expected)
+    }
+    DeltaLog.unsetCache()
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_SIZE.key -> "4") {
+      assertCacheSize(4)
+      DeltaLog.unsetCache()
+      // the larger of the SQLConf value and the system property is adopted
+      try {
+        System.getProperties.setProperty("delta.log.cacheSize", "5")
+        assertCacheSize(5)
+      } finally {
+        System.getProperties.remove("delta.log.cacheSize")
+      }
+    }
+
+    // assert the timeConf returns the configured value
+    withSQLConf(DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES.key -> "100") {
+      assert(spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_LOG_CACHE_RETENTION_MINUTES) === 100)
+    }
+  }
 }
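
A note on the test's structure: because getOrCreateCache memoizes the first cache it builds, changing DELTA_LOG_CACHE_SIZE once a cache exists has no effect, so the test calls DeltaLog.unsetCache() before each scenario to force the next table access to rebuild the cache with the new settings. The max semantics in createCacheBuilder are also why the size-5 system property wins over the size-4 SQL conf in the second scenario.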
