@@ -55,6 +55,7 @@ object LogKey extends Enumeration {
val CSV_SOURCE = Value
val DATA = Value
val DATABASE_NAME = Value
val DATAFRAME_CACHE_ENTRY = Value
val DRIVER_ID = Value
val DROPPED_PARTITIONS = Value
val END_POINT = Value
@@ -116,6 +117,7 @@ object LogKey extends Enumeration {
val QUERY_HINT = Value
val QUERY_ID = Value
val QUERY_PLAN = Value
val QUERY_PLAN_COMPARISON = Value
val QUERY_PLAN_LENGTH_ACTUAL = Value
val QUERY_PLAN_LENGTH_MAX = Value
val RANGE = Value
@@ -153,10 +153,42 @@ trait Logging {
if (log.isDebugEnabled) log.debug(msg)
}

protected def logDebug(entry: LogEntry): Unit = {
if (log.isDebugEnabled) {
withLogContext(entry.context) {
log.debug(entry.message)
}
}
}

protected def logDebug(entry: LogEntry, throwable: Throwable): Unit = {
if (log.isDebugEnabled) {
withLogContext(entry.context) {
log.debug(entry.message, throwable)
}
}
}

protected def logTrace(msg: => String): Unit = {
if (log.isTraceEnabled) log.trace(msg)
}

protected def logTrace(entry: LogEntry): Unit = {
if (log.isTraceEnabled) {
withLogContext(entry.context) {
log.trace(entry.message)
}
}
}

protected def logTrace(entry: LogEntry, throwable: Throwable): Unit = {
if (log.isTraceEnabled) {
withLogContext(entry.context) {
log.trace(entry.message, throwable)
}
}
}

protected def logWarning(msg: => String): Unit = {
if (log.isWarnEnabled) log.warn(msg)
}
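
As a rough usage sketch of the new LogEntry overloads added above (the QueryTracker class and the message text are made up for illustration; QUERY_ID is an existing LogKey, and the log"..." interpolator with MDC is the structured-logging API these overloads accept):

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.QUERY_ID

class QueryTracker extends Logging {
  def onQueryFinished(queryId: String, e: Throwable): Unit = {
    // The log"..." interpolator builds a message with MDC key/value pairs that
    // become structured context when emitted at DEBUG or TRACE level.
    logDebug(log"Finished query ${MDC(QUERY_ID, queryId)}")
    // The two-argument overloads attach a throwable, mirroring logError/logWarning.
    logTrace(log"Finished query ${MDC(QUERY_ID, queryId)}", e)
  }
}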
4 changes: 2 additions & 2 deletions common/utils/src/test/resources/log4j2.properties
@@ -40,11 +40,11 @@ appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Custom loggers
logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
logger.structured.level = info
logger.structured.level = trace
logger.structured.appenderRefs = structured
logger.structured.appenderRef.structured.ref = structured

logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
logger.pattern.level = info
logger.pattern.level = trace
logger.pattern.appenderRefs = pattern
logger.pattern.appenderRef.pattern.ref = pattern
@@ -80,7 +80,9 @@ trait LoggingSuiteBase
Seq(
(Level.ERROR, () => logError(basicMsg)),
(Level.WARN, () => logWarning(basicMsg)),
(Level.INFO, () => logInfo(basicMsg))).foreach { case (level, logFunc) =>
(Level.INFO, () => logInfo(basicMsg)),
(Level.DEBUG, () => logDebug(basicMsg)),
(Level.TRACE, () => logTrace(basicMsg))).foreach { case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForBasicMsg(level).r.matches(logOutput))
}
@@ -91,7 +93,9 @@ trait LoggingSuiteBase
Seq(
(Level.ERROR, () => logError(basicMsg, exception)),
(Level.WARN, () => logWarning(basicMsg, exception)),
(Level.INFO, () => logInfo(basicMsg, exception))).foreach { case (level, logFunc) =>
(Level.INFO, () => logInfo(basicMsg, exception)),
(Level.DEBUG, () => logDebug(basicMsg, exception)),
(Level.TRACE, () => logTrace(basicMsg, exception))).foreach { case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForBasicMsgWithException(level).r.matches(logOutput))
}
@@ -101,7 +105,9 @@ trait LoggingSuiteBase
Seq(
(Level.ERROR, () => logError(msgWithMDC)),
(Level.WARN, () => logWarning(msgWithMDC)),
(Level.INFO, () => logInfo(msgWithMDC))).foreach {
(Level.INFO, () => logInfo(msgWithMDC)),
(Level.DEBUG, () => logDebug(msgWithMDC)),
(Level.TRACE, () => logTrace(msgWithMDC))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForMsgWithMDC(level).r.matches(logOutput))
@@ -113,7 +119,9 @@ trait LoggingSuiteBase
Seq(
(Level.ERROR, () => logError(msgWithMDCAndException, exception)),
(Level.WARN, () => logWarning(msgWithMDCAndException, exception)),
(Level.INFO, () => logInfo(msgWithMDCAndException, exception))).foreach {
(Level.INFO, () => logInfo(msgWithMDCAndException, exception)),
(Level.DEBUG, () => logDebug(msgWithMDCAndException, exception)),
(Level.TRACE, () => logTrace(msgWithMDCAndException, exception))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForMsgWithMDCAndException(level).r.matches(logOutput))
@@ -124,7 +132,9 @@ trait LoggingSuiteBase
Seq(
(Level.ERROR, () => logError(msgWithConcat)),
(Level.WARN, () => logWarning(msgWithConcat)),
(Level.INFO, () => logInfo(msgWithConcat))).foreach {
(Level.INFO, () => logInfo(msgWithConcat)),
(Level.DEBUG, () => logDebug(msgWithConcat)),
(Level.TRACE, () => logTrace(msgWithConcat))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
verifyMsgWithConcat(level, logOutput)
@@ -1609,6 +1609,20 @@ object SQLConf {
.checkValues(StorageLevelMapper.values.map(_.name()).toSet)
.createWithDefault(StorageLevelMapper.MEMORY_AND_DISK.name())

val DATAFRAME_CACHE_LOG_LEVEL = buildConf("spark.sql.dataframeCache.logLevel")
Member: Will we need to debug cache table as well? Shall we rename the config as spark.sql.cache.logLevel?

Member: Also, let's make it an internal conf since it is for developers.

Contributor (author): I kept the Dataframe cache naming to differentiate it from the RDD cache.

Member: RDD is a Spark core concept. Anyway, I respect your choice here.

.internal()
.doc("Configures the log level of Dataframe cache operations, including adding and removing " +
"entries from Dataframe cache, hit and miss on cache application. The default log " +
"level is 'trace'. This log should only be used for debugging purposes and not in the " +
"production environment, since it generates a large amount of logs.")
.version("4.0.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
"Invalid value for 'spark.sql.dataframeCache.logLevel'. Valid values are " +
"'trace', 'debug', 'info', 'warn' and 'error'.")
.createWithDefault("trace")

val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
.internal()
.doc("When false, we will throw an error if a query contains a cartesian product without " +
@@ -5371,6 +5385,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def defaultCacheStorageLevel: StorageLevel =
StorageLevel.fromString(getConf(DEFAULT_CACHE_STORAGE_LEVEL))

def dataframeCacheLogLevel: String = getConf(DATAFRAME_CACHE_LOG_LEVEL)

def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
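
A hedged end-to-end sketch of how the new conf could be exercised (the demo object and the local-mode session are illustrative; the expected log messages paraphrase the ones added in CacheManager below):

import org.apache.spark.sql.SparkSession

object DataframeCacheLogDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dataframe-cache-log-demo")
      // Route Dataframe cache operation logs to INFO so they show up without
      // lowering the log4j level for the whole CacheManager logger.
      .config("spark.sql.dataframeCache.logLevel", "info")
      .getOrCreate()

    val df = spark.range(100).toDF("id")
    df.cache()       // expected: "Added Dataframe cache entry: ..."
    df.count()       // expected: a cache hit with a plan change summary
    df.unpersist()   // expected: "Removed 1 Dataframe cache entries, ..."
    spark.stop()
  }
}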
@@ -21,13 +21,15 @@ import scala.collection.immutable.IndexedSeq

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{LogEntry, Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
@@ -38,7 +40,14 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

/** Holds a cached logical plan and its data */
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) {
override def toString: String =
s"""
|CachedData(
|logicalPlan=$plan
|InMemoryRelation=$cachedRepresentation)
|""".stripMargin
}

/**
* Provides support in a SQLContext for caching query results and automatically using these cached
@@ -62,6 +71,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
def clearCache(): Unit = this.synchronized {
cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
cachedData = IndexedSeq[CachedData]()
CacheManager.logCacheOperation(log"Cleared all Dataframe cache entries")
}

/** Checks if the cache is empty. */
@@ -119,7 +129,10 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
if (lookupCachedData(planToCache).nonEmpty) {
logWarning("Data has already been cached.")
} else {
cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
val cd = CachedData(planToCache, inMemoryRelation)
cachedData = cd +: cachedData
CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")
}
}
}
@@ -204,6 +217,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
}
plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
CacheManager.logCacheOperation(log"Removed ${MDC(SIZE, plansToUncache.size)} Dataframe " +
log"cache entries, with logical plans being " +
log"\n[${MDC(QUERY_PLAN, plansToUncache.map(_.plan).mkString(",\n"))}]")

// Re-compile dependent cached queries after removing the cached query.
if (!cascade) {
@@ -268,6 +284,8 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
logWarning("While recaching, data was already added to cache.")
} else {
cachedData = recomputedPlan +: cachedData
CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
}
}
}
@@ -280,7 +298,13 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {

/** Optionally returns cached data for the given [[LogicalPlan]]. */
def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
cachedData.find(cd => plan.sameResult(cd.plan))
val result = cachedData.find(cd => plan.sameResult(cd.plan))
if (result.isDefined) {
CacheManager.logCacheOperation(log"Dataframe cache hit for input plan:" +
log"\n${MDC(QUERY_PLAN, plan)} matched with cache entry:" +
log"${MDC(DATAFRAME_CACHE_ENTRY, result.get)}")
}
result
}

/** Replaces segments of the given logical plan with cached versions where possible. */
@@ -301,9 +325,21 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
}.getOrElse(currentFragment)
}

newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
val result = newPlan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case s: SubqueryExpression => s.withNewPlan(useCachedData(s.plan))
}

if (result.fastEquals(plan)) {
CacheManager.logCacheOperation(
log"Dataframe cache miss for input plan:\n${MDC(QUERY_PLAN, plan)}")
CacheManager.logCacheOperation(log"Last 20 Dataframe cache entry logical plans:\n" +
log"[${MDC(DATAFRAME_CACHE_ENTRY, cachedData.take(20).map(_.plan).mkString(",\n"))}]")
} else {
CacheManager.logCacheOperation(log"Dataframe cache hit plan change summary:\n" +
log"${MDC(
QUERY_PLAN_COMPARISON, sideBySide(plan.treeString, result.treeString).mkString("\n"))}")
}
result
}

/**
@@ -396,3 +432,16 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
SparkSession.getOrCloneSessionWithConfigsOff(session, disableConfigs)
}
}

object CacheManager extends Logging {
def logCacheOperation(f: => LogEntry): Unit = {
SQLConf.get.dataframeCacheLogLevel match {
case "TRACE" => logTrace(f)
case "DEBUG" => logDebug(f)
case "INFO" => logInfo(f)
case "WARN" => logWarning(f)
case "ERROR" => logError(f)
case _ => logTrace(f)
}
}
}
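
For reference, the plan-change summary logged under QUERY_PLAN_COMPARISON in useCachedData is produced by catalyst's sideBySide helper; a minimal standalone sketch of that output format, with made-up plan strings:

import org.apache.spark.sql.catalyst.util.sideBySide

object SideBySideDemo {
  def main(args: Array[String]): Unit = {
    val before = "Project [id]\n+- Range (0, 100)"
    val after  = "InMemoryRelation [id]\n+- Range (0, 100)"
    // sideBySide pairs the two plan strings line by line and prefixes lines that
    // differ with "!", which is what ends up in the logged comparison.
    println(sideBySide(before, after).mkString("\n"))
  }
}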