From aa524d13284d619729bce2fccd80bdff406b7013 Mon Sep 17 00:00:00 2001
From: Xinyi Yu
Date: Wed, 10 Apr 2024 16:21:45 -0700
Subject: [PATCH 1/4] add debug logs

---
 .../apache/spark/sql/internal/SQLConf.scala   | 15 ++++
 .../spark/sql/execution/CacheManager.scala    | 50 +++++++++++++++++--
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 73cb4fba8637..7df5fbecc984 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1609,6 +1609,19 @@ object SQLConf {
     .checkValues(StorageLevelMapper.values.map(_.name()).toSet)
     .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())
 
+  val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
+    .doc("Configures the log level of Dataframe cache operations, including adding and removing " +
+      "entries from Dataframe cache, hit and miss on cache application. The default log " +
+      "level is 'trace'. This log should only be used for debugging purposes and not in the " +
+      "production environment, since it generates a large amount of logs.")
+    .version("4.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+      "Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
     .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -5371,6 +5384,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def defaultCacheStorageLevel: StorageLevel =
     StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL))
 
+  def dataframeCacheLogLevel: String = getConf(DATAFRAME_CACHE_LOG_LEVEL)
+
   def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 6c5639ef99d4..1deaf8cd4955 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -38,7 +39,14 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
-case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) {
+  override def toString: String =
+    s"""
+       |CachedData(
+       |logicalPlan=$plan
+       |InMemoryRelation=$cachedRepresentation)
+       |""".stripMargin
+}
 
 /**
  * Provides support in a SQLContext for caching query results and automatically using these cached
@@ -62,6 +70,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   def clearCache(): Unit = this.synchronized {
     cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
     cachedData = IndexedSeq[CachedData]()
+    CacheManager.logCacheOperation(s"Cleared all Dataframe cache entries")
   }
 
   /** Checks if the cache is empty. */
@@ -119,7 +128,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       if (lookupCachedData(planToCache).nonEmpty) {
         logWarning("Data has already been cached.")
       } else {
-        cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
+        val cd = CachedData(planToCache, inMemoryRelation)
+        cachedData = cd +: cachedData
+        CacheManager.logCacheOperation(s"Added Dataframe cache entry:$cd")
       }
     }
   }
@@ -204,6 +215,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
     plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+    CacheManager.logCacheOperation(s"Removed ${plansToUncache.size} Dataframe cache " +
+      s"entries, with logical plans being \n[${plansToUncache.map(_.plan).mkString(",\n")}]")
 
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
@@ -268,6 +281,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
           logWarning("While recaching, data was already added to cache.")
         } else {
           cachedData = recomputedPlan +: cachedData
+          CacheManager.logCacheOperation(s"Re-cached Dataframe cache entry:$recomputedPlan")
         }
       }
     }
@@ -280,7 +294,12 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
   def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
-    cachedData.find(cd => plan.sameResult(cd.plan))
+    val result = cachedData.find(cd => plan.sameResult(cd.plan))
+    if (result.isDefined) {
+      CacheManager.logCacheOperation(s"Dataframe cache hit for input plan:\n$plan" +
+        s"matched with cache entry:${result.get}")
+    }
+    result
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
@@ -301,9 +320,19 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       }.getOrElse(currentFragment)
     }
 
-    newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+    val result = newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
     }
+
+    if (result.fastEquals(plan)) {
+      CacheManager.logCacheOperation(s"Dataframe cache miss for input plan:\n$plan")
+      CacheManager.logCacheOperation(s"Last 20 Dataframe cache entry logical plans:\n" +
+        s"[${cachedData.take(20).map(_.plan).mkString(",\n")}]")
+    } else {
+      CacheManager.logCacheOperation("Dataframe cache hit plan change summary:\n" +
+        s"${sideBySide(plan.treeString, result.treeString).mkString("\n")}")
+    }
+    result
   }
 
   /**
@@ -396,3 +425,16 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
   }
 }
+
+object CacheManager extends Logging {
+  def logCacheOperation(f: => String): Unit = {
+    SQLConf.get.dataframeCacheLogLevel match {
+      case "TRACE" => logTrace(f)
+      case "DEBUG" => logDebug(f)
+      case "INFO" => logInfo(f)
+      case "WARN" => logWarning(f)
+      case "ERROR" => logError(f)
+      case _ => logTrace(f)
+    }
+  }
+}
\ No newline at end of file

From 5fad0b9e9305c751e37a75a15fd1bf8410c71655 Mon Sep 17 00:00:00 2001
From: Xinyi Yu
Date: Wed, 10 Apr 2024 16:49:21 -0700
Subject: [PATCH 2/4] add new line

---
 .../scala/org/apache/spark/sql/execution/CacheManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 1deaf8cd4955..d78640e9c8a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -437,4 +437,4 @@ object CacheManager extends Logging {
       case _ => logTrace(f)
     }
   }
-}
\ No newline at end of file
+}

From dd76c2b1aa4478bd091e5d9d2e75000aba55d32d Mon Sep 17 00:00:00 2001
From: Xinyi Yu
Date: Mon, 15 Apr 2024 15:05:40 -0700
Subject: [PATCH 3/4] use structured logging

---
 .../org/apache/spark/internal/LogKey.scala    |  2 ++
 .../org/apache/spark/internal/Logging.scala   | 32 +++++++++++++++++
 .../src/test/resources/log4j2.properties      |  4 +--
 .../spark/util/StructuredLoggingSuite.scala   | 20 ++++++++---
 .../spark/sql/execution/CacheManager.scala    | 35 +++++++++++--------
 5 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a9a79de05c27..4b9a352272ac 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -55,6 +55,7 @@ object LogKey extends Enumeration {
   val CSV_SOURCE = Value
   val DATA = Value
   val DATABASE_NAME = Value
+  val DATAFRAME_CACHE_ENTRY = Value
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
   val END_POINT = Value
@@ -116,6 +117,7 @@ object LogKey extends Enumeration {
   val QUERY_HINT = Value
   val QUERY_ID = Value
   val QUERY_PLAN = Value
+  val QUERY_PLAN_COMPARISON = Value
   val QUERY_PLAN_LENGTH_ACTUAL = Value
   val QUERY_PLAN_LENGTH_MAX = Value
   val RANGE = Value
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 4768dbcdbc23..d2fa696ea6d1 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -153,10 +153,42 @@ trait Logging {
     if (log.isDebugEnabled) log.debug(msg)
   }
 
+  protected def logDebug(entry: LogEntry): Unit = {
+    if (log.isDebugEnabled) {
+      withLogContext(entry.context) {
+        log.debug(entry.message)
+      }
+    }
+  }
+
+  protected def logDebug(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isDebugEnabled) {
+      withLogContext(entry.context) {
+        log.debug(entry.message, throwable)
+      }
+    }
+  }
+
   protected def logTrace(msg: => String): Unit = {
     if (log.isTraceEnabled) log.trace(msg)
   }
 
+  protected def logTrace(entry: LogEntry): Unit = {
+    if (log.isTraceEnabled) {
+      withLogContext(entry.context) {
+        log.trace(entry.message)
+      }
+    }
+  }
+
+  protected def logTrace(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isTraceEnabled) {
+      withLogContext(entry.context) {
+        log.trace(entry.message, throwable)
+      }
+    }
+  }
+
   protected def logWarning(msg: => String): Unit = {
     if (log.isWarnEnabled) log.warn(msg)
   }
diff --git a/common/utils/src/test/resources/log4j2.properties b/common/utils/src/test/resources/log4j2.properties
index 2c7563ec8d3d..e3bd8689993d 100644
--- a/common/utils/src/test/resources/log4j2.properties
+++ b/common/utils/src/test/resources/log4j2.properties
@@ -40,11 +40,11 @@ appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
 
 # Custom loggers
 logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
-logger.structured.level = info
+logger.structured.level = trace
 logger.structured.appenderRefs = structured
 logger.structured.appenderRef.structured.ref = structured
 
 logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
-logger.pattern.level = info
+logger.pattern.level = trace
 logger.pattern.appenderRefs = pattern
 logger.pattern.appenderRef.pattern.ref = pattern
diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
index 6bdd932561b5..cb2892498fe3 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
@@ -80,7 +80,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(basicMsg)),
       (Level.WARN, () => logWarning(basicMsg)),
-      (Level.INFO, () => logInfo(basicMsg))).foreach { case (level, logFunc) =>
+      (Level.INFO, () => logInfo(basicMsg)),
+      (Level.DEBUG, () => logDebug(basicMsg)),
+      (Level.TRACE, () => logTrace(basicMsg))).foreach { case (level, logFunc) =>
       val logOutput = captureLogOutput(logFunc)
       assert(expectedPatternForBasicMsg(level).r.matches(logOutput))
     }
@@ -91,7 +93,9 @@ trait LoggingSuiteBase
     Seq(
      (Level.ERROR, () => logError(basicMsg, exception)),
       (Level.WARN, () => logWarning(basicMsg, exception)),
-      (Level.INFO, () => logInfo(basicMsg, exception))).foreach { case (level, logFunc) =>
+      (Level.INFO, () => logInfo(basicMsg, exception)),
+      (Level.DEBUG, () => logDebug(basicMsg, exception)),
+      (Level.TRACE, () => logTrace(basicMsg, exception))).foreach { case (level, logFunc) =>
       val logOutput = captureLogOutput(logFunc)
       assert(expectedPatternForBasicMsgWithException(level).r.matches(logOutput))
     }
@@ -101,7 +105,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithMDC)),
       (Level.WARN, () => logWarning(msgWithMDC)),
-      (Level.INFO, () => logInfo(msgWithMDC))).foreach {
+      (Level.INFO, () => logInfo(msgWithMDC)),
+      (Level.DEBUG, () => logDebug(msgWithMDC)),
+      (Level.TRACE, () => logTrace(msgWithMDC))).foreach {
       case (level, logFunc) =>
         val logOutput = captureLogOutput(logFunc)
         assert(expectedPatternForMsgWithMDC(level).r.matches(logOutput))
@@ -113,7 +119,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithMDCAndException, exception)),
       (Level.WARN, () => logWarning(msgWithMDCAndException, exception)),
-      (Level.INFO, () => logInfo(msgWithMDCAndException, exception))).foreach {
+      (Level.INFO, () => logInfo(msgWithMDCAndException, exception)),
+      (Level.DEBUG, () => logDebug(msgWithMDCAndException, exception)),
+      (Level.TRACE, () => logTrace(msgWithMDCAndException, exception))).foreach {
       case (level, logFunc) =>
         val logOutput = captureLogOutput(logFunc)
         assert(expectedPatternForMsgWithMDCAndException(level).r.matches(logOutput))
@@ -124,7 +132,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithConcat)),
       (Level.WARN, () => logWarning(msgWithConcat)),
-      (Level.INFO, () => logInfo(msgWithConcat))).foreach {
+      (Level.INFO, () => logInfo(msgWithConcat)),
+      (Level.DEBUG, () => logDebug(msgWithConcat)),
+      (Level.TRACE, () => logTrace(msgWithConcat))).foreach {
       case (level, logFunc) =>
         val logOutput = captureLogOutput(logFunc)
         verifyMsgWithConcat(level, logOutput)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d78640e9c8a3..4f3cecd17894 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -21,7 +21,8 @@ import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{LogEntry, Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
@@ -70,7 +71,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   def clearCache(): Unit = this.synchronized {
     cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
     cachedData = IndexedSeq[CachedData]()
-    CacheManager.logCacheOperation(s"Cleared all Dataframe cache entries")
+    CacheManager.logCacheOperation(log"Cleared all Dataframe cache entries")
   }
 
   /** Checks if the cache is empty. */
@@ -130,7 +131,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       } else {
         val cd = CachedData(planToCache, inMemoryRelation)
         cachedData = cd +: cachedData
-        CacheManager.logCacheOperation(s"Added Dataframe cache entry:$cd")
+        CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
+          log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")
       }
     }
   }
@@ -215,8 +217,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
     plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
-    CacheManager.logCacheOperation(s"Removed ${plansToUncache.size} Dataframe cache " +
-      s"entries, with logical plans being \n[${plansToUncache.map(_.plan).mkString(",\n")}]")
+    CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " +
+      log"cache entries, with logical plans being " +
+      log"\n[${MDC(QUERY_PLAN, plansToUncache.map(_.plan).mkString(",\n"))}]")
 
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
@@ -281,7 +284,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
           logWarning("While recaching, data was already added to cache.")
         } else {
           cachedData = recomputedPlan +: cachedData
-          CacheManager.logCacheOperation(s"Re-cached Dataframe cache entry:$recomputedPlan")
+          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
         }
       }
     }
@@ -296,8 +300,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
     val result = cachedData.find(cd => plan.sameResult(cd.plan))
     if (result.isDefined) {
-      CacheManager.logCacheOperation(s"Dataframe cache hit for input plan:\n$plan" +
-        s"matched with cache entry:${result.get}")
+      CacheManager.logCacheOperation(log"Dataframe cache hit for input plan:" +
+        log"\n${MDC(QUERY_PLAN, plan)} matched with cache entry:" +
+        log"${MDC(DATAFRAME_CACHE_ENTRY, result.get)}")
     }
     result
   }
@@ -325,12 +330,14 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     }
 
     if (result.fastEquals(plan)) {
-      CacheManager.logCacheOperation(s"Dataframe cache miss for input plan:\n$plan")
-      CacheManager.logCacheOperation(s"Last 20 Dataframe cache entry logical plans:\n" +
-        s"[${cachedData.take(20).map(_.plan).mkString(",\n")}]")
+      CacheManager.logCacheOperation(
+        log"Dataframe cache miss for input plan:\n${MDC(QUERY_PLAN, plan)}")
+      CacheManager.logCacheOperation(log"Last 20 Dataframe cache entry logical plans:\n" +
+        log"[${MDC(DATAFRAME_CACHE_ENTRY, cachedData.take(20).map(_.plan).mkString(",\n"))}]")
     } else {
-      CacheManager.logCacheOperation("Dataframe cache hit plan change summary:\n" +
-        s"${sideBySide(plan.treeString, result.treeString).mkString("\n")}")
+      CacheManager.logCacheOperation(log"Dataframe cache hit plan change summary:\n" +
+        log"${MDC(
+          QUERY_PLAN_COMPARISON, sideBySide(plan.treeString, result.treeString).mkString("\n"))}")
     }
     result
   }
@@ -427,7 +434,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 }
 
 object CacheManager extends Logging {
-  def logCacheOperation(f: => String): Unit = {
+  def logCacheOperation(f: => LogEntry): Unit = {
     SQLConf.get.dataframeCacheLogLevel match {
       case "TRACE" => logTrace(f)
       case "DEBUG" => logDebug(f)

From 33a8cac38dfe88dad571517cf5b56e94f78f3273 Mon Sep 17 00:00:00 2001
From: Xinyi Yu
Date: Mon, 15 Apr 2024 15:07:31 -0700
Subject: [PATCH 4/4] internal conf

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7df5fbecc984..c94126377523 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1610,6 +1610,7 @@ object SQLConf {
     .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())
 
   val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
+    .internal()
     .doc("Configures the log level of Dataframe cache operations, including adding and removing " +
       "entries from Dataframe cache, hit and miss on cache application. The default log " +
       "level is 'trace'. This log should only be used for debugging purposes and not in the " +
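
Reviewer note (not part of the patches): a minimal, hypothetical driver program along these lines
is one way to exercise the new knob end to end. The session setup and object name are illustrative
only; "warn" is picked purely so the messages clear Spark's default log threshold without touching
log4j2 configuration (the conf accepts trace/debug/info/warn/error case-insensitively and defaults
to trace, which stays silent under the stock logging setup).

    import org.apache.spark.sql.SparkSession

    object DataframeCacheLogDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("dataframe-cache-log-demo")
          // Route the Dataframe cache messages to WARN so they are visible with default logging.
          .config("spark.sql.dataframeCache.logLevel", "warn")
          .getOrCreate()

        val df = spark.range(10).toDF("id")
        df.cache()      // cacheQuery    -> "Added Dataframe cache entry:..."
        df.count()      // useCachedData -> "Dataframe cache hit plan change summary:..."
        df.unpersist()  // uncacheQuery  -> "Removed 1 Dataframe cache entries, ..."
        spark.stop()
      }
    }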