ExecutorRunner.scala
@@ -143,6 +143,8 @@ private[deploy] class ExecutorRunner(
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
builder.environment.put("SPARK_APPLICATION_ID", s"$appId")
builder.environment.put("SPARK_WORKER_URL", s"$publicAddress:$webUiPort")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
Expand Down
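The two new variables travel to the executor process through `ProcessBuilder`'s environment map, the same mechanism the surrounding lines already use for the `SPARK_LOG_URL_*` entries. A minimal sketch of that mechanism, with an illustrative application id (assumes a Unix-like system for `printenv`):

```scala
// Entries put into ProcessBuilder's environment are inherited by the child.
val builder = new ProcessBuilder("printenv", "SPARK_APPLICATION_ID")
builder.environment.put("SPARK_APPLICATION_ID", "app-20150212000000-0000") // illustrative value
val proc = builder.start()
// Read the child's stdout: it prints the inherited value.
println(scala.io.Source.fromInputStream(proc.getInputStream).mkString.trim)
```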
LogPage.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.logging.RollingFileAppender
 private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with Logging {
   private val worker = parent.worker
   private val workDir = parent.workDir
-  private val supportedLogTypes = Set("stderr", "stdout")

   def renderLog(request: HttpServletRequest): String = {
     val defaultBytes = 100 * 1024
@@ -133,9 +132,6 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with
       byteLength: Int
     ): (String, Long, Long, Long) = {

-    if (!supportedLogTypes.contains(logType)) {
-      return ("Error: Log type must be one of " + supportedLogTypes.mkString(", "), 0, 0, 0)
-    }

     // Verify that the normalized path of the log directory is in the working directory
     val normalizedUri = new URI(logDirectory).normalize()
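With the stderr/stdout whitelist removed, any file name can now arrive as a `logType`, so the path check referenced in the comment just below becomes the sole guard against traversal. A minimal sketch of what such a containment check can look like (assumed shape for illustration; the actual check in `LogPage` may differ):

```scala
import java.io.File
import java.net.URI

// Reject any requested log path that normalizes to a location outside the
// worker's work directory (e.g. one smuggled in via "../" segments).
def isSafeLogPath(logDirectory: String, workDir: File): Boolean = {
  val normalizedPath = new URI(logDirectory).normalize().getPath
  normalizedPath != null && normalizedPath.startsWith(workDir.getAbsolutePath)
}
```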
CoarseGrainedExecutorBackend.scala
@@ -34,6 +34,8 @@ import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
+import org.apache.log4j.{FileAppender, LogManager, Logger}
+import scala.collection.JavaConverters._

 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -74,9 +76,13 @@
   }

   def extractLogUrls: Map[String, String] = {
+    val appId = sys.env.get("SPARK_APPLICATION_ID").get
+    val workerUrl = sys.env.get("SPARK_WORKER_URL").get
+    val baseUrl = s"http://$workerUrl/logPage/?appId=$appId&executorId=$executorId&logType="
+    val logFiles = CoarseGrainedExecutorBackend.ALL_LOG4J_FILE_NAMES
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2)) ++ logFiles.map(name => (name, s"${baseUrl}${name}")).toMap
   }

   override def receive: PartialFunction[Any, Unit] = {
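Both `sys.env.get(...).get` calls throw a `NoSuchElementException` when the variable is absent, which can happen if the executor was not launched by the standalone `Worker` patched above. A hedged sketch of a more defensive body for `extractLogUrls` (illustrative alternative, not what the patch does):

```scala
// Fall back to just the SPARK_LOG_URL_* map when the worker did not export
// the two new variables (hypothetical defensive variant).
val urlsFromEnv = sys.env.collect {
  case (k, v) if k.startsWith("SPARK_LOG_URL_") =>
    k.stripPrefix("SPARK_LOG_URL_").toLowerCase -> v
}
val log4jUrls = for {
  appId  <- sys.env.get("SPARK_APPLICATION_ID")
  worker <- sys.env.get("SPARK_WORKER_URL")
} yield {
  val base = s"http://$worker/logPage/?appId=$appId&executorId=$executorId&logType="
  CoarseGrainedExecutorBackend.ALL_LOG4J_FILE_NAMES.map(n => n -> s"$base$n").toMap
}
urlsFromEnv ++ log4jUrls.getOrElse(Map.empty)
```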
@@ -139,6 +145,17 @@
 }

 private[spark] object CoarseGrainedExecutorBackend extends Logging {
+  lazy val ALL_LOG4J_FILE_NAMES = {
+    val allLoggers: List[Logger] =
+      LogManager.getCurrentLoggers().asScala.toList.map(_.asInstanceOf[Logger]) :+
+        LogManager.getRootLogger()
+
+    allLoggers
+      .flatMap(_.getAllAppenders.asScala)
+      .filter(_.isInstanceOf[FileAppender])
+      .map(_.asInstanceOf[FileAppender].getFile)
+      .toSet
+  }

   private def run(
       driverUrl: String,
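The `filter`/`asInstanceOf` chain above works, but the same enumeration reads a little more idiomatically with `collect`. A self-contained sketch of the technique (log4j 1.x API, as used by the patch; `getCurrentLoggers` returns a raw `java.util.Enumeration`, hence the cast):

```scala
import org.apache.log4j.{FileAppender, LogManager, Logger}
import scala.collection.JavaConverters._

// Collect the file path of every FileAppender attached to any log4j logger,
// including the root logger.
def log4jFileNames: Set[String] = {
  val loggers = LogManager.getCurrentLoggers().asScala
    .map(_.asInstanceOf[Logger]).toList :+ LogManager.getRootLogger()
  loggers
    .flatMap(_.getAllAppenders.asScala)
    .collect { case f: FileAppender => f.getFile }
    .toSet
}
```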
ExecutorsPage.scala
@@ -148,7 +148,7 @@ private[ui] class ExecutorsPage(
     if (logsExist) {
       <td>
         {
-          info.executorLogs.map { case (logName, logUrl) =>
+          info.executorLogs.toSeq.sortBy(_._1).map { case (logName, logUrl) =>
             <div>
               <a href={logUrl}>
                 {logName}
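`executorLogs` is a `Map`, whose iteration order is unspecified, so once the extra log4j file names join `stderr`/`stdout` the links could render in a different order on every page load; `toSeq.sortBy(_._1)` pins a stable alphabetical order. A quick illustration with made-up names and URLs:

```scala
val logs = Map("stdout" -> "/url/1", "gc.log" -> "/url/2", "stderr" -> "/url/3")
logs.toSeq.sortBy(_._1).foreach { case (name, url) =>
  println(s"$name -> $url") // always gc.log, stderr, stdout
}
```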