@@ -70,4 +70,12 @@ private[spark] object Status {
.version("3.0.0")
.booleanConf
.createWithDefault(false)

val DISK_STORE_DIR_FOR_STATUS =
ConfigBuilder("spark.appStatusStore.diskStoreDir")
.doc("Local directory where to store app status. " +
"It's an alternative to the in-memory kv store")
.version("3.4.0")
.stringConf
.createOptional
}
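
For illustration, a minimal sketch of enabling the new disk-backed status store when building a session. The app name and directory path are placeholders, not from the PR:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("status-disk-store-demo")
  .master("local[*]")
  .config("spark.appStatusStore.diskStoreDir", "/tmp/spark-app-status")
  .getOrCreate()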
27 changes: 24 additions & 3 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,12 +17,17 @@

package org.apache.spark.status

import java.io.File
import java.nio.file.Files
import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.util.control.NonFatal

import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status.DISK_STORE_DIR_FOR_STATUS
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
import org.apache.spark.ui.scope._
@@ -34,6 +39,7 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
*/
private[spark] class AppStatusStore(
val store: KVStore,
val diskStore: Option[KVStore] = None,
val listener: Option[AppStatusListener] = None) {

def applicationInfo(): v1.ApplicationInfo = {
@@ -755,18 +761,33 @@ private[spark] class AppStatusStore(
}
}

private[spark] object AppStatusStore {
private[spark] object AppStatusStore extends Logging {

val CURRENT_VERSION = 2L

/**
* Create an in-memory store for a live application.
* Create an in-memory store for a live application. Also create a disk store if
* `spark.appStatusStore.diskStoreDir` is set.
*/
def createLiveStore(
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
new AppStatusStore(store, listener = Some(listener))
// create a disk-based kv store if the directory is set
val diskStore = conf.get(DISK_STORE_DIR_FOR_STATUS).flatMap { storeDir =>
Review comment (Member): FYI, currently, all disk stores are broken on Apple Silicon.
Author reply: Thanks for letting me know. If the failure happens during initialization, I think we are safe here.
val storePath = Files.createDirectories(
new File(storeDir, System.currentTimeMillis().toString).toPath
).toFile
try {
Some(KVUtils.open(storePath, AppStatusStoreMetadata(CURRENT_VERSION), conf))
.map(new ElementTrackingStore(_, conf))
} catch {
case NonFatal(e) =>
logWarning("Failed to create disk-based app status store: ", e)
None
}
}
new AppStatusStore(store, diskStore = diskStore, listener = Some(listener))
Review comment (Contributor): If we have enabled the disk store, thoughts on using it for everything at the driver?
Author reply: For now, we couldn't: the SQL UI needs to keep the task metrics in memory in order to render the UI quickly. But in the future, I think we can build a 2-layer store.
}
}
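
As a purely illustrative sketch of the "2-layer store" idea raised in the review above (not part of this PR; the class and method are hypothetical), reads could try the in-memory layer first and fall back to the disk layer:

import org.apache.spark.util.kvstore.KVStore

// Hypothetical: serve hot reads from memory, fall back to the disk store.
class TwoLayerStore(memory: KVStore, disk: KVStore) {
  def read[T](klass: Class[T], naturalKey: AnyRef): T =
    try memory.read(klass, naturalKey)
    catch { case _: NoSuchElementException => disk.read(klass, naturalKey) }
}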
9 changes: 9 additions & 0 deletions docs/monitoring.md
@@ -611,6 +611,15 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
<code>?planDescription=[true (default) | false]</code> enables/disables Physical <code>planDescription</code> on demand for the given query when Physical Plan size is high.
</td>
</tr>
<tr>
<td><code>/applications/[app-id]/diagnostics/sql/[execution-id]</code></td>
<td>Diagnostics for the given query. It includes:
<br>
1. Plan change history of adaptive execution
<br>
2. Physical plan description with unlimited fields
</td>
</tr>
<tr>
<td><code>/applications/[app-id]/environment</code></td>
<td>Environment details of the given application.</td>
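For illustration, a hedged example of reading the new endpoint from Scala. The host, port, application id, and execution id are placeholders; adjust them for your deployment:

import scala.io.Source

val url = "http://localhost:18080/api/v1/applications/app-1234/diagnostics/sql/3"
val json = Source.fromURL(url).mkString
println(json)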
@@ -3198,6 +3198,15 @@ object SQLConf {
.intConf
.createWithDefault(25)

val MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC =
buildConf("spark.sql.debug.maxToStringFieldsForDiagnostic")
.doc(s"Similar to ${MAX_TO_STRING_FIELDS.key}, but it will take effect when the " +
s"output will be stored for the diagnostics API. The output will be stored in " +
s"disk instead of memory. So it can be larger than ${MAX_TO_STRING_FIELDS.key}")
.version("3.4.0")
.intConf
.createWithDefault(10000)

val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
.doc("Maximum number of characters to output for a plan string. If the plan is " +
"longer, further output will be truncated. The default setting always generates a full " +
@@ -4439,6 +4448,8 @@ class SQLConf extends Serializable with Logging {

def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)

def maxToStringFieldsForDiagnostic: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS_FOR_DIAGNOSTIC)

def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt

def maxMetadataStringLength: Int = getConf(SQLConf.MAX_METADATA_STRING_LENGTH)
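A hedged usage sketch of the new conf; `spark` is an active SparkSession, and 20000 is just an example value above the 10000 default:

// Allow wider plan strings in the diagnostics store than in the UI.
spark.conf.set("spark.sql.debug.maxToStringFieldsForDiagnostic", 20000L)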
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.diagnostic

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ExplainMode
import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
import org.apache.spark.status.{ElementTrackingStore, KVUtils}

/**
* A Spark listener that writes diagnostic information to a data store. The information can be
* accessed by the public REST API.
*
* @param kvStore used to store the diagnostic information
*/
Review comment (Contributor): Can we add some classdoc?

class DiagnosticListener(
conf: SparkConf,
kvStore: ElementTrackingStore) extends SparkListener {

kvStore.addTrigger(
classOf[ExecutionDiagnosticData],
conf.get(UI_RETAINED_EXECUTIONS)) { count =>
cleanupExecutions(count)
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
case e: SparkListenerSQLAdaptiveExecutionUpdate => onAdaptiveExecutionUpdate(e)
case _ => // Ignore
}

private def onAdaptiveExecutionUpdate(event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
val data = new AdaptiveExecutionUpdate(
event.executionId,
System.currentTimeMillis(),
event.physicalPlanDescription
)
kvStore.write(data)
}

Review comment (@ulysses-you, Mar 31, 2022): event.physicalPlanDescription uses the old maxToStringFields for its explain string; do you want to do the same thing and re-explain it? Just a small concern: there is a cost to the explain if the plan is large, but I do not have a better idea that avoids re-explaining it.
Author reply: I'm not too worried about the cost, because a separate listener with a separate event queue won't slow down other listeners. But I do want to make sure everything we add is necessary. Usually it's enough to output full fields only in the final plan, and the plan change history can keep a truncated plan: IIUC, AQE changes the operators, while the expressions usually stay unchanged.
Author reply: And if we need more fields in the plan change history, it's always easier to add more than to remove something.

private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
val sqlConf = event.qe.sparkSession.sessionState.conf
val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
val physicalPlan = event.qe.explainString(
planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
val data = new ExecutionDiagnosticData(
event.executionId,
physicalPlan,
event.time,
None,
None
)
// Check triggers since this is adding new entries
kvStore.write(data, checkTriggers = true)
}

private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
try {
val existing = kvStore.read(classOf[ExecutionDiagnosticData], event.executionId)
val sqlConf = event.qe.sparkSession.sessionState.conf
val planDescriptionMode = ExplainMode.fromString(sqlConf.uiExplainMode)
val physicalPlan = event.qe.explainString(
planDescriptionMode, sqlConf.maxToStringFieldsForDiagnostic)
val data = new ExecutionDiagnosticData(
event.executionId,
physicalPlan,
existing.submissionTime,
Some(event.time),
event.executionFailure.map(
e => s"${e.getClass.getCanonicalName}: ${e.getMessage}").orElse(Some(""))
)
kvStore.write(data)
} catch {
case _: NoSuchElementException =>
// This can happen if the query failed before execution started.
}
}

private def cleanupExecutions(count: Long): Unit = {
val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
if (countToDelete <= 0) {
return
}
val view = kvStore.view(classOf[ExecutionDiagnosticData]).index("completionTime").first(0L)
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
toDelete.foreach(e => kvStore.delete(classOf[ExecutionDiagnosticData], e.executionId))
kvStore.removeAllByIndexValues(
classOf[AdaptiveExecutionUpdate], "id", toDelete.map(_.executionId))
}
}

object DiagnosticListener {
val QUEUE_NAME = "diagnostics"
}

Review comment (Member): This is a dedicated queue for this listener?
Author reply: Yes. Collecting/saving the diagnostics could be slow, and I don't want it to impact other critical listeners, e.g. the UI listener.
Review comment (Contributor): Rename to sqlDiagnostics?
Author reply: @mridulm I put this listener in the SQL folder because I need to capture some SQL events, but in the future it could capture other events to provide diagnostics for other components (e.g. executors), so I think a general name may be better. But sqlDiagnostics is also fine to me.
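
For context, a minimal sketch of the ElementTrackingStore trigger mechanism the listener relies on, using the same (Spark-internal) APIs as above; the threshold value and the action body are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.sql.diagnostic.ExecutionDiagnosticData
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

val store = new ElementTrackingStore(new InMemoryStore(), new SparkConf())
// Once writes push the tracked count past the threshold, the action fires
// with the current count. E.g. with spark.ui.retainedExecutions = 25 and
// count = 30, cleanupExecutions above deletes the 5 oldest completed
// executions, ordered by the "completionTime" index.
store.addTrigger(classOf[ExecutionDiagnosticData], 25L) { count =>
  println(s"diagnostic entries: $count; pruning beyond the retention limit")
}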
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.diagnostic

import scala.collection.JavaConverters._

import com.fasterxml.jackson.annotation.JsonIgnore

import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.util.kvstore.{KVIndex, KVStore}

/**
* Provides a view of a KVStore with methods that make it easy to query diagnostic-specific
* information. There's no state kept in this class, so it's ok to have multiple instances
* of it in an application.
*/
class DiagnosticStore(store: KVStore) {
Review comment (Member): Could you add some class description, please?
Author reply: Will do.

def diagnosticsList(offset: Int, length: Int): Seq[ExecutionDiagnosticData] = {
store.view(classOf[ExecutionDiagnosticData]).skip(offset).max(length).asScala.toSeq
}

def diagnostic(executionId: Long): Option[ExecutionDiagnosticData] = {
try {
Some(store.read(classOf[ExecutionDiagnosticData], executionId))
} catch {
case _: NoSuchElementException => None
}
}

def adaptiveExecutionUpdates(executionId: Long): Seq[AdaptiveExecutionUpdate] = {
store.view(classOf[AdaptiveExecutionUpdate])
.index("updateTime")
.parent(executionId)
.asScala
.toSeq
}
}

/* Represents the diagnostic data of a SQL execution */
class ExecutionDiagnosticData(
@KVIndexParam val executionId: Long,
val physicalPlan: String,
val submissionTime: Long,
val completionTime: Option[Long],
val errorMessage: Option[String])

/* Represents the plan change of an adaptive execution */
class AdaptiveExecutionUpdate(
@KVIndexParam("id")
val executionId: Long,
@KVIndexParam(value = "updateTime", parent = "id")
val updateTime: Long,
val physicalPlan: String) {

@JsonIgnore @KVIndex
private def naturalIndex: Array[Long] = Array(executionId, updateTime)
}
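
A hedged usage sketch of the store above; `kvStore` stands for the disk-backed KVStore that DiagnosticListener populates, and execution id 3 is a placeholder:

val diag = new DiagnosticStore(kvStore)
diag.diagnostic(3L) match {
  case Some(d) =>
    println(s"submitted: ${d.submissionTime}, completed: ${d.completionTime}")
    println(d.physicalPlan)
  case None =>
    println("no diagnostics recorded for execution 3")
}
// Plan change history captured from AQE updates, ordered by update time.
diag.adaptiveExecutionUpdates(3L).foreach(u => println(s"[${u.updateTime}] plan updated"))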
@@ -223,9 +223,11 @@ class QueryExecution(
append("\n")
}

def explainString(mode: ExplainMode): String = {
def explainString(
mode: ExplainMode,
maxFields: Int = SQLConf.get.maxToStringFields): String = {
val concat = new PlanStringConcat()
explainString(mode, SQLConf.get.maxToStringFields, concat.append)
explainString(mode, maxFields, concat.append)
withRedaction {
concat.toString
}
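A hedged sketch of the new overload in use; `qe` is a QueryExecution, "formatted" mirrors what the listener derives from spark.sql.ui.explainMode, and 10000 matches the diagnostic conf's default:

import org.apache.spark.sql.execution.ExplainMode

val fullPlan = qe.explainString(ExplainMode.fromString("formatted"), maxFields = 10000)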
@@ -96,7 +96,7 @@ object SQLExecution {
var ex: Option[Throwable] = None
val startTime = System.nanoTime()
try {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
val event = SparkListenerSQLExecutionStart(
executionId = executionId,
description = desc,
details = callSite.longForm,
@@ -105,7 +105,9 @@
// will be caught and reported in the `SparkListenerSQLExecutionEnd`
sparkPlanInfo = SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan),
time = System.currentTimeMillis(),
redactedConfigs))
redactedConfigs)
event.qe = queryExecution
sc.listenerBus.post(event)
body
} catch {
case e: Throwable =>
@@ -49,7 +49,11 @@ case class SparkListenerSQLExecutionStart(
sparkPlanInfo: SparkPlanInfo,
time: Long,
modifiedConfigs: Map[String, String] = Map.empty)
extends SparkListenerEvent
extends SparkListenerEvent {

// The `QueryExecution` instance that represents the SQL execution
@JsonIgnore private[sql] var qe: QueryExecution = null
}

@DeveloperApi
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.diagnostic.DiagnosticListener
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
@@ -118,6 +119,12 @@ private[sql] class SharedState(
statusStore
}

sparkContext.statusStore.diskStore.foreach { kvStore =>
sparkContext.listenerBus.addToQueue(
new DiagnosticListener(conf, kvStore.asInstanceOf[ElementTrackingStore]),
DiagnosticListener.QUEUE_NAME)
}

Review comment (Member): Why do we need to share the same kvStore?
Author reply: @dongjoon-hyun I used the same approach as the existing in-memory kvStore:
1. Define a shared kvStore in SparkContext.
2. Share the kvStore with the listeners that need to update the store (Jobs UI listener, SQL UI listener).
3. Share the kvStore with the components that need to read the store (web UI, REST API).

/**
* A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
* data to show.
@@ -31,4 +31,15 @@ private[v1] class ApiSqlRootResource extends ApiRequestContext {
def sqlList(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): Class[SqlResource] = classOf[SqlResource]

@Path("applications/{appId}/diagnostics/sql")
def sqlDiagnosticsList(
@PathParam("appId") appId: String): Class[SQLDiagnosticResource] =
classOf[SQLDiagnosticResource]

@Path("applications/{appId}/{attemptId}/diagnostics/sql")
def sqlDiagnosticsList(
@PathParam("appId") appId: String,
@PathParam("attemptId") attemptId: String): Class[SQLDiagnosticResource] =
classOf[SQLDiagnosticResource]
}
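
The SQLDiagnosticResource implementation itself is not shown in this excerpt. Below is a hypothetical sketch of its shape following the existing v1 resource conventions; the lookup path through ui.store.diskStore and the error messages are assumptions, not the PR's actual code:

import javax.ws.rs.{GET, Path, PathParam, Produces}
import javax.ws.rs.core.MediaType

import org.apache.spark.sql.diagnostic.{DiagnosticStore, ExecutionDiagnosticData}

@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SQLDiagnosticResource extends BaseAppResource {

  @GET
  @Path("{executionId:\\d+}")
  def diagnostic(@PathParam("executionId") execId: Long): ExecutionDiagnosticData =
    withUI { ui =>
      val kv = ui.store.diskStore.getOrElse(
        throw new NotFoundException("disk-backed status store is not enabled"))
      new DiagnosticStore(kv).diagnostic(execId)
        .getOrElse(throw new NotFoundException(s"unknown execution id: $execId"))
    }
}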