diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index a9a79de05c27..4b9a352272ac 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -55,6 +55,7 @@ object LogKey extends Enumeration {
   val CSV_SOURCE = Value
   val DATA = Value
   val DATABASE_NAME = Value
+  val DATAFRAME_CACHE_ENTRY = Value
   val DRIVER_ID = Value
   val DROPPED_PARTITIONS = Value
   val END_POINT = Value
@@ -116,6 +117,7 @@
   val QUERY_HINT = Value
   val QUERY_ID = Value
   val QUERY_PLAN = Value
+  val QUERY_PLAN_COMPARISON = Value
   val QUERY_PLAN_LENGTH_ACTUAL = Value
   val QUERY_PLAN_LENGTH_MAX = Value
   val RANGE = Value
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 4768dbcdbc23..d2fa696ea6d1 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -153,10 +153,42 @@ trait Logging {
     if (log.isDebugEnabled) log.debug(msg)
   }
 
+  protected def logDebug(entry: LogEntry): Unit = {
+    if (log.isDebugEnabled) {
+      withLogContext(entry.context) {
+        log.debug(entry.message)
+      }
+    }
+  }
+
+  protected def logDebug(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isDebugEnabled) {
+      withLogContext(entry.context) {
+        log.debug(entry.message, throwable)
+      }
+    }
+  }
+
   protected def logTrace(msg: => String): Unit = {
     if (log.isTraceEnabled) log.trace(msg)
   }
 
+  protected def logTrace(entry: LogEntry): Unit = {
+    if (log.isTraceEnabled) {
+      withLogContext(entry.context) {
+        log.trace(entry.message)
+      }
+    }
+  }
+
+  protected def logTrace(entry: LogEntry, throwable: Throwable): Unit = {
+    if (log.isTraceEnabled) {
+      withLogContext(entry.context) {
+        log.trace(entry.message, throwable)
+      }
+    }
+  }
+
   protected def logWarning(msg: => String): Unit = {
     if (log.isWarnEnabled) log.warn(msg)
   }
diff --git a/common/utils/src/test/resources/log4j2.properties b/common/utils/src/test/resources/log4j2.properties
index 2c7563ec8d3d..e3bd8689993d 100644
--- a/common/utils/src/test/resources/log4j2.properties
+++ b/common/utils/src/test/resources/log4j2.properties
@@ -40,11 +40,11 @@ appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
 
 # Custom loggers
 logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
-logger.structured.level = info
+logger.structured.level = trace
 logger.structured.appenderRefs = structured
 logger.structured.appenderRef.structured.ref = structured
 
 logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
-logger.pattern.level = info
+logger.pattern.level = trace
 logger.pattern.appenderRefs = pattern
 logger.pattern.appenderRef.pattern.ref = pattern
diff --git a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
index 6bdd932561b5..cb2892498fe3 100644
--- a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
+++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
@@ -80,7 +80,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(basicMsg)),
       (Level.WARN, () => logWarning(basicMsg)),
-      (Level.INFO, () => logInfo(basicMsg))).foreach { case (level, logFunc) =>
+      (Level.INFO, () => logInfo(basicMsg)),
+      (Level.DEBUG, () => logDebug(basicMsg)),
+      (Level.TRACE, () => logTrace(basicMsg))).foreach { case (level, logFunc) =>
       val logOutput = captureLogOutput(logFunc)
       assert(expectedPatternForBasicMsg(level).r.matches(logOutput))
     }
@@ -91,7 +93,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(basicMsg, exception)),
       (Level.WARN, () => logWarning(basicMsg, exception)),
-      (Level.INFO, () => logInfo(basicMsg, exception))).foreach { case (level, logFunc) =>
+      (Level.INFO, () => logInfo(basicMsg, exception)),
+      (Level.DEBUG, () => logDebug(basicMsg, exception)),
+      (Level.TRACE, () => logTrace(basicMsg, exception))).foreach { case (level, logFunc) =>
       val logOutput = captureLogOutput(logFunc)
       assert(expectedPatternForBasicMsgWithException(level).r.matches(logOutput))
     }
@@ -101,7 +105,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithMDC)),
       (Level.WARN, () => logWarning(msgWithMDC)),
-      (Level.INFO, () => logInfo(msgWithMDC))).foreach {
+      (Level.INFO, () => logInfo(msgWithMDC)),
+      (Level.DEBUG, () => logDebug(msgWithMDC)),
+      (Level.TRACE, () => logTrace(msgWithMDC))).foreach {
       case (level, logFunc) =>
         val logOutput = captureLogOutput(logFunc)
         assert(expectedPatternForMsgWithMDC(level).r.matches(logOutput))
@@ -113,7 +119,9 @@ trait LoggingSuiteBase
     Seq(
       (Level.ERROR, () => logError(msgWithMDCAndException, exception)),
       (Level.WARN, () => logWarning(msgWithMDCAndException, exception)),
-      (Level.INFO, () => logInfo(msgWithMDCAndException, exception))).foreach {
+      (Level.INFO, () => logInfo(msgWithMDCAndException, exception)),
+      (Level.DEBUG, () => logDebug(msgWithMDCAndException, exception)),
+      (Level.TRACE, () => logTrace(msgWithMDCAndException, exception))).foreach {
       case (level, logFunc) =>
         val logOutput = captureLogOutput(logFunc)
         assert(expectedPatternForMsgWithMDCAndException(level).r.matches(logOutput))
@@ -124,7 +132,9 @@ trait LoggingSuiteBase
     Seq(
      (Level.ERROR, () => logError(msgWithConcat)),
       (Level.WARN, () => logWarning(msgWithConcat)),
-      (Level.INFO, () => logInfo(msgWithConcat))).foreach {
+      (Level.INFO, () => logInfo(msgWithConcat)),
+      (Level.DEBUG, () => logDebug(msgWithConcat)),
+      (Level.TRACE, () => logTrace(msgWithConcat))).foreach {
       case (level, logFunc) =>
         val logOutput = captureLogOutput(logFunc)
         verifyMsgWithConcat(level, logOutput)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 73cb4fba8637..c94126377523 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1609,6 +1609,20 @@
     .checkValues(StorageLevelMapper.values.map(_.name()).toSet)
     .createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())
 
+  val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
+    .internal()
+    .doc("Configures the log level of Dataframe cache operations, including adding and removing " +
+      "entries from the Dataframe cache, and cache hits and misses. The default log level is " +
+      "'trace'. This log should only be used for debugging purposes and not in the production " +
+      "environment, since it generates a large amount of logs.")
+    .version("4.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
+      "Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " +
+        "'trace', 'debug', 'info', 'warn' and 'error'.")
+    .createWithDefault("trace")
+
   val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
     .internal()
     .doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -5371,6 +5385,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def defaultCacheStorageLevel: StorageLevel =
     StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL))
 
+  def dataframeCacheLogLevel: String = getConf(DATAFRAME_CACHE_LOG_LEVEL)
+
   def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
 
   override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 6c5639ef99d4..4f3cecd17894 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -21,13 +21,15 @@ import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{LogEntry, Logging, MDC}
+import org.apache.spark.internal.LogKey._
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
@@ -38,7 +40,14 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
 /** Holds a cached logical plan and its data */
-case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
+case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) {
+  override def toString: String =
+    s"""
+       |CachedData(
+       |logicalPlan=$plan
+       |InMemoryRelation=$cachedRepresentation)
+       |""".stripMargin
+}
 
 /**
  * Provides support in a SQLContext for caching query results and automatically using these cached
@@ -62,6 +71,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
   def clearCache(): Unit = this.synchronized {
     cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
     cachedData = IndexedSeq[CachedData]()
+    CacheManager.logCacheOperation(log"Cleared all Dataframe cache entries")
   }
 
   /** Checks if the cache is empty. */
@@ -119,7 +129,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
         if (lookupCachedData(planToCache).nonEmpty) {
           logWarning("Data has already been cached.")
         } else {
-          cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
+          val cd = CachedData(planToCache, inMemoryRelation)
+          cachedData = cd +: cachedData
+          CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
+            log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")
         }
       }
     }
@@ -204,6 +217,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
     plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+    CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " +
+      log"cache entries, with logical plans being " +
+      log"\n[${MDC(QUERY_PLAN, plansToUncache.map(_.plan).mkString(",\n"))}]")
 
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
@@ -268,6 +284,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
           logWarning("While recaching, data was already added to cache.")
         } else {
           cachedData = recomputedPlan +: cachedData
+          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
         }
       }
     }
@@ -280,7 +298,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
   def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
-    cachedData.find(cd => plan.sameResult(cd.plan))
+    val result = cachedData.find(cd => plan.sameResult(cd.plan))
+    if (result.isDefined) {
+      CacheManager.logCacheOperation(log"Dataframe cache hit for input plan:" +
+        log"\n${MDC(QUERY_PLAN, plan)} matched with cache entry:" +
+        log"${MDC(DATAFRAME_CACHE_ENTRY, result.get)}")
+    }
+    result
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */
@@ -301,9 +325,21 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
         }.getOrElse(currentFragment)
     }
 
-    newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
+    val result = newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
     }
+
+    if (result.fastEquals(plan)) {
+      CacheManager.logCacheOperation(
+        log"Dataframe cache miss for input plan:\n${MDC(QUERY_PLAN, plan)}")
+      CacheManager.logCacheOperation(log"Last 20 Dataframe cache entry logical plans:\n" +
+        log"[${MDC(DATAFRAME_CACHE_ENTRY, cachedData.take(20).map(_.plan).mkString(",\n"))}]")
+    } else {
+      CacheManager.logCacheOperation(log"Dataframe cache hit plan change summary:\n" +
+        log"${MDC(
+          QUERY_PLAN_COMPARISON, sideBySide(plan.treeString, result.treeString).mkString("\n"))}")
+    }
+    result
   }
 
   /**
@@ -396,3 +432,16 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
   }
 }
+
+object CacheManager extends Logging {
+  def logCacheOperation(f: => LogEntry): Unit = {
+    SQLConf.get.dataframeCacheLogLevel match {
+      case "TRACE" => logTrace(f)
+      case "DEBUG" => logDebug(f)
+      case "INFO" => logInfo(f)
+      case "WARN" => logWarning(f)
+      case "ERROR" => logError(f)
+      case _ => logTrace(f)
+    }
+  }
+}
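
For reviewers, a minimal usage sketch (not part of the patch) of the new LogEntry-based logDebug/logTrace overloads added to Logging.scala. The class and method names below are hypothetical; the imports and the log"..." interpolator with MDC mirror what the CacheManager changes above already do.

```scala
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._

// Hypothetical component, for illustration only.
class CacheAuditor extends Logging {
  def onEntryAdded(entry: AnyRef): Unit = {
    // log"..." builds the message lazily; the MDC value is attached as the
    // structured DATAFRAME_CACHE_ENTRY field when structured logging is enabled,
    // and is simply interpolated into the message text otherwise. The new
    // overload only renders the entry if DEBUG is enabled for this logger.
    logDebug(log"Added Dataframe cache entry:${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
  }
}
```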
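And a hedged sketch of how the new config drives CacheManager.logCacheOperation, assuming an active SparkSession named spark. The value is upper-cased by the conf's transform, so any case is accepted; the trailing `case _` in logCacheOperation is only a defensive fallback, since checkValue already rejects other values.

```scala
// Route Dataframe cache operation logs to DEBUG instead of the default TRACE.
spark.conf.set("spark.sql.dataframeCache.logLevel", "debug")

// cache() registers the plan with CacheManager, which emits an
// "Added Dataframe cache entry" message at the configured level.
val df = spark.range(10).cache()

// Planning the action goes through lookupCachedData/useCachedData, which emit
// the cache hit (or miss) messages; they show up once the CacheManager logger
// itself is enabled at DEBUG in the log4j2 configuration.
df.count()
```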