core/src/main/scala/org/apache/spark/deploy/history/EventLogFileCompactor.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.EventFilter.FilterStatistics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EVENT_LOG_COMPACTION_SCORE_THRESHOLD, EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN}
import org.apache.spark.scheduler.ReplayListenerBus
import org.apache.spark.util.Utils

@@ -49,9 +48,11 @@ import org.apache.spark.util.Utils
class EventLogFileCompactor(
sparkConf: SparkConf,
hadoopConf: Configuration,
fs: FileSystem) extends Logging {
private val maxFilesToRetain: Int = sparkConf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN)
private val compactionThresholdScore: Double = sparkConf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD)
fs: FileSystem,
maxFilesToRetain: Int,
compactionThresholdScore: Double) extends Logging {

require(maxFilesToRetain > 0, "Max event log files to retain should be higher than 0.")

/**
* Compacts the old event log files into one compact file, and clean old event log files being
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -158,6 +158,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
new HistoryServerDiskManager(conf, path, listing, clock)
}

private val fileCompactor = new EventLogFileCompactor(conf, hadoopConf, fs,
conf.get(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN), conf.get(EVENT_LOG_COMPACTION_SCORE_THRESHOLD))

// Used to store the paths which are being processed. This enables the replay log tasks to execute
// asynchronously and makes sure that checkForLogs would not process a path repeatedly.
private val processing = ConcurrentHashMap.newKeySet[String]
@@ -475,10 +478,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

if (shouldReloadLog(info, reader)) {
// ignore fastInProgressParsing when the status of application is changed from
// in-progress to completed, which is needed for rolling event log.
if (info.appId.isDefined && (info.isComplete == reader.completed) &&
fastInProgressParsing) {
// ignore fastInProgressParsing when rolling event log is enabled on the log path,
// to ensure compaction proceeds even when fastInProgressParsing is turned on.
if (info.appId.isDefined && reader.lastIndex.isEmpty && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
// Also, we need to update the `lastUpdated time` to display the updated time in
@@ -518,7 +520,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// to parse it. This will allow the cleaner code to detect the file as stale later on
// if it was not possible to parse it.
listing.write(LogInfo(reader.rootPath.toString(), newLastScanTime, LogType.EventLogs,
None, None, reader.fileSizeForLastIndex, reader.lastIndex,
None, None, reader.fileSizeForLastIndex, reader.lastIndex, None,
reader.completed))
reader.fileSizeForLastIndex > 0
}
@@ -532,16 +534,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

updated.foreach { entry =>
processing(entry.rootPath)
try {
val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
replayExecutor.submit(task)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting event log for replay", e)
submitLogProcessTask(entry.rootPath) { () =>
mergeApplicationListing(entry, newLastScanTime, true)
}
}

@@ -661,27 +655,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
reader: EventLogFileReader,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
val rootPath = reader.rootPath
try {
val lastEvaluatedForCompaction: Option[Long] = try {
listing.read(classOf[LogInfo], rootPath.toString).lastEvaluatedForCompaction
} catch {
case _: NoSuchElementException => None
}

pendingReplayTasksCount.incrementAndGet()
doMergeApplicationListing(reader, scanTime, enableOptimizations)
doMergeApplicationListing(reader, scanTime, enableOptimizations, lastEvaluatedForCompaction)
if (conf.get(CLEANER_ENABLED)) {
checkAndCleanLog(reader.rootPath.toString)
checkAndCleanLog(rootPath.toString)
}
} catch {
case e: InterruptedException =>
throw e
case e: AccessControlException =>
// We don't have read permissions on the log file
logWarning(s"Unable to read log ${reader.rootPath}", e)
blacklist(reader.rootPath)
logWarning(s"Unable to read log $rootPath", e)
blacklist(rootPath)
// SPARK-28157 We should remove this blacklisted entry from the KVStore
// to handle permission-only changes with the same file sizes later.
listing.delete(classOf[LogInfo], reader.rootPath.toString)
listing.delete(classOf[LogInfo], rootPath.toString)
case e: Exception =>
logError("Exception while merging application listings", e)
} finally {
endProcessing(reader.rootPath)
endProcessing(rootPath)
pendingReplayTasksCount.decrementAndGet()

// trigger another task for compaction
submitLogProcessTask(rootPath) { () => compact(reader) }
}
}

@@ -692,7 +696,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private[history] def doMergeApplicationListing(
reader: EventLogFileReader,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
enableOptimizations: Boolean,
lastEvaluatedForCompaction: Option[Long]): Unit = {
val eventsFilter: ReplayEventsFilter = { eventString =>
eventString.startsWith(APPL_START_EVENT_PREFIX) ||
eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -770,8 +775,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
invalidateUI(app.info.id, app.attempts.head.info.attemptId)
addListing(app)
listing.write(LogInfo(logPath.toString(), scanTime, LogType.EventLogs, Some(app.info.id),
app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
reader.lastIndex, reader.completed))
app.attempts.head.info.attemptId, reader.fileSizeForLastIndex, reader.lastIndex,
lastEvaluatedForCompaction, reader.completed))

// For a finished log, remove the corresponding "in progress" entry from the listing DB if
// the file is really gone.
@@ -795,15 +800,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
lastEvaluatedForCompaction)

case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
listing.write(
LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
reader.completed))
}
}

private def compact(reader: EventLogFileReader): Unit = {
val rootPath = reader.rootPath
try {
reader.lastIndex match {
case Some(lastIndex) =>
try {
val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
if (info.lastEvaluatedForCompaction.isEmpty ||
info.lastEvaluatedForCompaction.get < lastIndex) {
// haven't tried compaction for this index, do compaction
fileCompactor.compact(reader.listEventLogFiles)
Contributor:

So one thing that feels a tiny bit odd is that when deciding whether to compact, you're actually considering the last log file, which you won't consider during actual compaction, right?

Wouldn't that cause unnecessary (or too aggressive) compaction at the end of the application, when potentially a bunch of jobs finish and "release" lots of tasks, inflating the compaction score?

Contributor Author (@HeartSaVioR, Jan 22, 2020):

That's intentional: callers of the compactor don't need to care how many files are actually affected. Callers only need to know that the same list of log files will produce the same result, unless compaction fails and throws an exception. How many files are excluded from compaction is just configuration, and the fact that the last log file must be excluded is an implementation detail. (We prevent it in both the configuration and the compactor by enforcing 1 as the minimum value for the max retained log files.)

The compactor will ignore the last log file in any case, as configured, so unless the log happens to be rolled just before the app finishes (a rare case), this won't happen. And most end users will avoid setting the value to 1 if they read the docs and understand how it works.
listing.write(info.copy(lastEvaluatedForCompaction = Some(lastIndex)))
}
} catch {
case _: NoSuchElementException =>
// this should exist, but ignoring doesn't hurt much
}

case None => // This is not applied to single event log file.
}
} finally {
endProcessing(rootPath)
}
}

Expand Down Expand Up @@ -962,7 +994,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: NoSuchElementException =>
// For every new driver log file discovered, create a new entry in listing
listing.write(LogInfo(f.getPath().toString(), currentTime, LogType.DriverLogs, None,
None, f.getLen(), None, false))
None, f.getLen(), None, None, false))
false
}
if (deleteFile) {
@@ -989,9 +1021,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

/**
* Rebuilds the application state store from its event log.
* Rebuilds the application state store from its event log. Exposed for testing.
*/
private def rebuildAppStore(
private[spark] def rebuildAppStore(
store: KVStore,
reader: EventLogFileReader,
lastUpdated: Long): Unit = {
Expand All @@ -1010,8 +1042,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
} replayBus.addListener(listener)

try {
val eventLogFiles = reader.listEventLogFiles
logInfo(s"Parsing ${reader.rootPath} to re-build UI...")
parseAppEventLogs(reader.listEventLogFiles, replayBus, !reader.completed)
parseAppEventLogs(eventLogFiles, replayBus, !reader.completed)
trackingStore.close(false)
logInfo(s"Finished parsing ${reader.rootPath}")
} catch {
@@ -1122,30 +1155,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.

val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(reader.totalSize, isCompressed)
val newStorePath = try {
Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
var retried = false
var newStorePath: File = null
while (newStorePath == null) {
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
val isCompressed = reader.compressionCodec.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(reader.totalSize, isCompressed)
try {
Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata)) { store =>
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
}
newStorePath = lease.commit(appId, attempt.info.attemptId)
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which the app rebuild wants to read
// compaction won't run again within a short interval, so try again...
logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
lease.rollback()
retried = true

case e: Exception =>
lease.rollback()
throw e
}
lease.commit(appId, attempt.info.attemptId)
} catch {
case e: Exception =>
lease.rollback()
throw e
}

KVUtils.open(newStorePath, metadata)
}

private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
val store = new InMemoryStore()
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
var retried = false
var store: KVStore = null
while (store == null) {
try {
val s = new InMemoryStore()
val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
attempt.lastIndex)
rebuildAppStore(s, reader, attempt.info.lastUpdated.getTime())
store = s
} catch {
case _: IOException if !retried =>
// compaction may touch the file(s) which the app rebuild wants to read
// compaction won't run again within a short interval, so try again...
logWarning(s"Exception occurred while rebuilding log path ${attempt.logPath} - " +
"trying again...")
retried = true

case e: Exception =>
throw e
}
}

store
}

@@ -1175,6 +1237,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
deleted
}

/** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
Contributor:

Hmm, perhaps this method could take care of calling endProcessing too (e.g. by wrapping the task)? Should be just a small adjustment at the call site.

Contributor Author (@HeartSaVioR, Jan 26, 2020):

The finally block in mergeApplicationListing makes this complicated, because we would have to handle reentrance of the lock. (We're submitting another task from within the task.)

If we moved endProcessing from the finally block in mergeApplicationListing to the end of the task here, processing for the compaction task would run before endProcessing is called for the listing task. Marking the lock from the compaction task would succeed but effectively be a no-op, and releasing the lock from the listing task would remove the mark for the compaction task as well, letting the compaction task run without holding the proper lock.

So either we need to make the lock much smarter, or document the requirement on the caller side. I feel the former is more complicated than the latter.
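To make the caller-side contract from the NOTE concrete, a minimal usage sketch (somePath and doSomeLogWork are hypothetical stand-ins; in the actual code the listing task releases the mark inside mergeApplicationListing's finally block):

// Illustrative caller pattern: the submitted task itself must guarantee that
// endProcessing(path) runs when it finishes, so the "processing" mark for the
// path does not leak if the task fails.
submitLogProcessTask(somePath) { () =>
  try {
    doSomeLogWork(somePath) // hypothetical work, e.g. listing or compaction
  } finally {
    endProcessing(somePath)
  }
}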

private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
try {
processing(rootPath)
replayExecutor.submit(task)
} catch {
// let the iteration over the updated entries break, since an exception on
// replayExecutor.submit (..) indicates the ExecutorService is unable
// to take any more submissions at this time
case e: Exception =>
logError(s"Exception while submitting task", e)
endProcessing(rootPath)
}
}
}

private[history] object FsHistoryProvider {
@@ -1218,6 +1295,8 @@ private[history] case class LogInfo(
fileSize: Long,
@JsonDeserialize(contentAs = classOf[JLong])
lastIndex: Option[Long],
@JsonDeserialize(contentAs = classOf[JLong])
lastEvaluatedForCompaction: Option[Long],
isComplete: Boolean)

private[history] class AttemptInfoWrapper(
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -84,6 +84,22 @@ private[spark] object History {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("1m")

private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
ConfigBuilder("spark.history.fs.eventLog.rolling.maxFilesToRetain")
.doc("The maximum number of event log files which will be retained as non-compacted. " +
"By default, all event log files will be retained. Please set the configuration " +
s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " +
"the overall size of event log files.")
.intConf
.checkValue(_ > 0, "Max event log files to retain should be higher than 0.")
.createWithDefault(Integer.MAX_VALUE)

private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
ConfigBuilder("spark.history.fs.eventLog.rolling.compaction.score.threshold")
.internal()
.doubleConf
.createWithDefault(0.7d)

val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
.fallbackConf(CLEANER_ENABLED)

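For operators, a hedged example of setting the new History Server options (values are illustrative; in practice these usually go in spark-defaults.conf for the history server rather than a programmatic SparkConf, and the score threshold is internal and normally left at its default):

import org.apache.spark.SparkConf

// Illustrative settings only: retain at most 10 rolled event log files per
// application as non-compacted; older ones become candidates for compaction.
val historyConf = new SparkConf()
  .set("spark.history.fs.eventLog.rolling.maxFilesToRetain", "10")
  // internal knob: compaction proceeds only when the computed score reaches this value
  .set("spark.history.fs.eventLog.rolling.compaction.score.threshold", "0.7")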
18 changes: 0 additions & 18 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -195,24 +195,6 @@ package object config {
"configured to be at least 10 MiB.")
.createWithDefaultString("128m")

private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")
// TODO: remove this when integrating compactor with FsHistoryProvider
.internal()
.doc("The maximum number of event log files which will be retained as non-compacted. " +
"By default, all event log files will be retained. Please set the configuration " +
s"and ${EVENT_LOG_ROLLING_MAX_FILE_SIZE.key} accordingly if you want to control " +
"the overall size of event log files.")
.intConf
.checkValue(_ > 0, "Max event log files to retain should be higher than 0.")
.createWithDefault(Integer.MAX_VALUE)

private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
ConfigBuilder("spark.eventLog.rolling.compaction.score.threshold")
.internal()
.doubleConf
.createWithDefault(0.7d)

private[spark] val EXECUTOR_ID =
ConfigBuilder("spark.executor.id").stringConf.createOptional
