9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       None
     }
   }
+  private[spark] val eventLogCodec: Option[String] = {
+    val compress = conf.getBoolean("spark.eventLog.compress", false)
+    if (compress && isEventLogEnabled) {
+      Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
+    } else {
+      None
+    }
+  }
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
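A minimal sketch (not part of the patch) of how the new eventLogCodec value resolves; the config key read by getCodecName is assumed to be "spark.io.compression.codec":

    import org.apache.spark.SparkConf

    // Hypothetical settings mirroring what the code above reads.
    val conf = new SparkConf()
      .set("spark.eventLog.enabled", "true")    // drives isEventLogEnabled
      .set("spark.eventLog.compress", "true")
      .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")

    // getCodecName(conf) yields the class name above (default: "snappy"), and
    // getShortName maps it back to "lzf", so eventLogCodec == Some("lzf").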
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogDir: Option[String] = None,
+    // short name of compression codec used when writing event logs, if any (e.g. lzf)
+    val eventLogCodec: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
-    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+      eventLogDir: Option[String] = eventLogDir,
+      eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+    new ApplicationDescription(
+      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
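For illustration only, the copy-with-defaults idiom used above, reduced to a toy class (hypothetical names): every parameter of copy() defaults to the current field, so callers override only what changes.

    class Desc(
        val name: String,
        val codec: Option[String] = None)
      extends Serializable {

      // Each default resolves to the current field value.
      def copy(name: String = name, codec: Option[String] = codec): Desc =
        new Desc(name, codec)
    }

    // new Desc("app").copy(codec = Some("lzf")) keeps name, swaps in the codec.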
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -82,8 +82,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
-  private[history] val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
-  private[history] val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+  private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+  private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
 
   /**
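A note on this hunk: assuming EventLoggingListener defines SPARK_VERSION_KEY as "SPARK_VERSION" and COMPRESSION_CODEC_KEY as "COMPRESSION_CODEC", the resulting prefixes are unchanged and the rewrite only removes duplicated string literals:

    // Assumed key values; if they hold, the prefixes are byte-for-byte identical:
    assert(EventLoggingListener.SPARK_VERSION_KEY + "_" == "SPARK_VERSION_")
    assert(EventLoggingListener.COMPRESSION_CODEC_KEY + "_" == "COMPRESSION_CODEC_")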
@@ -291,7 +291,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
-    val (logInput, sparkVersion) =
+    val logInput =
       if (isLegacyLogDirectory(eventLog)) {
         openLegacyEventLog(logPath)
       } else {
@@ -300,7 +300,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val appListener = new ApplicationEventListener
       bus.addListener(appListener)
-      bus.replay(logInput, sparkVersion, logPath.toString)
+      bus.replay(logInput, logPath.toString)
       new FsApplicationHistoryInfo(
         logPath.getName(),
         appListener.appId.getOrElse(logPath.getName()),
@@ -320,30 +320,24 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * log file (along with other metadata files), which is the case for directories generated by
    * the code in previous releases.
    *

  [Review comment (Contributor)] This doc might be better if it said "in Spark 1.2.X and earlier" instead of "in previous releases".
  [Reply (Contributor)] Actually I guess this handles multiple versions. My bad, fine to leave as is.

-   * @return 2-tuple of (input stream of the events, version of Spark which wrote the log)
+   * @return input stream that holds one JSON record per line.
    */
-  private[history] def openLegacyEventLog(dir: Path): (InputStream, String) = {
+  private[history] def openLegacyEventLog(dir: Path): InputStream = {
     val children = fs.listStatus(dir)
     var eventLogPath: Path = null
     var codecName: Option[String] = None
-    var sparkVersion: String = null
 
     children.foreach { child =>
       child.getPath().getName() match {
         case name if name.startsWith(LOG_PREFIX) =>
           eventLogPath = child.getPath()
-
         case codec if codec.startsWith(COMPRESSION_CODEC_PREFIX) =>
           codecName = Some(codec.substring(COMPRESSION_CODEC_PREFIX.length()))
-
-        case version if version.startsWith(SPARK_VERSION_PREFIX) =>
-          sparkVersion = version.substring(SPARK_VERSION_PREFIX.length())
-
         case _ =>
       }
     }
 
-    if (eventLogPath == null || sparkVersion == null) {
+    if (eventLogPath == null) {
       throw new IllegalArgumentException(s"$dir is not a Spark application log directory.")
     }
@@ -355,7 +349,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     }
 
     val in = new BufferedInputStream(fs.open(eventLogPath))
-    (codec.map(_.compressedInputStream(in)).getOrElse(in), sparkVersion)
+    codec.map(_.compressedInputStream(in)).getOrElse(in)
   }
 
   /**
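For reference, the legacy layout this parser accepts: a Spark 1.0-style event log is a directory whose metadata lives in the names of empty marker files. The directory name and values below are hypothetical; only the prefixes come from the constants in the first hunk:

    app-20150101-0001/
      EVENT_LOG_1              <- the event log itself (matches LOG_PREFIX)
      SPARK_VERSION_1.0.2      <- version marker, no longer required after this change
      COMPRESSION_CODEC_lzf    <- optional codec marker (matches COMPRESSION_CODEC_PREFIX)
      APPLICATION_COMPLETE     <- completion marker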
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -737,13 +737,13 @@ private[spark] class Master(
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     try {
       val eventLogFile = app.desc.eventLogDir
-        .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
+        .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
         .getOrElse {
           // Event logging is not enabled for this application
           app.desc.appUiUrl = notFoundBasePath
           return false
         }
 
       val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
 
       if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
@@ -756,12 +756,12 @@ private[spark] class Master(
         return false
       }
 
-      val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
+      val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
         appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       try {
-        replayBus.replay(logInput, sparkVersion, eventLogFile)
+        replayBus.replay(logInput, eventLogFile)
       } finally {
         logInput.close()
       }
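A hedged sketch of why the codec now travels in ApplicationDescription: the Master rebuilds the UI from the log file alone, so the codec short name must be recoverable from the path. Assuming getLogPath appends it as a file extension, the naming reduces to something like:

    // Illustrative only (hypothetical helper); the real logic lives in
    // EventLoggingListener.getLogPath.
    def logPath(dir: String, appId: String, codec: Option[String]): String =
      dir.stripSuffix("/") + "/" + appId + codec.map("." + _).getOrElse("")

    // logPath("/logs", "app-42", Some("lzf")) == "/logs/app-42.lzf"
    // logPath("/logs", "app-42", None)        == "/logs/app-42"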
21 changes: 19 additions & 2 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -26,7 +26,6 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
-import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -53,8 +52,12 @@ private[spark] object CompressionCodec {
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
+  def getCodecName(conf: SparkConf): String = {
+    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
+  }
+
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
+    createCodec(conf, getCodecName(conf))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
@@ -71,6 +74,20 @@
         s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
   }
 
+  /**
+   * Return the short version of the given codec name.
+   * If it is already a short name, just return it.
+   */
+  def getShortName(codecName: String): String = {
+    if (shortCompressionCodecNames.contains(codecName)) {
+      codecName
+    } else {
+      shortCompressionCodecNames
+        .collectFirst { case (k, v) if v == codecName => k }
+        .getOrElse { throw new IllegalArgumentException(s"No short name for codec $codecName.") }
+    }
+  }
+
   val FALLBACK_COMPRESSION_CODEC = "lzf"
   val DEFAULT_COMPRESSION_CODEC = "snappy"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
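Expected behavior of the new helper, as a usage sketch grounded in the code above (CompressionCodec is private[spark], so this only compiles inside an org.apache.spark package):

    CompressionCodec.getShortName("lzf")                                // "lzf": already short
    CompressionCodec.getShortName(classOf[LZFCompressionCodec].getName) // "lzf": reverse lookup
    CompressionCodec.getShortName("com.example.NoSuchCodec")            // throws IllegalArgumentException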