diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder new file mode 100644 index 0000000000000..784e58270ab42 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.history.EventFilterBuilder @@ -0,0 +1 @@ +org.apache.spark.deploy.history.BasicEventFilterBuilder \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala new file mode 100644 index 0000000000000..5dad210460fa1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/BasicEventFilterBuilder.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable + +import org.apache.spark.deploy.history.EventFilter.FilterStatistic +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ + +/** + * This class tracks both live jobs and live executors, and pass the list to the + * [[BasicEventFilter]] to help BasicEventFilter to reject finished jobs (+ stages/tasks/RDDs) + * and dead executors. 
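// Note: the META-INF/services entry above is consumed via java.util.ServiceLoader (see
// EventFilterBuildersLoader later in this diff), so any module on the classpath can contribute
// its own EventFilterBuilder by shipping the same resource file naming its implementation class.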
+ */ +private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder { + private val _liveJobToStages = new mutable.HashMap[Int, Seq[Int]] + private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]] + private val _stageToRDDs = new mutable.HashMap[Int, Seq[Int]] + private val _liveExecutors = new mutable.HashSet[String] + + private var totalJobs: Long = 0L + private var totalStages: Long = 0L + private var totalTasks: Long = 0L + + def liveJobToStages: Map[Int, Seq[Int]] = _liveJobToStages.toMap + def stageToTasks: Map[Int, Set[Long]] = _stageToTasks.mapValues(_.toSet).toMap + def stageToRDDs: Map[Int, Seq[Int]] = _stageToRDDs.toMap + def liveExecutors: Set[String] = _liveExecutors.toSet + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + totalJobs += 1 + totalStages += jobStart.stageIds.length + _liveJobToStages += jobStart.jobId -> jobStart.stageIds + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int]) + _liveJobToStages -= jobEnd.jobId + _stageToTasks --= stages + _stageToRDDs --= stages + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + _stageToRDDs.getOrElseUpdate(stageSubmitted.stageInfo.stageId, + stageSubmitted.stageInfo.rddInfos.map(_.id)) + } + + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + totalTasks += 1 + val curTasks = _stageToTasks.getOrElseUpdate(taskStart.stageId, + mutable.HashSet[Long]()) + curTasks += taskStart.taskInfo.taskId + } + + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + _liveExecutors += executorAdded.executorId + } + + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + _liveExecutors -= executorRemoved.executorId + } + + override def createFilter(): EventFilter = new BasicEventFilter(this) + + def statistic(): FilterStatistic = { + FilterStatistic(totalJobs, liveJobToStages.size, totalStages, + liveJobToStages.map(_._2.size).sum, totalTasks, _stageToTasks.map(_._2.size).sum) + } +} + +/** + * This class provides the functionality to reject events which are related to the finished + * jobs based on the given information. This class only deals with job related events, and provides + * a PartialFunction which returns false for rejected events for finished jobs, returns true + * otherwise. 
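// Illustrative usage sketch (assumptions, not part of this patch): a builder is registered on a
// stateless ReplayListenerBus, the rolled event log files are replayed through the bus so the
// builder observes every SparkListener* event, and the populated builder is then turned into a
// filter which snapshots the still-live jobs, stages, tasks, RDDs and executors. `fs` and
// `eventLogFile` are hypothetical inputs.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.util.Utils

def buildFilter(fs: FileSystem, eventLogFile: Path): EventFilter = {
  val builder = new BasicEventFilterBuilder
  val bus = new ReplayListenerBus()
  bus.addListener(builder)
  Utils.tryWithResource(EventLogFileReader.openEventLog(eventLogFile, fs)) { in =>
    bus.replay(in, eventLogFile.getName)
  }
  builder.createFilter()
}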
+ */ +private[spark] abstract class JobEventFilter( + stats: Option[FilterStatistic], + jobToStages: Map[Int, Seq[Int]], + stageToTasks: Map[Int, Set[Long]], + stageToRDDs: Map[Int, Seq[Int]]) extends EventFilter with Logging { + + private val liveTasks: Set[Long] = stageToTasks.values.flatten.toSet + private val liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet + + logDebug(s"jobs : ${jobToStages.keySet}") + logDebug(s"stages in jobs : ${jobToStages.values.flatten}") + logDebug(s"stages : ${stageToTasks.keySet}") + logDebug(s"tasks in stages : ${stageToTasks.values.flatten}") + logDebug(s"RDDs in stages : ${stageToRDDs.values.flatten}") + + override def statistic(): Option[FilterStatistic] = stats + + protected val acceptFnForJobEvents: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerStageCompleted => + stageToTasks.contains(e.stageInfo.stageId) + + case e: SparkListenerStageSubmitted => + stageToTasks.contains(e.stageInfo.stageId) + + case e: SparkListenerTaskStart => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerTaskGettingResult => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerTaskEnd => + liveTasks.contains(e.taskInfo.taskId) + + case e: SparkListenerJobStart => + jobToStages.contains(e.jobId) + + case e: SparkListenerJobEnd => + jobToStages.contains(e.jobId) + + case e: SparkListenerUnpersistRDD => + liveRDDs.contains(e.rddId) + + case e: SparkListenerExecutorMetricsUpdate => + e.accumUpdates.exists { case (_, stageId, _, _) => + stageToTasks.contains(stageId) + } + + case e: SparkListenerSpeculativeTaskSubmitted => + stageToTasks.contains(e.stageId) + } +} + +/** + * This class rejects events which are related to the finished jobs or dead executors, + * based on the given information. The events which are not related to the job and executor + * will be considered as "Don't mind". + */ +private[spark] class BasicEventFilter( + _stats: FilterStatistic, + _liveJobToStages: Map[Int, Seq[Int]], + _stageToTasks: Map[Int, Set[Long]], + _stageToRDDs: Map[Int, Seq[Int]], + liveExecutors: Set[String]) + extends JobEventFilter(Some(_stats), _liveJobToStages, _stageToTasks, _stageToRDDs) with Logging { + + def this(builder: BasicEventFilterBuilder) = { + this(builder.statistic(), builder.liveJobToStages, builder.stageToTasks, builder.stageToRDDs, + builder.liveExecutors) + } + + logDebug(s"live executors : $liveExecutors") + + private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = { + case e: SparkListenerExecutorAdded => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId) + case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId) + } + + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + _acceptFn.orElse(acceptFnForJobEvents) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala new file mode 100644 index 0000000000000..c4fe59387e23f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.ServiceLoader + +import scala.io.{Codec, Source} +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse + +import org.apache.spark.deploy.history.EventFilter.FilterStatistic +import org.apache.spark.internal.Logging +import org.apache.spark.scheduler._ +import org.apache.spark.util.{JsonProtocol, Utils} + +/** + * EventFilterBuilder provides the interface to gather the information from events being received + * by [[SparkListenerInterface]], and create a new [[EventFilter]] instance which leverages + * information gathered to decide whether the event should be accepted or not. + */ +private[spark] trait EventFilterBuilder extends SparkListenerInterface { + def createFilter(): EventFilter +} + +/** [[EventFilter]] decides whether the given event should be accepted or rejected. */ +private[spark] trait EventFilter { + /** + * Provide statistic information of event filter, which would be used for measuring the score + * of compaction. + * + * To simplify the condition, currently the fields of statistic are static, since major kinds of + * events compaction would filter out are job related event types. If the filter doesn't track + * with job related events, return None instead. + */ + def statistic(): Option[FilterStatistic] + + /** + * Classify whether the event is accepted or rejected by this filter. + * + * The method should return the partial function which matches the events where the filter can + * decide whether the event should be accepted or rejected. Otherwise it should leave the events + * be unmatched. 
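// Minimal sketch of a conforming filter (hypothetical class, not part of this patch): it only
// judges job start/end events and leaves every other event unmatched, so other filters can
// decide on those. Callers use acceptFn().lift to obtain a SparkListenerEvent => Option[Boolean]
// function, where Some(true) accepts, Some(false) rejects and None means "don't mind".
class LiveJobOnlyFilter(liveJobIds: Set[Int]) extends EventFilter {
  override def statistic(): Option[EventFilter.FilterStatistic] = None

  override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = {
    case e: SparkListenerJobStart => liveJobIds.contains(e.jobId)
    case e: SparkListenerJobEnd => liveJobIds.contains(e.jobId)
  }
}

// Combining several filters follows the rule implemented by EventFilter.applyFilterToFile later
// in this file: an event is kept when no filter matches it, or when none of the matching filters
// votes false. The helper name below is illustrative only.
def shouldKeep(filters: Seq[EventFilter], event: SparkListenerEvent): Boolean = {
  val votes = filters.flatMap(_.acceptFn().lift(event)) // unmatched filters cast no vote
  votes.isEmpty || !votes.contains(false)
}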
+ */ + def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] +} + +object EventFilter extends Logging { + case class FilterStatistic( + totalJobs: Long, + liveJobs: Long, + totalStages: Long, + liveStages: Long, + totalTasks: Long, + liveTasks: Long) + + def applyFilterToFile( + fs: FileSystem, + filters: Seq[EventFilter], + path: Path, + onAccepted: (String, SparkListenerEvent) => Unit, + onRejected: (String, SparkListenerEvent) => Unit, + onUnidentified: String => Unit): Unit = { + Utils.tryWithResource(EventLogFileReader.openEventLog(path, fs)) { in => + val lines = Source.fromInputStream(in)(Codec.UTF8).getLines() + + lines.zipWithIndex.foreach { case (line, lineNum) => + try { + val event = try { + Some(JsonProtocol.sparkEventFromJson(parse(line))) + } catch { + // ignore any exception occurred from unidentified json + case NonFatal(_) => + onUnidentified(line) + None + } + + event.foreach { e => + val results = filters.flatMap(_.acceptFn().lift.apply(e)) + if (results.isEmpty || !results.contains(false)) { + onAccepted(line, e) + } else { + onRejected(line, e) + } + } + } catch { + case e: Exception => + logError(s"Exception parsing Spark event log: ${path.getName}", e) + logError(s"Malformed line #$lineNum: $line\n") + throw e + } + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala new file mode 100644 index 0000000000000..ce50e7e132524 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.io.IOException +import java.net.URI +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.history.EventFilter.FilterStatistic +import org.apache.spark.deploy.history.EventFilterBuildersLoader.LowerIndexLoadRequested +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler.ReplayListenerBus +import org.apache.spark.util.Utils + +/** + * This class compacts the old event log files into one compact file, via two phases reading: + * + * 1) Initialize available [[EventFilterBuilder]] instances, and replay the old event log files with + * builders, so that these builders can gather the information to create [[EventFilter]] instances. 
+ * 2) Initialize [[EventFilter]] instances from [[EventFilterBuilder]] instances, and replay the + * old event log files with filters. Rewrite the events to the compact file which the filters decide + * to accept. + * + * This class will calculate the score based on statistic from [[EventFilter]] instances, which + * represents approximate rate of filtered-out events. Score is being calculated via applying + * heuristic; task events tend to take most size in event log. + * + * When compacting the files, the range of compaction for given file list is determined as: + * (first ~ the file where there're `maxFilesToRetain` files on the right side) + * + * If there're not enough files on the range of compaction, compaction will be skipped. + */ +class EventLogFileCompactor( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem) extends Logging { + import EventFilterBuildersLoader._ + + private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN) + private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD) + + private var filterBuildersLoader = new EventFilterBuildersLoader(fs) + private var loadedLogPath: Path = _ + + def compact(reader: EventLogFileReader): (CompactionResult.Value, Option[Long]) = { + if (loadedLogPath == null) { + loadedLogPath = reader.rootPath + } else { + require(loadedLogPath == null || reader.rootPath == loadedLogPath, + "An instance of compactor should deal with same path of event log.") + } + + if (reader.lastIndex.isEmpty) { + return (CompactionResult.NOT_ENOUGH_FILES, None) + } + + val eventLogFiles = reader.listEventLogFiles + if (eventLogFiles.length < maxFilesToRetain) { + return (CompactionResult.NOT_ENOUGH_FILES, None) + } + + val filesToCompact = findFilesToCompact(eventLogFiles) + if (filesToCompact.isEmpty) { + return (CompactionResult.NOT_ENOUGH_FILES, None) + } + + val builders = loadFilesToFilterBuilder(filesToCompact) + val filters = builders.map(_.createFilter()) + val minScore = filters.flatMap(_.statistic()).map(calculateScore).min + + if (minScore < compactionThresholdScore) { + (CompactionResult.LOW_SCORE_FOR_COMPACTION, None) + } else { + val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) + rewriter.rewrite(filesToCompact) + cleanupCompactedFiles(filesToCompact) + (CompactionResult.SUCCESS, Some(RollingEventLogFilesWriter.getEventLogFileIndex( + filesToCompact.last.getPath.getName))) + } + } + + private def loadFilesToFilterBuilder(files: Seq[FileStatus]): Seq[EventFilterBuilder] = { + try { + filterBuildersLoader.loadNewFiles(files) + } catch { + case _: LowerIndexLoadRequested => + // reset loader and load again + filterBuildersLoader = new EventFilterBuildersLoader(fs) + loadFilesToFilterBuilder(files) + + case NonFatal(e) => + // reset loader before throwing exception, as filter builders aren't properly loaded + filterBuildersLoader = new EventFilterBuildersLoader(fs) + throw e + } + } + + private def calculateScore(stats: FilterStatistic): Double = { + // For now it's simply measuring how many task events will be filtered out (rejected) + // but it can be sophisticated later once we get more heuristic information and found + // the case where this simple calculation doesn't work. 
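// Worked example (illustrative numbers): with totalTasks = 1000 and liveTasks = 100 the score
// below is (1000 - 100) / 1000.0 = 0.9, which is above the default threshold of 0.7
// (spark.eventLog.rolling.compaction.score.threshold), so compaction would proceed.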
+ (stats.totalTasks - stats.liveTasks) * 1.0 / stats.totalTasks + } + + private def cleanupCompactedFiles(files: Seq[FileStatus]): Unit = { + files.foreach { file => + var deleted = false + try { + deleted = fs.delete(file.getPath, true) + } catch { + case _: IOException => + } + if (!deleted) { + logWarning(s"Failed to remove ${file.getPath} / skip removing.") + } + } + } + + private def findFilesToCompact( + eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val numNormalEventLogFiles = { + if (EventLogFileWriter.isCompacted(eventLogFiles.head.getPath)) { + eventLogFiles.length - 1 + } else { + eventLogFiles.length + } + } + + // This avoids compacting only compact file. + if (numNormalEventLogFiles > maxFilesToRetain) { + eventLogFiles.dropRight(maxFilesToRetain) + } else { + Seq.empty + } + } +} + +object CompactionResult extends Enumeration { + val SUCCESS, NOT_ENOUGH_FILES, LOW_SCORE_FOR_COMPACTION = Value +} + +class EventFilterBuildersLoader(fs: FileSystem) { + // the implementation of this bus is expected to be stateless + private val bus = new ReplayListenerBus() + + /** Loads all available EventFilterBuilders in classloader via ServiceLoader */ + private val filterBuilders: Seq[EventFilterBuilder] = ServiceLoader.load( + classOf[EventFilterBuilder], Utils.getContextOrSparkClassLoader).asScala.toSeq + + filterBuilders.foreach(bus.addListener) + + private var latestIndexLoaded: Long = -1L + + /** only exposed for testing; simple metric to help testing */ + private[history] var numFilesToLoad: Long = 0L + + /** + * Initializes EventFilterBuilders via replaying events in given files. Loading files are done + * incrementally, via dropping indices which are already loaded and replaying remaining files. + * For example, If the last index of requested files is same as the last index being loaded, + * this will not replay any files. + * + * If the last index of requested files is smaller than the last index being loaded, it will + * throw [[LowerIndexLoadRequested]], which caller can decide whether ignoring it or + * invalidating loader and retrying. + */ + def loadNewFiles(eventLogFiles: Seq[FileStatus]): Seq[EventFilterBuilder] = { + require(eventLogFiles.nonEmpty) + + val idxToStatuses = eventLogFiles.map { status => + val idx = RollingEventLogFilesWriter.getEventLogFileIndex(status.getPath.getName) + idx -> status + } + + val newLatestIdx = idxToStatuses.last._1 + if (newLatestIdx < latestIndexLoaded) { + throw new LowerIndexLoadRequested("Loader already loads higher index of event log than" + + " requested.") + } + + val filesToLoad = idxToStatuses + .filter { case (idx, _) => idx > latestIndexLoaded } + .map { case (_, status) => status.getPath } + + if (filesToLoad.nonEmpty) { + filesToLoad.foreach { log => + Utils.tryWithResource(EventLogFileReader.openEventLog(log, fs)) { in => + bus.replay(in, log.getName) + } + numFilesToLoad += 1 + } + + latestIndexLoaded = newLatestIdx + } + + filterBuilders + } +} + +object EventFilterBuildersLoader { + class LowerIndexLoadRequested(_msg: String) extends Exception(_msg) +} + +/** + * This class rewrites the event log files into one compact file: the compact file will only + * contain the events which pass the filters. Events will be dropped only when all filters + * decide to reject the event or don't mind about the event. Otherwise, the original line for + * the event is written to the compact file as it is. 
+ */ +class FilteredEventLogFileRewriter( + sparkConf: SparkConf, + hadoopConf: Configuration, + fs: FileSystem, + filters: Seq[EventFilter]) { + + def rewrite(eventLogFiles: Seq[FileStatus]): String = { + require(eventLogFiles.nonEmpty) + + val lastIndexEventLogPath = eventLogFiles.last.getPath + val logWriter = new CompactedEventLogFileWriter(lastIndexEventLogPath, "dummy", None, + lastIndexEventLogPath.getParent.toUri, sparkConf, hadoopConf) + + logWriter.start() + eventLogFiles.foreach { file => + EventFilter.applyFilterToFile(fs, filters, file.getPath, + onAccepted = (line, _) => logWriter.writeEvent(line, flushLogger = true), + onRejected = (_, _) => {}, + onUnidentified = line => logWriter.writeEvent(line, flushLogger = true) + ) + } + logWriter.stop() + + logWriter.logPath + } +} + +/** + * This class helps to write compact file; to avoid reimplementing everything, it extends + * [[SingleEventLogFileWriter]], but only `originalFilePath` is used to determine the + * path of compact file. + */ +class CompactedEventLogFileWriter( + originalFilePath: Path, + appId: String, + appAttemptId: Option[String], + logBaseDir: URI, + sparkConf: SparkConf, + hadoopConf: Configuration) + extends SingleEventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) { + + override val logPath: String = originalFilePath.toUri.toString + EventLogFileWriter.COMPACTED +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala index c8956ed3d423d..88a8ffa548b72 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala @@ -173,7 +173,8 @@ class SingleFileEventLogFileReader( override def fileSizeForLastIndex: Long = status.getLen - override def completed: Boolean = !rootPath.getName.endsWith(EventLogFileWriter.IN_PROGRESS) + override def completed: Boolean = !rootPath.getName.stripSuffix(EventLogFileWriter.COMPACTED) + .endsWith(EventLogFileWriter.IN_PROGRESS) override def fileSizeForLastIndexForDFS: Option[Long] = { if (completed) { @@ -217,16 +218,33 @@ class RollingEventLogFilesFileReader( private lazy val appStatusFile = files.find(isAppStatusFile).get private lazy val eventLogFiles: Seq[FileStatus] = { - val eventLogFiles = files.filter(isEventLogFile).sortBy { status => - getIndex(status.getPath.getName) - } - val indices = eventLogFiles.map { file => getIndex(file.getPath.getName) }.sorted + val idxToEventLogFiles = files.map { status => + val filePath = status.getPath + if (isEventLogFile(filePath.getName)) { + getEventLogFileIndex(filePath.getName) -> status + } else { + -1L -> status + } + }.filter { case (idx, _) => idx >= 0 } + + val eventLogFiles = idxToEventLogFiles.sortBy { case (idx, status) => + // trick to place compacted file later than normal file if index is same. 
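// Illustrative ordering (indices hypothetical): for indices 1, 2, 3 where index 2 also has a
// ".compact" counterpart, the sort keys become 1, 2, 2.1, 3, so the compacted file is placed
// right after its original; dropBeforeLastCompactFile below then keeps only the last compact
// file and every file after it.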
+ if (EventLogFileWriter.isCompacted(status.getPath)) { + idx + 0.1 + } else { + idx + } + }.map(_._2) + + val filesToRead = dropBeforeLastCompactFile(eventLogFiles) + val indices = filesToRead.map { file => getEventLogFileIndex(file.getPath.getName) } require((indices.head to indices.last) == indices, "Found missing event log file, expected" + - s" indices: ${(indices.head to indices.last)}, actual: ${indices}") - eventLogFiles + s" indices: ${indices.head to indices.last}, actual: ${indices}") + filesToRead } - override def lastIndex: Option[Long] = Some(getIndex(lastEventLogFile.getPath.getName)) + override def lastIndex: Option[Long] = Some( + getEventLogFileIndex(lastEventLogFile.getPath.getName)) override def fileSizeForLastIndex: Long = lastEventLogFile.getLen @@ -261,4 +279,11 @@ class RollingEventLogFilesFileReader( override def totalSize: Long = eventLogFiles.map(_.getLen).sum private def lastEventLogFile: FileStatus = eventLogFiles.last + + private def dropBeforeLastCompactFile(eventLogFiles: Seq[FileStatus]): Seq[FileStatus] = { + val lastCompactedFileIdx = eventLogFiles.lastIndexWhere { fs => + EventLogFileWriter.isCompacted(fs.getPath) + } + eventLogFiles.drop(lastCompactedFileIdx) + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 3fa5ef94892aa..1d58d054b7825 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -113,9 +113,9 @@ abstract class EventLogFileWriter( } } - protected def writeJson(json: String, flushLogger: Boolean = false): Unit = { + protected def writeLine(line: String, flushLogger: Boolean = false): Unit = { // scalastyle:off println - writer.foreach(_.println(json)) + writer.foreach(_.println(line)) // scalastyle:on println if (flushLogger) { writer.foreach(_.flush()) @@ -164,6 +164,7 @@ abstract class EventLogFileWriter( object EventLogFileWriter { // Suffix applied to the names of files still being written by applications. val IN_PROGRESS = ".inprogress" + val COMPACTED = ".compact" val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort) @@ -192,9 +193,11 @@ object EventLogFileWriter { def codecName(log: Path): Option[String] = { // Compression codec is encoded as an extension, e.g. 
app_123.lzf // Since we sanitize the app ID to not include periods, it is safe to split on it - val logName = log.getName.stripSuffix(IN_PROGRESS) + val logName = log.getName.stripSuffix(COMPACTED).stripSuffix(IN_PROGRESS) logName.split("\\.").tail.lastOption } + + def isCompacted(log: Path): Boolean = log.getName.endsWith(COMPACTED) } /** @@ -211,7 +214,7 @@ class SingleEventLogFileWriter( override val logPath: String = SingleEventLogFileWriter.getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) - private val inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS + protected def inProgressPath = logPath + EventLogFileWriter.IN_PROGRESS override def start(): Unit = { requireLogBaseDirAsDirectory() @@ -222,7 +225,7 @@ class SingleEventLogFileWriter( } override def writeEvent(eventJson: String, flushLogger: Boolean = false): Unit = { - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } /** @@ -327,10 +330,11 @@ class RollingEventLogFilesWriter( } } - writeJson(eventJson, flushLogger) + writeLine(eventJson, flushLogger) } - private def rollEventLogFile(): Unit = { + /** exposed for testing only */ + private[history] def rollEventLogFile(): Unit = { closeWriter() index += 1 @@ -399,16 +403,20 @@ object RollingEventLogFilesWriter { status.isDirectory && status.getPath.getName.startsWith(EVENT_LOG_DIR_NAME_PREFIX) } + def isEventLogFile(fileName: String): Boolean = { + fileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + } + def isEventLogFile(status: FileStatus): Boolean = { - status.isFile && status.getPath.getName.startsWith(EVENT_LOG_FILE_NAME_PREFIX) + status.isFile && isEventLogFile(status.getPath.getName) } def isAppStatusFile(status: FileStatus): Boolean = { status.isFile && status.getPath.getName.startsWith(APPSTATUS_FILE_NAME_PREFIX) } - def getIndex(eventLogFileName: String): Long = { - require(eventLogFileName.startsWith(EVENT_LOG_FILE_NAME_PREFIX), "Not an event log file!") + def getEventLogFileIndex(eventLogFileName: String): Long = { + require(isEventLogFile(eventLogFileName), "Not an event log file!") val index = eventLogFileName.stripPrefix(EVENT_LOG_FILE_NAME_PREFIX).split("_")(0) index.toLong } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f560b7e9157b5..31fc8beb68115 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -159,6 +159,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) new HistoryServerDiskManager(conf, path, listing, clock) } + // Visible for testing. + private[history] val logToCompactor = new mutable.HashMap[String, EventLogFileCompactor] + // Used to store the paths, which are being processed. This enable the replay log tasks execute // asynchronously and make sure that checkForLogs would not process a path repeatedly. private val processing = ConcurrentHashMap.newKeySet[String] @@ -519,23 +522,32 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // to parse it. This will allow the cleaner code to detect the file as stale later on // if it was not possible to parse it. 
listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EventLogs, - None, None, reader.fileSizeForLastIndex, reader.lastIndex, + None, None, reader.fileSizeForLastIndex, reader.lastIndex, None, reader.completed)) reader.fileSizeForLastIndex > 0 } } - .sortWith { case (entry1, entry2) => - entry1.modificationTime > entry2.modificationTime + .sortWith { case (reader1, reader2) => + reader1.modificationTime > reader2.modificationTime } if (updated.nonEmpty) { logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}") } - updated.foreach { entry => - processing(entry.rootPath) + updated.foreach { reader => + processing(reader.rootPath) try { - val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true) + val task: Runnable = () => { + val (shouldRenewReader, updatedLastCompactionIndex) = compact(reader) + // we should renew reader if the list of event log files are changed in `compact` + val newReader = if (shouldRenewReader) { + EventLogFileReader(fs, reader.rootPath).get + } else { + reader + } + mergeApplicationListing(newReader, newLastScanTime, true, updatedLastCompactionIndex) + } replayExecutor.submit(task) } catch { // let the iteration over the updated entries break, since an exception on @@ -561,6 +573,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) log.appId.foreach { appId => cleanAppData(appId, log.attemptId, log.logPath) listing.delete(classOf[LogInfo], log.logPath) + cleanupCompactor(log.logPath) } } @@ -570,6 +583,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** exposed for testing */ + private[history] def compact(reader: EventLogFileReader): (Boolean, Option[Long]) = { + reader.lastIndex match { + case Some(lastIndex) => + try { + val rootPath = reader.rootPath.toString + val info = listing.read(classOf[LogInfo], rootPath) + if (info.lastCompactionIndex.isEmpty || info.lastCompactionIndex.get < lastIndex) { + // haven't tried compaction for this index, do compaction + val compactor = logToCompactor.getOrElseUpdate(rootPath, + new EventLogFileCompactor(conf, hadoopConf, fs)) + val (compactionResult, lastCompactionIndex) = compactor.compact(reader) + if (compactionResult == CompactionResult.SUCCESS) { + listing.write(info.copy(lastCompactionIndex = lastCompactionIndex)) + (true, Some(lastIndex)) + } else { + (false, Some(lastIndex)) + } + } else { + (false, info.lastCompactionIndex) + } + } catch { + case _: NoSuchElementException => + // this should exist, but ignoring doesn't hurt much + (false, None) + } + + case None => (false, None) // This is not applied to single event log file. 
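// For reference: compact() returns (shouldRenewReader, lastCompactionIndex); the first element
// is true only when compaction actually rewrote files, which tells checkForLogs to re-create
// the EventLogFileReader before merging the application listing.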
+ } + } + private[history] def shouldReloadLog(info: LogInfo, reader: EventLogFileReader): Boolean = { if (info.isComplete != reader.completed) { true @@ -661,10 +705,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private def mergeApplicationListing( reader: EventLogFileReader, scanTime: Long, - enableOptimizations: Boolean): Unit = { + enableOptimizations: Boolean, + lastCompactionIndex: Option[Long]): Unit = { try { pendingReplayTasksCount.incrementAndGet() - doMergeApplicationListing(reader, scanTime, enableOptimizations) + doMergeApplicationListing(reader, scanTime, enableOptimizations, lastCompactionIndex) if (conf.get(CLEANER_ENABLED)) { checkAndCleanLog(reader.rootPath.toString) } @@ -672,12 +717,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: InterruptedException => throw e case e: AccessControlException => + val rootPath = reader.rootPath // We don't have read permissions on the log file - logWarning(s"Unable to read log ${reader.rootPath}", e) - blacklist(reader.rootPath) + logWarning(s"Unable to read log ${rootPath}", e) + blacklist(rootPath) // SPARK-28157 We should remove this blacklisted entry from the KVStore // to handle permission-only changes with the same file sizes later. - listing.delete(classOf[LogInfo], reader.rootPath.toString) + listing.delete(classOf[LogInfo], rootPath.toString) + cleanupCompactor(rootPath.toString) case e: Exception => logError("Exception while merging application listings", e) } finally { @@ -693,7 +740,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private[history] def doMergeApplicationListing( reader: EventLogFileReader, scanTime: Long, - enableOptimizations: Boolean): Unit = { + enableOptimizations: Boolean, + lastCompactionIndex: Option[Long]): Unit = { val eventsFilter: ReplayEventsFilter = { eventString => eventString.startsWith(APPL_START_EVENT_PREFIX) || eventString.startsWith(APPL_END_EVENT_PREFIX) || @@ -771,8 +819,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) invalidateUI(app.info.id, app.attempts.head.info.attemptId) addListing(app) listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id), - app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, - reader.lastIndex, reader.completed)) + app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, reader.lastIndex, + lastCompactionIndex, reader.completed)) // For a finished log, remove the corresponding "in progress" entry from the listing DB if // the file is really gone. @@ -796,7 +844,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. logInfo(s"Reparsing $logPath since end event was not found.") - doMergeApplicationListing(reader, scanTime, enableOptimizations = false) + doMergeApplicationListing(reader, scanTime, enableOptimizations = false, + lastCompactionIndex) case _ => // If the app hasn't written down its app ID to the logs, still record the entry in the @@ -804,7 +853,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // does not make progress after the configured max log age. 
listing.write( LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None, - reader.fileSizeForLastIndex, reader.lastIndex, reader.completed)) + reader.fileSizeForLastIndex, reader.lastIndex, lastCompactionIndex, reader.completed)) } } @@ -832,6 +881,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) + cleanupCompactor(log.logPath) } log.appId.foreach { appId => @@ -879,6 +929,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) + cleanupCompactor(log.logPath) } } @@ -921,6 +972,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logInfo(s"Deleting expired event log for ${attempt.logPath}") val logPath = new Path(logDir, attempt.logPath) listing.delete(classOf[LogInfo], logPath.toString()) + cleanupCompactor(logPath.toString) cleanAppData(app.id, attempt.info.attemptId, logPath.toString()) if (deleteLog(fs, logPath)) { countDeleted += 1 @@ -963,7 +1015,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: NoSuchElementException => // For every new driver log file discovered, create a new entry in listing listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None, - None, f.getLen(), None, false)) + None, f.getLen(), None, None, false)) false } if (deleteFile) { @@ -990,9 +1042,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } /** - * Rebuilds the application state store from its event log. + * Rebuilds the application state store from its event log. Exposed for testing. */ - private def rebuildAppStore( + private[spark] def rebuildAppStore( store: KVStore, reader: EventLogFileReader, lastUpdated: Long): Unit = { @@ -1011,8 +1063,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } replayBus.addListener(listener) try { + val eventLogFiles = reader.listEventLogFiles logInfo(s"Parsing ${reader.rootPath} to re-build UI...") - parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed) + parseAppEventLogs(eventLogFiles, replayBus, !reader.completed) trackingStore.close(false) logInfo(s"Finished parsing ${reader.rootPath}") } catch { @@ -1123,30 +1176,61 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // At this point the disk data either does not exist or was deleted because it failed to // load, so the event log needs to be replayed. 
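// Note on the retry loop introduced below: compaction deletes the original rolled files after
// writing the ".compact" file, so a rebuild that listed files just before compaction ran can
// hit an IOException on a file that has vanished. Retrying once re-lists the directory and
// picks up the compacted file instead.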
- val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), - attempt.lastIndex) - val isCompressed = reader.compressionCodec.isDefined - logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") - val lease = dm.lease(reader.totalSize, isCompressed) - val newStorePath = try { - Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store => - rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + var retried = false + var newStorePath: File = null + + while(newStorePath == null) { + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + val isCompressed = reader.compressionCodec.isDefined + logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...") + val lease = dm.lease(reader.totalSize, isCompressed) + try { + Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store => + rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + } + newStorePath = lease.commit(appId, attempt.info.attemptId) + } catch { + case _: IOException if !retried => + // compaction may touch the file(s) which app rebuild wants to read + // compaction wouldn't run in short interval, so try again... + logWarning(s"Exception occurred while rebuilding app $appId - trying again...") + lease.rollback() + retried = true + + case e: Exception => + lease.rollback() + throw e } - lease.commit(appId, attempt.info.attemptId) - } catch { - case e: Exception => - lease.rollback() - throw e } KVUtils.open(newStorePath, metadata) } private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = { - val store = new InMemoryStore() - val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), - attempt.lastIndex) - rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime()) + var retried = false + var store: KVStore = null + + while (store == null) { + try { + val s = new InMemoryStore() + val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath), + attempt.lastIndex) + rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime()) + store = s + } catch { + case _: IOException if !retried => + // compaction may touch the file(s) which app rebuild wants to read + // compaction wouldn't run in short interval, so try again... + logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " + + "trying again...") + retried = true + + case e: Exception => + throw e + } + } + store } @@ -1176,6 +1260,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } deleted } + + private def cleanupCompactor(logPath: String): Unit = { + logToCompactor -= logPath + } } private[history] object FsHistoryProvider { @@ -1194,6 +1282,8 @@ private[history] object FsHistoryProvider { * all data and re-generate the listing data from the event logs. 
*/ private[history] val CURRENT_LISTING_VERSION = 1L + + private val COMPACT_THRESHOLD_FILTER_IN_RATE = 0.3d } private[history] case class FsHistoryProviderMetadata( @@ -1219,6 +1309,7 @@ private[history] case class LogInfo( fileSize: Long, @JsonDeserialize(contentAs = classOf[JLong]) lastIndex: Option[Long], + lastCompactionIndex: Option[Long], isComplete: Boolean) private[history] class AttemptInfoWrapper( @@ -1341,7 +1432,10 @@ private[history] class AppListingListener( } - private class MutableAttemptInfo(logPath: String, fileSize: Long, lastIndex: Option[Long]) { + private class MutableAttemptInfo( + logPath: String, + fileSize: Long, + lastIndex: Option[Long]) { var attemptId: Option[String] = None var startTime = new Date(-1) var endTime = new Date(-1) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9d7b31aa30f0d..0e5dc602f2e18 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -195,6 +195,22 @@ package object config { "configured to be at least 10 MiB.") .createWithDefaultString("128m") + private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN = + ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain") + .doc("The maximum number of event log files which will be retained as non-compacted. " + + "By default, all event log files will be retained. Please set the configuration " + + s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " + + "the overall size of event log files.") + .intConf + .checkValue(_ > 0, "Max event log files to retain should be higher than 0.") + .createWithDefault(Integer.MAX_VALUE) + + private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD = + ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold") + .internal() + .doubleConf + .createWithDefault(0.7d) + private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala new file mode 100644 index 0000000000000..ca570c257c025 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterBuilderSuite.scala @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{SparkFunSuite, Success, TaskResultLost, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper + +class BasicEventFilterBuilderSuite extends SparkFunSuite { + import ListenerEventsTestHelper._ + + override protected def beforeEach(): Unit = { + ListenerEventsTestHelper.reset() + } + + test("track live jobs") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. + time += 1 + val execIds = Array("1", "2") + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // Start a job with 2 stages / 4 tasks each + time += 1 + + val rddsForStage1 = createRdds(2) + val rddsForStage2 = createRdds(2) + + val stage1 = createStage(rddsForStage1, Nil) + val stage2 = createStage(rddsForStage2, Seq(stage1.stageId)) + val stages = Seq(stage1, stage2) + + val jobProps = createJobProps() + listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) + + // Submit stage 1 + time += 1 + stages.head.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps)) + + // Start tasks from stage 1 + time += 1 + + val s1Tasks = ListenerEventsTestHelper.createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, + stages.head.attemptNumber(), task)) + } + + // Fail one of the tasks, re-start it. + time += 1 + s1Tasks.head.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", TaskResultLost, s1Tasks.head, new ExecutorMetrics, null)) + + time += 1 + val reattempt = createTaskWithNewAttempt(s1Tasks.head, time) + listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber, + reattempt)) + + // Succeed all tasks in stage 1. + val pending = s1Tasks.drop(1) ++ Seq(reattempt) + + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + time += 1 + pending.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + } + + // End stage 1. + time += 1 + stages.head.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stages.head)) + + assert(listener.liveJobToStages.keys.toSeq === Seq(1)) + assert(listener.liveJobToStages(1) === Seq(0, 1)) + assert(listener.stageToRDDs.keys.toSeq === Seq(0)) + assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) + // stage 1 not yet submitted + assert(listener.stageToTasks.keys.toSeq === Seq(0)) + assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + + // Submit stage 2. + time += 1 + stages.last.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps)) + + // Start and fail all tasks of stage 2. 
+ time += 1 + val s2Tasks = createTasks(4, execIds, time) + s2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, + stages.last.attemptNumber, + task)) + } + + time += 1 + s2Tasks.foreach { task => + task.markFinished(TaskState.FAILED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber, + "taskType", TaskResultLost, task, new ExecutorMetrics, null)) + } + + // Fail stage 2. + time += 1 + stages.last.completionTime = Some(time) + stages.last.failureReason = Some("uh oh") + listener.onStageCompleted(SparkListenerStageCompleted(stages.last)) + + // - Re-submit stage 2, all tasks, and succeed them and the stage. + val oldS2 = stages.last + val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks, + oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics) + + time += 1 + newS2.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps)) + + val newS2Tasks = createTasks(4, execIds, time) + + newS2Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task)) + } + + time += 1 + newS2Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType", + Success, task, new ExecutorMetrics, null)) + } + + time += 1 + newS2.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(newS2)) + + assert(listener.liveJobToStages.keys.toSeq === Seq(1)) + assert(listener.liveJobToStages(1) === Seq(0, 1)) + assert(listener.stageToRDDs.keys === Set(0, 1)) + assert(listener.stageToRDDs(0) === rddsForStage1.map(_.id)) + assert(listener.stageToRDDs(1) === rddsForStage2.map(_.id)) + assert(listener.stageToTasks.keys.toSet === Set(0, 1)) + // stage 0 is finished but it stores the information regarding stage + assert(listener.stageToTasks(0) === (s1Tasks ++ Seq(reattempt)).map(_.taskId).toSet) + // stage 1 is newly added + assert(listener.stageToTasks(1) === (s2Tasks ++ newS2Tasks).map(_.taskId).toSet) + + // Start next job. + time += 1 + + val rddsForStage3 = createRdds(2) + val rddsForStage4 = createRdds(2) + + val stage3 = createStage(rddsForStage3, Nil) + val stage4 = createStage(rddsForStage4, Seq(stage3.stageId)) + val stagesForJob2 = Seq(stage3, stage4) + + listener.onJobStart(SparkListenerJobStart(2, time, stagesForJob2, jobProps)) + + // End job 1. + time += 1 + listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded)) + + // everything related to job 1 should be cleaned up, but not for job 2 + assert(listener.liveJobToStages.keys.toSet === Set(2)) + assert(listener.stageToRDDs.isEmpty) + // stageToTasks has no information for job 2, as no task has been started + assert(listener.stageToTasks.isEmpty) + } + + test("track live executors") { + var time = 0L + + val listener = new BasicEventFilterBuilder + listener.onOtherEvent(SparkListenerLogStart("TestSparkVersion")) + + // Start the application. + time += 1 + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + None)) + + // Start a couple of executors. + time += 1 + val execIds = (1 to 3).map(_.toString) + execIds.foreach { id => + listener.onExecutorAdded(createExecutorAddedEvent(id, time)) + } + + // End one of executors. 
+ time += 1 + listener.onExecutorRemoved(createExecutorRemovedEvent(execIds.head, time)) + + assert(listener.liveExecutors === execIds.drop(1).toSet) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala new file mode 100644 index 0000000000000..18fa2b52004ea --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/BasicEventFilterSuite.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import org.apache.spark.{storage, SparkFunSuite, Success, TaskState} +import org.apache.spark.deploy.history.EventFilter.FilterStatistic +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.scheduler._ +import org.apache.spark.status.ListenerEventsTestHelper +import org.apache.spark.storage.{BlockManagerId, RDDBlockId, StorageLevel} + +class BasicEventFilterSuite extends SparkFunSuite { + import ListenerEventsTestHelper._ + import BasicEventFilterSuite._ + + test("filter out events for finished jobs") { + // assume finished job 1 with stage 1, tasks (1, 2), rdds (1, 2) + // live job 2 with stages 2, tasks (3, 4), rdds (3, 4) + val liveJobToStages: Map[Int, Seq[Int]] = Map(2 -> Seq(2, 3)) + val stageToTasks: Map[Int, Set[Long]] = Map(2 -> Set(3, 4), 3 -> Set(5, 6)) + val stageToRDDs: Map[Int, Seq[Int]] = Map(2 -> Seq(3, 4), 3 -> Seq(5, 6)) + val liveExecutors: Set[String] = Set("1", "2") + val filterStats = FilterStatistic(2, 1, 2, 1, 4, 2) + + val filter = new BasicEventFilter(filterStats, liveJobToStages, stageToTasks, stageToRDDs, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // Verifying with finished job 1 + val rddsForStage1 = createRddsWithId(1 to 2) + val stage1 = createStage(1, rddsForStage1, Nil) + val tasksForStage1 = createTasks(Seq(1L, 2L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob1 = SparkListenerJobStart(1, 0, Seq(stage1)) + val jobEndEventForJob1 = SparkListenerJobEnd(1, 0, JobSucceeded) + val stageSubmittedEventsForJob1 = SparkListenerStageSubmitted(stage1) + val stageCompletedEventsForJob1 = SparkListenerStageCompleted(stage1) + val unpersistRDDEventsForJob1 = (1 to 2).map(SparkListenerUnpersistRDD) + + // job events for finished job should be rejected + assertFilterJobEvents(acceptFn, jobStartEventForJob1, jobEndEventForJob1, Some(false)) + + // stage events for finished job should be rejected + // NOTE: it doesn't filter out stage events which are also related to the executor + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob1, + stageCompletedEventsForJob1, + unpersistRDDEventsForJob1, + 
SparkListenerSpeculativeTaskSubmitted(stage1.stageId, stageAttemptId = 1), + Some(false)) + + // task events for finished job should be rejected + assertFilterTaskEvents(acceptFn, tasksForStage1, stage1, Some(false)) + + // Verifying with live job 2 + val rddsForStage2 = createRddsWithId(3 to 4) + val stage2 = createStage(2, rddsForStage2, Nil) + val tasksForStage2 = createTasks(Seq(3L, 4L), liveExecutors.toArray, 0) + tasksForStage1.foreach { task => task.markFinished(TaskState.FINISHED, 5) } + + val jobStartEventForJob2 = SparkListenerJobStart(2, 0, Seq(stage2)) + val stageSubmittedEventsForJob2 = SparkListenerStageSubmitted(stage2) + val stageCompletedEventsForJob2 = SparkListenerStageCompleted(stage2) + val unpersistRDDEventsForJob2 = rddsForStage2.map { rdd => SparkListenerUnpersistRDD(rdd.id) } + + // job events for live job should be accepted + assert(acceptFn(jobStartEventForJob2) === Some(true)) + + // stage events for live job should be accepted + assertFilterStageEvents( + acceptFn, + stageSubmittedEventsForJob2, + stageCompletedEventsForJob2, + unpersistRDDEventsForJob2, + SparkListenerSpeculativeTaskSubmitted(stage2.stageId, stageAttemptId = 1), + Some(true)) + + // task events for live job should be accepted + assertFilterTaskEvents(acceptFn, tasksForStage2, stage2, Some(true)) + } + + test("filter out events for dead executors") { + // assume executor 1 was dead, and live executor 2 is available + val liveExecutors: Set[String] = Set("2") + + val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, + liveExecutors) + val acceptFn = filter.acceptFn().lift + + // events for dead executor should be rejected + assert(acceptFn(createExecutorAddedEvent(1)) === Some(false)) + // though the name of event is stage executor metrics, AppStatusListener only deals with + // live executors + assert(acceptFn( + SparkListenerStageExecutorMetrics(1.toString, 0, 0, new ExecutorMetrics)) === + Some(false)) + assert(acceptFn(SparkListenerExecutorBlacklisted(0, 1.toString, 1)) === + Some(false)) + assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 1.toString)) === + Some(false)) + assert(acceptFn(createExecutorRemovedEvent(1)) === Some(false)) + + // events for live executor should be accepted + assert(acceptFn(createExecutorAddedEvent(2)) === Some(true)) + assert(acceptFn( + SparkListenerStageExecutorMetrics(2.toString, 0, 0, new ExecutorMetrics)) === + Some(true)) + assert(acceptFn(SparkListenerExecutorBlacklisted(0, 2.toString, 1)) === + Some(true)) + assert(acceptFn(SparkListenerExecutorUnblacklisted(0, 2.toString)) === + Some(true)) + assert(acceptFn(createExecutorRemovedEvent(2)) === Some(true)) + } + + test("other events should be left to other filters") { + def assertNone(predicate: => Option[Boolean]): Unit = { + assert(predicate === None) + } + + val filter = new BasicEventFilter(EMPTY_STATS, Map.empty, Map.empty, Map.empty, Set.empty) + val acceptFn = filter.acceptFn().lift + + assertNone(acceptFn(SparkListenerEnvironmentUpdate(Map.empty))) + assertNone(acceptFn(SparkListenerApplicationStart("1", Some("1"), 0, "user", None))) + assertNone(acceptFn(SparkListenerApplicationEnd(1))) + val bmId = BlockManagerId("1", "host1", 1) + assertNone(acceptFn(SparkListenerBlockManagerAdded(0, bmId, 1))) + assertNone(acceptFn(SparkListenerBlockManagerRemoved(1, bmId))) + assertNone(acceptFn(SparkListenerBlockUpdated( + storage.BlockUpdatedInfo(bmId, RDDBlockId(1, 1), StorageLevel.DISK_ONLY, 0, 10)))) + assertNone(acceptFn(SparkListenerNodeBlacklisted(0, "host1", 1))) + 
assertNone(acceptFn(SparkListenerNodeUnblacklisted(0, "host1"))) + assertNone(acceptFn(SparkListenerLogStart("testVersion"))) + } + + private def assertFilterJobEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + jobStart: SparkListenerJobStart, + jobEnd: SparkListenerJobEnd, + expectedVal: Option[Boolean]): Unit = { + assert(acceptFn(jobStart) === expectedVal) + assert(acceptFn(jobEnd) === expectedVal) + } + + private def assertFilterStageEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + stageSubmitted: SparkListenerStageSubmitted, + stageCompleted: SparkListenerStageCompleted, + unpersistRDDs: Seq[SparkListenerUnpersistRDD], + taskSpeculativeSubmitted: SparkListenerSpeculativeTaskSubmitted, + expectedVal: Option[Boolean]): Unit = { + assert(acceptFn(stageSubmitted) === expectedVal) + assert(acceptFn(stageCompleted) === expectedVal) + unpersistRDDs.foreach { event => + assert(acceptFn(event) === expectedVal) + } + assert(acceptFn(taskSpeculativeSubmitted) === expectedVal) + } + + private def assertFilterTaskEvents( + acceptFn: SparkListenerEvent => Option[Boolean], + taskInfos: Seq[TaskInfo], + stageInfo: StageInfo, + expectedVal: Option[Boolean]): Unit = { + taskInfos.foreach { task => + val taskStartEvent = SparkListenerTaskStart(stageInfo.stageId, 0, task) + assert(acceptFn(taskStartEvent) === expectedVal) + + val taskGettingResultEvent = SparkListenerTaskGettingResult(task) + assert(acceptFn(taskGettingResultEvent) === expectedVal) + + val taskEndEvent = SparkListenerTaskEnd(stageInfo.stageId, 0, "taskType", + Success, task, new ExecutorMetrics, null) + assert(acceptFn(taskEndEvent) === expectedVal) + } + } +} + +object BasicEventFilterSuite { + val EMPTY_STATS = FilterStatistic(0, 0, 0, 0, 0, 0) +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala new file mode 100644 index 0000000000000..0dbfd29239fd7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.parse +import org.scalatest.PrivateMethodTester + +import org.apache.spark.{SparkConf, SparkFunSuite, Success} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.writeEventsToRollingWriter +import org.apache.spark.executor.ExecutorMetrics +import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.status.ListenerEventsTestHelper +import org.apache.spark.util.{JsonProtocol, Utils} + +class EventLogFileCompactorSuite extends SparkFunSuite with PrivateMethodTester { + import ListenerEventsTestHelper._ + + private val sparkConf = testSparkConf() + private val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + + test("No compact file, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("No compact file, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 5).map(_ => testEvent): _*) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + expectedNumOfFilesCompacted = 2) + } + } + + test("compact file exists, less origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 2).map(_ => testEvent): _*) + + fakeCompactFirstEventLogFile(fs, logPath) + + val newReader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newReader.listEventLogFiles, compactor.compact(newReader), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, number of origin files are same as max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 4).map(_ => testEvent): _*) + + fakeCompactFirstEventLogFile(fs, logPath) + + val newReader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertNoCompaction(fs, newReader.listEventLogFiles, compactor.compact(newReader), + CompactionResult.NOT_ENOUGH_FILES) + } + } + + test("compact file exists, more origin files available than max files to retain") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = 
writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 10).map(_ => testEvent): _*) + + fakeCompactFirstEventLogFile(fs, logPath) + + val newReader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, newReader.listEventLogFiles, compactor.compact(newReader), + expectedNumOfFilesCompacted = 7) + } + } + + private def fakeCompactFirstEventLogFile(fs: FileSystem, logPath: String): Unit = { + val reader = EventLogFileReader(fs, new Path(logPath)).get + val fileStatuses = reader.listEventLogFiles + val fileToCompact = fileStatuses.head.getPath + val compactedPath = new Path(fileToCompact.getParent, + fileToCompact.getName + EventLogFileWriter.COMPACTED) + assert(fs.rename(fileToCompact, compactedPath)) + } + + test("events for finished job are dropped in new compact file") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + // 1, 2 will be compacted into one file, 3~5 are dummies to ensure max files to retain + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + Seq( + SparkListenerExecutorAdded(0, "exec1", new ExecutorInfo("host1", 1, Map.empty)), + SparkListenerJobStart(1, 0, Seq.empty)), + Seq( + SparkListenerJobEnd(1, 1, JobSucceeded), + SparkListenerExecutorAdded(2, "exec2", new ExecutorInfo("host2", 1, Map.empty))), + testEvent, + testEvent, + testEvent) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + expectedNumOfFilesCompacted = 2) + + val expectCompactFileBasePath = reader.listEventLogFiles.take(2).last.getPath + val compactFilePath = getCompactFilePath(expectCompactFileBasePath) + Utils.tryWithResource(EventLogFileReader.openEventLog(compactFilePath, fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines().toList + assert(lines.length === 2, "Compacted file should have only two events being accepted") + lines.foreach { line => + val event = JsonProtocol.sparkEventFromJson(parse(line)) + assert(!event.isInstanceOf[SparkListenerJobStart] && + !event.isInstanceOf[SparkListenerJobEnd]) + } + } + } + } + + test("Don't compact file if score is lower than threshold") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + val newConf = sparkConf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.7d) + + // only one of two tasks is finished, which would score 0.5d + val tasks = createTasks(2, Array("exec1"), 0L).map(createTaskStartEvent(_, 1, 0)) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, newConf, hadoopConf, + tasks, + Seq(SparkListenerTaskEnd(1, 0, "taskType", Success, tasks.head.taskInfo, + new ExecutorMetrics, null)), + testEvent, + testEvent, + testEvent) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(newConf, hadoopConf, fs) + assertNoCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + CompactionResult.LOW_SCORE_FOR_COMPACTION) + } + } + + test("incremental load of event filter builder") { + val loaderMethod = PrivateMethod[EventFilterBuildersLoader](Symbol("filterBuildersLoader")) + + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, sparkConf, hadoopConf) + writer.start() + + // write 1-5, 
and 6 + (1 to 5).map(_ => testEvent).foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = true) + } + writeEventsToRollingWriter(writer, testEvent, rollFile = false) + + // don't stop writer as we will add more files later + val logPath = writer.logPath + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + expectedNumOfFilesCompacted = 3) + + val loader = compactor.invokePrivate(loaderMethod()) + assert(loader.numFilesToLoad === 3) + + // event log files: 3.compact, 4, 5, 6 + // add two more log files (7, 8) to compact 3, 4, 5 as 5.compact + + // just roll out without writing to finalize 6, and write 7 and 8 + writeEventsToRollingWriter(writer, Seq.empty, rollFile = true) + writeEventsToRollingWriter(writer, testEvent, rollFile = true) + writeEventsToRollingWriter(writer, testEvent, rollFile = false) + + writer.stop() + + val reader2 = EventLogFileReader(fs, new Path(logPath)).get + assertCompaction(fs, reader2.listEventLogFiles, compactor.compact(reader2), + expectedNumOfFilesCompacted = 3) + + val loader2 = compactor.invokePrivate(loaderMethod()) + // 3 + 2 (no need to re-read compacted file as the state would be same after compaction) + assert(loader2.numFilesToLoad === 5) + } + } + + test("request compaction with lower index than already requested") { + val loaderMethod = PrivateMethod[EventFilterBuildersLoader](Symbol("filterBuildersLoader")) + + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 6).map(_ => testEvent): _*) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + expectedNumOfFilesCompacted = 3) + + val loader = compactor.invokePrivate(loaderMethod()) + assert(loader.numFilesToLoad === 3) + + // remove directory and reconstruct files with one less file which makes compactor to target + // less index to compact; note that this shouldn't happen in happy situation, but assuming + // the bad cases here. 
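+    // After the logs are reconstructed below, the compactor is asked to compact up to a lower
+    // index than it has already processed, so it is expected to discard its cached filter
+    // builders and load the event log files from scratch.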
+ assert(fs.delete(new Path(logPath), true)) + val logPath2 = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 5).map(_ => testEvent): _*) + assert(logPath === logPath2) + + val reader2 = EventLogFileReader(fs, new Path(logPath2)).get + // compactor will reload the log files and compact the necessary files + assertCompaction(fs, reader2.listEventLogFiles, compactor.compact(reader2), + expectedNumOfFilesCompacted = 2) + + val loader2 = compactor.invokePrivate(loaderMethod()) + assert(loader ne loader2) + assert(loader2.numFilesToLoad === 2) + } + } + + test("request compaction multiple times with same event log files") { + val loaderMethod = PrivateMethod[EventFilterBuildersLoader](Symbol("filterBuildersLoader")) + + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 6).map(_ => testEvent): _*) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + expectedNumOfFilesCompacted = 3) + + val loader = compactor.invokePrivate(loaderMethod()) + assert(loader.numFilesToLoad === 3) + + // remove directory and reconstruct files which effectively reverts the previous compaction; + // note that this shouldn't happen in happy situation, but assuming the bad cases here. + assert(fs.delete(new Path(logPath), true)) + val logPath2 = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 6).map(_ => testEvent): _*) + assert(logPath === logPath2) + + val reader2 = EventLogFileReader(fs, new Path(logPath2)).get + // compactor will reload the log files and compact the necessary files + assertCompaction(fs, reader2.listEventLogFiles, compactor.compact(reader2), + expectedNumOfFilesCompacted = 3) + + val loader2 = compactor.invokePrivate(loaderMethod()) + assert(loader eq loader2) + // no new file should be read + assert(loader2.numFilesToLoad === 3) + } + } + + test("request compaction with different log paths") { + withTempDir { dir => + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = writeEventsToRollingWriter(fs, "app", dir, sparkConf, hadoopConf, + (1 to 6).map(_ => testEvent): _*) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val compactor = new EventLogFileCompactor(sparkConf, hadoopConf, fs) + assertCompaction(fs, reader.listEventLogFiles, compactor.compact(reader), + expectedNumOfFilesCompacted = 3) + + val logPath2 = writeEventsToRollingWriter(fs, "app2", dir, sparkConf, hadoopConf, + (1 to 6).map(_ => testEvent): _*) + + val reader2 = EventLogFileReader(fs, new Path(logPath2)).get + intercept[IllegalArgumentException] { + compactor.compact(reader2) + } + } + } + + private def assertCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: (CompactionResult.Value, Option[Long]), + expectedNumOfFilesCompacted: Int): Unit = { + assert(CompactionResult.SUCCESS === compactRet._1) + + val expectRetainedFiles = originalFiles.drop(expectedNumOfFilesCompacted) + expectRetainedFiles.foreach { status => assert(fs.exists(status.getPath)) } + + val expectRemovedFiles = originalFiles.take(expectedNumOfFilesCompacted) + expectRemovedFiles.foreach { status => assert(!fs.exists(status.getPath)) } + + val expectCompactFileBasePath = originalFiles.take(expectedNumOfFilesCompacted).last.getPath + val expectCompactFileIndex 
= RollingEventLogFilesWriter.getEventLogFileIndex( + expectCompactFileBasePath.getName) + assert(Some(expectCompactFileIndex) === compactRet._2) + + val expectCompactFilePath = getCompactFilePath(expectCompactFileBasePath) + assert(fs.exists(expectCompactFilePath)) + } + + private def getCompactFilePath(expectCompactFileBasePath: Path): Path = { + new Path(expectCompactFileBasePath.getParent, + expectCompactFileBasePath.getName + EventLogFileWriter.COMPACTED) + } + + private def assertNoCompaction( + fs: FileSystem, + originalFiles: Seq[FileStatus], + compactRet: (CompactionResult.Value, Option[Long]), + expectedCompactRet: CompactionResult.Value): Unit = { + assert(compactRet._1 === expectedCompactRet) + assert(None === compactRet._2) + originalFiles.foreach { status => assert(fs.exists(status.getPath)) } + } + + private def testEvent: Seq[SparkListenerEvent] = + Seq(SparkListenerApplicationStart("app", Some("app"), 0, "user", None)) + + private def testSparkConf(): SparkConf = { + new SparkConf() + .set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 3) + // to simplify the tests, we set the score threshold as 0.0d + // individual test can override the value to verify the functionality + .set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala index a2ce4acdaaf37..8eab2da1a37b7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala @@ -288,13 +288,15 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite { assert(status.isDirectory) val statusInDir = fileSystem.listStatus(logPath) - val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => getIndex(s.getPath.getName) } + val eventFiles = statusInDir.filter(isEventLogFile).sortBy { s => + getEventLogFileIndex(s.getPath.getName) + } assert(eventFiles.nonEmpty) val lastEventFile = eventFiles.last val allLen = eventFiles.map(_.getLen).sum assert(reader.rootPath === fileSystem.makeQualified(logPath)) - assert(reader.lastIndex === Some(getIndex(lastEventFile.getPath.getName))) + assert(reader.lastIndex === Some(getEventLogFileIndex(lastEventFile.getPath.getName))) assert(reader.fileSizeForLastIndex === lastEventFile.getLen) assert(reader.completed === isCompleted) assert(reader.modificationTime === lastEventFile.getModificationTime) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala index c4b40884eebf5..060b878fb8ef2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala @@ -291,7 +291,7 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { expectedMaxSizeBytes: Long): Unit = { assert(eventLogFiles.forall(f => f.getLen <= expectedMaxSizeBytes)) assert((1 to expectedLastIndex) === - eventLogFiles.map(f => getIndex(f.getPath.getName))) + eventLogFiles.map(f => getEventLogFileIndex(f.getPath.getName))) } val appId = getUniqueApplicationId @@ -373,6 +373,6 @@ class RollingEventLogFilesWriterSuite extends EventLogFileWritersSuite { private def listEventLogFiles(logDirPath: Path): Seq[FileStatus] = { 
fileSystem.listStatus(logDirPath).filter(isEventLogFile) - .sortBy { fs => getIndex(fs.getPath.getName) } + .sortBy { fs => getEventLogFileIndex(fs.getPath.getName) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index 55eddce3968c2..f5de18babca8d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala @@ -17,12 +17,17 @@ package org.apache.spark.deploy.history +import java.io.File import java.nio.charset.StandardCharsets -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkConf import org.apache.spark.internal.config._ +import org.apache.spark.scheduler._ +import org.apache.spark.util.JsonProtocol object EventLogTestHelper { def getUniqueApplicationId: String = "test-" + System.currentTimeMillis @@ -56,4 +61,76 @@ object EventLogTestHelper { eventStr } } + + def writeEventLogFile( + sparkConf: SparkConf, + hadoopConf: Configuration, + dir: File, + idx: Int, + events: Seq[SparkListenerEvent]): String = { + // to simplify the code, we don't concern about file name being matched with the naming rule + // of event log file + val writer = new SingleEventLogFileWriter(s"app$idx", None, dir.toURI, sparkConf, hadoopConf) + writer.start() + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + writer.stop() + writer.logPath + } + + def writeEventsToRollingWriter( + fs: FileSystem, + appId: String, + dir: File, + sparkConf: SparkConf, + hadoopConf: Configuration, + eventsFiles: Seq[SparkListenerEvent]*): String = { + val writer = new RollingEventLogFilesWriter(appId, None, dir.toURI, sparkConf, hadoopConf) + writer.start() + + eventsFiles.dropRight(1).foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = true) + } + eventsFiles.lastOption.foreach { events => + writeEventsToRollingWriter(writer, events, rollFile = false) + } + + writer.stop() + + writer.logPath + } + + def writeEventsToRollingWriter( + writer: RollingEventLogFilesWriter, + events: Seq[SparkListenerEvent], + rollFile: Boolean): Unit = { + events.foreach { event => writer.writeEvent(convertEvent(event), flushLogger = true) } + if (rollFile) writer.rollEventLogFile() + } + + def convertEvent(event: SparkListenerEvent): String = { + compact(render(JsonProtocol.sparkEventToJson(event))) + } + + class TestEventFilter1 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerBlockManagerAdded => true + case _: SparkListenerApplicationStart => false + } + + override def statistic(): Option[EventFilter.FilterStatistic] = None + } + + class TestEventFilter2 extends EventFilter { + override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = { + case _: SparkListenerApplicationEnd => true + case _: SparkListenerEnvironmentUpdate => true + case _: SparkListenerNodeBlacklisted => true + case _: SparkListenerBlockManagerAdded => false + case _: SparkListenerApplicationStart => false + case _: SparkListenerNodeUnblacklisted => false + } + + override def statistic(): Option[EventFilter.FilterStatistic] = None + } } diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala new file mode 100644 index 0000000000000..96883a7647b4e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/FilteredEventLogFileRewriterSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.mutable +import scala.io.{Codec, Source} + +import org.apache.hadoop.fs.Path + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.history.EventLogTestHelper.{TestEventFilter1, TestEventFilter2} +import org.apache.spark.scheduler._ +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils + +class FilteredEventLogFileRewriterSuite extends SparkFunSuite { + test("rewrite files with test filters") { + def writeEventToWriter(writer: EventLogFileWriter, event: SparkListenerEvent): String = { + val line = EventLogTestHelper.convertEvent(event) + writer.writeEvent(line, flushLogger = true) + line + } + + withTempDir { tempDir => + val sparkConf = new SparkConf + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val fs = new Path(tempDir.getAbsolutePath).getFileSystem(hadoopConf) + + val writer = new SingleEventLogFileWriter("app", None, tempDir.toURI, sparkConf, hadoopConf) + writer.start() + + val expectedLines = new mutable.ArrayBuffer[String] + + // filterApplicationEnd: Some(true) & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerApplicationEnd(0)) + + // filterBlockManagerAdded: Some(true) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerBlockManagerAdded(0, BlockManagerId("1", "host1", 1), + 10)) + + // filterApplicationStart: Some(false) & Some(false) => filter out + writeEventToWriter(writer, SparkListenerApplicationStart("app", None, 0, "user", None)) + + // filterNodeBlacklisted: None & Some(true) => filter in + expectedLines += writeEventToWriter(writer, SparkListenerNodeBlacklisted(0, "host1", 1)) + + // filterNodeUnblacklisted: None & Some(false) => filter out + writeEventToWriter(writer, SparkListenerNodeUnblacklisted(0, "host1")) + + // other events: None & None => filter in + expectedLines += writeEventToWriter(writer, SparkListenerUnpersistRDD(0)) + + writer.stop() + + val filters = Seq(new TestEventFilter1, new TestEventFilter2) + + val rewriter = new FilteredEventLogFileRewriter(sparkConf, hadoopConf, fs, filters) + val logPath = new Path(writer.logPath) + val newPath = rewriter.rewrite(Seq(fs.getFileStatus(logPath))) + assert(new Path(newPath).getName === logPath.getName + 
EventLogFileWriter.COMPACTED) + + Utils.tryWithResource(EventLogFileReader.openEventLog(new Path(newPath), fs)) { is => + val lines = Source.fromInputStream(is)(Codec.UTF8).getLines() + var linesLength = 0 + lines.foreach { line => + linesLength += 1 + assert(expectedLines.contains(line)) + } + assert(linesLength === expectedLines.length, "The number of lines for rewritten file " + + s"is not expected: expected ${expectedLines.length} / actual $linesLength") + } + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index a96667ffacd26..af6daa9a50c98 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -28,6 +28,7 @@ import scala.concurrent.duration._ import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path} import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem} import org.apache.hadoop.security.AccessControlException @@ -37,9 +38,10 @@ import org.mockito.Mockito.{doThrow, mock, spy, verify, when} import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ -import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf, SparkFunSuite} +import org.apache.spark.{JobExecutionStatus, SecurityManager, SPARK_VERSION, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR +import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN} import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.UI.{ADMIN_ACLS, ADMIN_ACLS_GROUPS, USER_GROUPS_MAPPING} import org.apache.spark.io._ @@ -50,9 +52,11 @@ import org.apache.spark.status.AppStatusStore import org.apache.spark.status.KVUtils.KVStoreScalaSerializer import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} +import org.apache.spark.util.kvstore.InMemoryStore import org.apache.spark.util.logging.DriverLogger class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { + import EventLogTestHelper._ private var testDir: File = null @@ -164,8 +168,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { override private[history] def doMergeApplicationListing( reader: EventLogFileReader, lastSeen: Long, - enableSkipToEnd: Boolean): Unit = { - super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd) + enableSkipToEnd: Boolean, + lastCompactionIndex: Option[Long]): Unit = { + super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd, lastCompactionIndex) doMergeApplicationListingCall += 1 } } @@ -1167,7 +1172,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { var fileStatus = new FileStatus(200, false, 0, 0, 0, path) when(mockedFs.getFileStatus(path)).thenReturn(fileStatus) var logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, None, false) + Some("attemptId"), 100, None, None, false) var reader = EventLogFileReader(mockedFs, path) 
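+    // the file on disk (200 bytes) is larger than the size recorded in LogInfo (100 bytes),
+    // so the provider should decide to reload this log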
assert(reader.isDefined) assert(mockedProvider.shouldReloadLog(logInfo, reader.get)) @@ -1177,14 +1182,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { when(mockedFs.getFileStatus(path)).thenReturn(fileStatus) // DFSInputStream.getFileLength is more than logInfo fileSize logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, None, false) + Some("attemptId"), 100, None, None, false) reader = EventLogFileReader(mockedFs, path) assert(reader.isDefined) assert(mockedProvider.shouldReloadLog(logInfo, reader.get)) // DFSInputStream.getFileLength is equal to logInfo fileSize logInfo = new LogInfo(path.toString, 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 200, None, false) + Some("attemptId"), 200, None, None, false) reader = EventLogFileReader(mockedFs, path) assert(reader.isDefined) assert(!mockedProvider.shouldReloadLog(logInfo, reader.get)) @@ -1292,11 +1297,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val serializer = new KVStoreScalaSerializer() val logInfoWithIndexAsNone = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, None, false) + Some("attemptId"), 100, None, None, false) assertSerDe(serializer, logInfoWithIndexAsNone) val logInfoWithIndex = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), - Some("attemptId"), 100, Some(3), false) + Some("attemptId"), 100, Some(3), None, false) assertSerDe(serializer, logInfoWithIndex) } @@ -1362,6 +1367,102 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } } + test("compact event log files when replaying to rebuild app") { + withTempDir { dir => + val conf = createTestConf() + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1) + conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val logPath = constructEventLogsForCompactionTest(fs, "app", dir, conf, hadoopConf) + + val provider = new FsHistoryProvider(conf) + updateAndCheck(provider) { listing => + assert(listing.map(_.id).toSet === Set("app")) + + val reader = EventLogFileReader(fs, new Path(logPath)).get + val logFiles = reader.listEventLogFiles + // compacted file + retained file + assert(logFiles.size === 2) + assert(RollingEventLogFilesWriter.isEventLogFile(logFiles.head)) + assert(EventLogFileWriter.isCompacted(logFiles.head.getPath)) + assert(2 == RollingEventLogFilesWriter.getEventLogFileIndex(logFiles.head.getPath.getName)) + + assert(RollingEventLogFilesWriter.isEventLogFile(logFiles(1))) + assert(!EventLogFileWriter.isCompacted(logFiles(1).getPath)) + assert(3 == RollingEventLogFilesWriter.getEventLogFileIndex(logFiles(1).getPath.getName)) + + val store = new InMemoryStore + val appStore = new AppStatusStore(store) + + provider.rebuildAppStore(store, reader, 0L) + + // replayed store doesn't have any job, as events for job are removed while compacting + intercept[NoSuchElementException] { + appStore.job(1) + } + + // but other events should be available even they were in original files to compact + val appInfo = appStore.applicationInfo() + assert(appInfo.id === "app") + assert(appInfo.name === "app") + + // all events in retained file should be available, even they're related to finished jobs + val exec1 = appStore.executorSummary("exec1") + assert(exec1.hostPort === "host1") + val job2 = appStore.job(2) + 
assert(job2.status === JobExecutionStatus.SUCCEEDED) + } + } + } + + test("The compactor instance will be reserved per app log") { + withTempDir { dir => + val conf = createTestConf() + conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath) + conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1) + conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d) + val hadoopConf = SparkHadoopUtil.newConfiguration(conf) + val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf) + + val app1LogPath = constructEventLogsForCompactionTest(fs, "app1", dir, conf, hadoopConf) + val app2LogPath = constructEventLogsForCompactionTest(fs, "app2", dir, conf, hadoopConf) + val app3LogPath = constructEventLogsForCompactionTest(fs, "app3", dir, conf, hadoopConf) + + val provider = new FsHistoryProvider(conf) + updateAndCheck(provider) { listing => + assert(listing.map(_.id).toSet === Set("app1", "app2", "app3")) + + val compactorForApp1 = provider.logToCompactor(app1LogPath) + val compactorForApp2 = provider.logToCompactor(app2LogPath) + val compactorForApp3 = provider.logToCompactor(app3LogPath) + assert(compactorForApp1 ne compactorForApp2) + assert(compactorForApp2 ne compactorForApp3) + assert(compactorForApp1 ne compactorForApp3) + } + } + } + + private def constructEventLogsForCompactionTest( + fs: FileSystem, + appId: String, + dir: File, + conf: SparkConf, + hadoopConf: Configuration): String = { + writeEventsToRollingWriter(fs, appId, dir, conf, hadoopConf, + // 1, 2 will be compacted into one file, 3 is the dummy file to ensure max files to retain + Seq( + SparkListenerApplicationStart(appId, Some(appId), 0, "user", None), + SparkListenerJobStart(1, 0, Seq.empty)), + Seq(SparkListenerUnpersistRDD(1), SparkListenerJobEnd(1, 1, JobSucceeded)), + Seq( + SparkListenerExecutorAdded(3, "exec1", new ExecutorInfo("host1", 1, Map.empty)), + SparkListenerJobStart(2, 4, Seq.empty), + SparkListenerJobEnd(2, 5, JobSucceeded))) + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index a289dddbdc9e6..10ff95ad935cb 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.storage._ import org.apache.spark.util.Utils class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { + import ListenerEventsTestHelper._ private val conf = new SparkConf() .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) @@ -1694,40 +1695,4 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { def blockId: BlockId = RDDBlockId(rddId, partId) } - - /** Create a stage submitted event for the specified stage Id. */ - private def createStageSubmittedEvent(stageId: Int) = { - SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create a stage completed event for the specified stage Id. */ - private def createStageCompletedEvent(stageId: Int) = { - SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, - Seq.empty, Seq.empty, "details")) - } - - /** Create an executor added event for the specified executor Id. 
*/ - private def createExecutorAddedEvent(executorId: Int) = { - SparkListenerExecutorAdded(0L, executorId.toString, - new ExecutorInfo("host1", 1, Map.empty, Map.empty)) - } - - /** Create an executor added event for the specified executor Id. */ - private def createExecutorRemovedEvent(executorId: Int) = { - SparkListenerExecutorRemoved(10L, executorId.toString, "test") - } - - /** Create an executor metrics update event, with the specified executor metrics values. */ - private def createExecutorMetricsUpdateEvent( - stageId: Int, - executorId: Int, - executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { - val taskMetrics = TaskMetrics.empty - taskMetrics.incDiskBytesSpilled(111) - taskMetrics.incMemoryBytesSpilled(222) - val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) - val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) - SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) - } } diff --git a/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala new file mode 100644 index 0000000000000..37a35744ada5e --- /dev/null +++ b/core/src/test/scala/org/apache/spark/status/ListenerEventsTestHelper.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status + +import java.util.Properties + +import scala.collection.immutable.Map + +import org.apache.spark.{AccumulatorSuite, SparkContext, Success, TaskState} +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality} +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.storage.{RDDInfo, StorageLevel} + +object ListenerEventsTestHelper { + + private var taskIdTracker = -1L + private var rddIdTracker = -1 + private var stageIdTracker = -1 + + def reset(): Unit = { + taskIdTracker = -1L + rddIdTracker = -1 + stageIdTracker = -1 + } + + def createJobProps(): Properties = { + val jobProps = new Properties() + jobProps.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, "jobDescription") + jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") + jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool") + jobProps + } + + def createRddsWithId(ids: Seq[Int]): Seq[RDDInfo] = { + ids.map { rddId => + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createRdds(count: Int): Seq[RDDInfo] = { + (1 to count).map { _ => + val rddId = nextRddId() + new RDDInfo(rddId, s"rdd${rddId}", 2, StorageLevel.NONE, false, Nil) + } + } + + def createStage(id: Int, rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + new StageInfo(id, 0, s"stage${id}", 4, rdds, parentIds, s"details${id}") + } + + def createStage(rdds: Seq[RDDInfo], parentIds: Seq[Int]): StageInfo = { + createStage(nextStageId(), rdds, parentIds) + } + + def createTasks(ids: Seq[Long], execs: Array[String], time: Long): Seq[TaskInfo] = { + ids.zipWithIndex.map { case (id, idx) => + val exec = execs(idx % execs.length) + new TaskInfo(id, idx, 1, time, exec, s"$exec.example.com", + TaskLocality.PROCESS_LOCAL, idx % 2 == 0) + } + } + + def createTasks(count: Int, execs: Array[String], time: Long): Seq[TaskInfo] = { + createTasks((1 to count).map { _ => nextTaskId() }, execs, time) + } + + def createTaskWithNewAttempt(orig: TaskInfo, time: Long): TaskInfo = { + // Task reattempts have a different ID, but the same index as the original. + new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId, + s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative) + } + + def createTaskStartEvent( + taskInfo: TaskInfo, + stageId: Int, + attemptId: Int): SparkListenerTaskStart = { + SparkListenerTaskStart(stageId, attemptId, taskInfo) + } + + /** Create a stage submitted event for the specified stage Id. */ + def createStageSubmittedEvent(stageId: Int): SparkListenerStageSubmitted = { + SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + /** Create a stage completed event for the specified stage Id. */ + def createStageCompletedEvent(stageId: Int): SparkListenerStageCompleted = { + SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + def createExecutorAddedEvent(executorId: Int): SparkListenerExecutorAdded = { + createExecutorAddedEvent(executorId.toString, 0) + } + + /** Create an executor added event for the specified executor Id. 
*/ + def createExecutorAddedEvent(executorId: String, time: Long): SparkListenerExecutorAdded = { + SparkListenerExecutorAdded(time, executorId, + new ExecutorInfo("host1", 1, Map.empty, Map.empty)) + } + + def createExecutorRemovedEvent(executorId: Int): SparkListenerExecutorRemoved = { + createExecutorRemovedEvent(executorId.toString, 10L) + } + + /** Create an executor added event for the specified executor Id. */ + def createExecutorRemovedEvent(executorId: String, time: Long): SparkListenerExecutorRemoved = { + SparkListenerExecutorRemoved(time, executorId, "test") + } + + /** Create an executor metrics update event, with the specified executor metrics values. */ + def createExecutorMetricsUpdateEvent( + stageId: Int, + executorId: Int, + executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = { + val taskMetrics = TaskMetrics.empty + taskMetrics.incDiskBytesSpilled(111) + taskMetrics.incMemoryBytesSpilled(222) + val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) + val executorUpdates = Map((stageId, 0) -> new ExecutorMetrics(executorMetrics)) + SparkListenerExecutorMetricsUpdate(executorId.toString, accum, executorUpdates) + } + + case class JobInfo( + stageIds: Seq[Int], + stageToTaskIds: Map[Int, Seq[Long]], + stageToRddIds: Map[Int, Seq[Int]]) + + def pushJobEventsWithoutJobEnd( + listener: SparkListener, + jobId: Int, + jobProps: Properties, + execIds: Array[String], + time: Long): JobInfo = { + // Start a job with 1 stages / 4 tasks each + val rddsForStage = createRdds(2) + val stage = createStage(rddsForStage, Nil) + + listener.onJobStart(SparkListenerJobStart(jobId, time, Seq(stage), jobProps)) + + // Submit stage + stage.submissionTime = Some(time) + listener.onStageSubmitted(SparkListenerStageSubmitted(stage, jobProps)) + + // Start tasks from stage + val s1Tasks = createTasks(4, execIds, time) + s1Tasks.foreach { task => + listener.onTaskStart(SparkListenerTaskStart(stage.stageId, + stage.attemptNumber(), task)) + } + + // Succeed all tasks in stage. + val s1Metrics = TaskMetrics.empty + s1Metrics.setExecutorCpuTime(2L) + s1Metrics.setExecutorRunTime(4L) + + s1Tasks.foreach { task => + task.markFinished(TaskState.FINISHED, time) + listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, + "taskType", Success, task, new ExecutorMetrics, s1Metrics)) + } + + // End stage. + stage.completionTime = Some(time) + listener.onStageCompleted(SparkListenerStageCompleted(stage)) + + JobInfo(Seq(stage.stageId), Map(stage.stageId -> s1Tasks.map(_.taskId)), + Map(stage.stageId -> rddsForStage.map(_.id))) + } + + private def nextTaskId(): Long = { + taskIdTracker += 1 + taskIdTracker + } + + private def nextRddId(): Int = { + rddIdTracker += 1 + rddIdTracker + } + + private def nextStageId(): Int = { + stageIdTracker += 1 + stageIdTracker + } +} diff --git a/docs/configuration.md b/docs/configuration.md index 497a2ad36b67c..f16033c288088 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1023,6 +1023,24 @@ Apart from these, the following properties are also available, and may be useful The max size of event log file before it's rolled over. +
+<tr>
+  <td><code>spark.eventLog.rolling.maxFilesToRetain</code></td>
+  <td>Int.MaxValue</td>
+  <td>
+    The maximum number of non-compacted event log files to retain. By default, all event
+    log files are retained. Set this configuration and
+    <code>spark.eventLog.rolling.maxFileSize</code> accordingly if you want to control
+    the overall size of event log files. The event log files older than these retained
+    files will be compacted into a single file and deleted afterwards.
+  </td>
+</tr>
 <tr>
   <td><code>spark.ui.dagGraph.retainedRootRDDs</code></td>