diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index a299b79850613..94c80ebd55e74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -24,6 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
@@ -39,11 +40,13 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.status._
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
+import org.apache.spark.status.config._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.kvstore._
@@ -149,6 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }.getOrElse(new InMemoryStore())
 
+  private val diskManager = storePath.map { path =>
+    new HistoryServerDiskManager(conf, path, listing, clock)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
 
   /**
@@ -219,6 +226,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   private def startPolling(): Unit = {
+    diskManager.foreach(_.initialize())
+
     // Validate the log directory.
     val path = new Path(logDir)
     try {
@@ -299,63 +308,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         attempt.adminAclsGroups.getOrElse(""))
       secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-      val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) }
+      val kvstore = try {
+        diskManager match {
+          case Some(sm) =>
+            loadDiskStore(sm, appId, attempt)
 
-      val (kvstore, needReplay) = uiStorePath match {
-        case Some(path) =>
-          try {
-            // The store path is not guaranteed to exist - maybe it hasn't been created, or was
-            // invalidated because changes to the event log were detected. Need to replay in that
-            // case.
-            val _replay = !path.isDirectory()
-            (createDiskStore(path, conf), _replay)
-          } catch {
-            case e: Exception =>
-              // Get rid of the old data and re-create it. The store is either old or corrupted.
- logWarning(s"Failed to load disk store $uiStorePath for $appId.", e) - Utils.deleteRecursively(path) - (createDiskStore(path, conf), true) - } - - case _ => - (new InMemoryStore(), true) - } - - val plugins = ServiceLoader.load( - classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala - val trackingStore = new ElementTrackingStore(kvstore, conf) - if (needReplay) { - val replayBus = new ReplayListenerBus() - val listener = new AppStatusListener(trackingStore, conf, false, - lastUpdateTime = Some(attempt.info.lastUpdated.getTime())) - replayBus.addListener(listener) - for { - plugin <- plugins - listener <- plugin.createListeners(conf, trackingStore) - } replayBus.addListener(listener) - try { - val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) - replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - trackingStore.close(false) - } catch { - case e: Exception => - Utils.tryLogNonFatalError { - trackingStore.close() - } - uiStorePath.foreach(Utils.deleteRecursively) - if (e.isInstanceOf[FileNotFoundException]) { - return None - } else { - throw e - } + case _ => + createInMemoryStore(attempt) } + } catch { + case _: FileNotFoundException => + return None } val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId), attempt.info.startTime.getTime(), attempt.info.appSparkVersion) - plugins.foreach(_.setupUI(ui)) + loadPlugins().foreach(_.setupUI(ui)) val loadedUI = LoadedAppUI(ui) @@ -417,11 +387,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) loadedUI.lock.writeLock().unlock() } - // If the UI is not valid, delete its files from disk, if any. This relies on the fact that - // ApplicationCache will never call this method concurrently with getAppUI() for the same - // appId / attemptId. - if (!loadedUI.valid && storePath.isDefined) { - Utils.deleteRecursively(getStorePath(storePath.get, appId, attemptId)) + diskManager.foreach { dm => + // If the UI is not valid, delete its files from disk, if any. This relies on the fact that + // ApplicationCache will never call this method concurrently with getAppUI() for the same + // appId / attemptId. + dm.release(appId, attemptId, delete = !loadedUI.valid) } } } @@ -569,12 +539,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } val logPath = fileStatus.getPath() - val bus = new ReplayListenerBus() val listener = new AppListingListener(fileStatus, clock) bus.addListener(listener) + replay(fileStatus, bus, eventsFilter = eventsFilter) - replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter) listener.applicationInfo.foreach { app => // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a // discussion on the UI lifecycle. @@ -651,10 +620,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private def replay( eventLog: FileStatus, - appCompleted: Boolean, bus: ReplayListenerBus, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { val logPath = eventLog.getPath() + val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS) logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, // and when we read the file here. 
     // and when we read the file here.  That is OK -- it may result in an unnecessary refresh
@@ -664,18 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // after it's created, so we get a file size that is no bigger than what is actually read.
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
-      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
-      logInfo(s"Finished replaying $logPath")
+      bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
+      logInfo(s"Finished parsing $logPath")
     } finally {
       logInput.close()
     }
   }
 
   /**
-   * Return true when the application has completed.
+   * Rebuilds the application state store from its event log.
    */
-  private def isApplicationCompleted(entry: FileStatus): Boolean = {
-    !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+  private def rebuildAppStore(
+      store: KVStore,
+      eventLog: FileStatus,
+      lastUpdated: Long): Unit = {
+    // Disable async updates, since they cause higher memory usage, and it's ok to take longer
+    // to parse the event logs in the SHS.
+    val replayConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
+    val trackingStore = new ElementTrackingStore(store, replayConf)
+    val replayBus = new ReplayListenerBus()
+    val listener = new AppStatusListener(trackingStore, replayConf, false,
+      lastUpdateTime = Some(lastUpdated))
+    replayBus.addListener(listener)
+
+    for {
+      plugin <- loadPlugins()
+      listener <- plugin.createListeners(conf, trackingStore)
+    } replayBus.addListener(listener)
+
+    try {
+      replay(eventLog, replayBus)
+      trackingStore.close(false)
+    } catch {
+      case e: Exception =>
+        Utils.tryLogNonFatalError {
+          trackingStore.close()
+        }
+        throw e
+    }
   }
 
   /**
@@ -751,14 +746,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.write(newAppInfo)
   }
 
-  private def createDiskStore(path: File, conf: SparkConf): KVStore = {
+  private def loadDiskStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper): KVStore = {
     val metadata = new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)
-    KVUtils.open(path, metadata)
+
+    // First check if the store already exists and try to open it. If that fails, then get rid of
+    // the existing data.
+    dm.openStore(appId, attempt.info.attemptId).foreach { path =>
+      try {
+        return KVUtils.open(path, metadata)
+      } catch {
+        case e: Exception =>
+          logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
+          dm.release(appId, attempt.info.attemptId, delete = true)
+      }
+    }
+
+    // At this point the disk data either does not exist or was deleted because it failed to
+    // load, so the event log needs to be replayed.
+    val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
+    val isCompressed = EventLoggingListener.codecName(status.getPath()).flatMap { name =>
+      Try(CompressionCodec.getShortName(name)).toOption
+    }.isDefined
+    logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+    val lease = dm.lease(status.getLen(), isCompressed)
+    val newStorePath = try {
+      val store = KVUtils.open(lease.tmpPath, metadata)
+      try {
+        rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
+      } finally {
+        store.close()
+      }
+      lease.commit(appId, attempt.info.attemptId)
+    } catch {
+      case e: Exception =>
+        lease.rollback()
+        throw e
+    }
+
+    KVUtils.open(newStorePath, metadata)
+  }
+
+  private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
+    val store = new InMemoryStore()
+    val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
+    rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
+    store
   }
 
-  private def getStorePath(path: File, appId: String, attemptId: Option[String]): File = {
-    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
-    new File(path, fileName)
+  private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
+    ServiceLoader.load(classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
   }
 
   /** For testing. Returns internal data about a single attempt. */
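Note on the flow above: `loadDiskStore()` wraps the replay in a lease so that a failed rebuild never leaves a half-written store at the final location. A minimal, self-contained sketch of that lifecycle, using only the three `Lease` members the provider actually touches (`tmpPath`, `commit`, `rollback`); `LeaseLike` and `rebuildWithLease` are illustrative names, not part of this patch:

```scala
import java.io.File

object LeaseSketch {
  // Hypothetical stand-in for HistoryServerDiskManager.Lease.
  trait LeaseLike {
    def tmpPath: File
    def commit(appId: String, attemptId: Option[String]): File
    def rollback(): Unit
  }

  // Rebuild into the leased temp directory; commit on success so the store
  // moves to its final location, roll back on any failure so the leased
  // space is returned to the disk manager.
  def rebuildWithLease(
      lease: LeaseLike,
      appId: String,
      attemptId: Option[String])(rebuild: File => Unit): File = {
    try {
      rebuild(lease.tmpPath)
      lease.commit(appId, attemptId)
    } catch {
      case e: Exception =>
        lease.rollback()
        throw e
    }
  }
}
```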
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
new file mode 100644
index 0000000000000..c03a360b91ef8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.util.{Clock, Utils}
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A class used to keep track of disk usage by the SHS, allowing application data to be deleted
+ * from disk when usage exceeds a configurable threshold.
+ *
+ * The goal of the class is not to guarantee that usage will never exceed the threshold; because of
+ * how application data is written, disk usage may temporarily go higher. But, eventually, it
+ * should fall back under the threshold.
+ *
+ * @param conf Spark configuration.
+ * @param path Path where to store application data.
+ * @param listing The listing store, used to persist usage data.
+ * @param clock Clock instance to use.
+ */
+private class HistoryServerDiskManager(
+    conf: SparkConf,
+    path: File,
+    listing: KVStore,
+    clock: Clock) extends Logging {
+
+  import config._
+
+  private val appStoreDir = new File(path, "apps")
+  if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) {
+    throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
+  }
+
+  private val tmpStoreDir = new File(path, "temp")
+  if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
+    throw new IllegalArgumentException(s"Failed to create temp directory ($tmpStoreDir).")
+  }
+
+  private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val committedUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+    updateUsage(sizeOf(appStoreDir), committed = true)
+
+    // Clean up any temporary stores during start up. This assumes that they're leftover from other
+    // instances and are not useful.
+    tmpStoreDir.listFiles().foreach(FileUtils.deleteQuietly)
+
+    // Go through the recorded store directories and remove any that may have been removed by
+    // external code.
+    val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
+      !new File(info.path).exists()
+    }.toSeq
+
+    orphans.foreach { info =>
+      listing.delete(info.getClass(), info.path)
+    }
+
+    logInfo("Initialized disk manager: " +
+      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
+      s"max usage = ${Utils.bytesToString(maxUsage)}")
+  }
+
+  /**
+   * Lease some space from the store. The leased space is calculated as a fraction of the given
+   * event log size; this is an approximation, and doesn't mean the application store cannot
+   * outgrow the lease.
+   *
+   * If there's not enough space for the lease, other applications might be evicted to make room.
+   * This method always returns a lease, meaning that it's possible for local disk usage to grow
+   * past the configured threshold if there aren't enough idle applications to evict.
+   *
+   * While the lease is active, the data is written to a temporary location, so `openStore()`
+   * will still return `None` for the application.
+   */
+  def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = {
+    val needed = approximateSize(eventLogSize, isCompressed)
+    makeRoom(needed)
+
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
+    updateUsage(needed)
+    val current = currentUsage.get()
+    if (current > maxUsage) {
+      logInfo(s"Lease of ${Utils.bytesToString(needed)} may cause usage to exceed max " +
+        s"(${Utils.bytesToString(current)} > ${Utils.bytesToString(maxUsage)})")
+    }
+
+    new Lease(tmp, needed)
+  }
+
+  /**
+   * Returns the location of an application store if it's still available. Marks the store as
+   * being used so that it's not evicted when running out of designated space.
+   */
+  def openStore(appId: String, attemptId: Option[String]): Option[File] = {
+    val storePath = active.synchronized {
+      val path = appStorePath(appId, attemptId)
+      if (path.isDirectory()) {
+        active(appId -> attemptId) = sizeOf(path)
+        Some(path)
+      } else {
+        None
+      }
+    }
+
+    storePath.foreach { path =>
+      updateAccessTime(appId, attemptId)
+    }
+
+    storePath
+  }
+
+  /**
+   * Tell the disk manager that the store for the given application is not being used anymore.
+   *
+   * @param delete Whether to delete the store from disk.
+   */
+  def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = {
+    // Because LevelDB may modify the structure of the store files even when just reading, update
+    // the accounting for this application when it's closed.
+    val oldSizeOpt = active.synchronized {
+      active.remove(appId -> attemptId)
+    }
+
+    oldSizeOpt.foreach { oldSize =>
+      val path = appStorePath(appId, attemptId)
+      updateUsage(-oldSize, committed = true)
+      if (path.isDirectory()) {
+        if (delete) {
+          deleteStore(path)
+        } else {
+          val newSize = sizeOf(path)
+          val newInfo = listing.read(classOf[ApplicationStoreInfo], path.getAbsolutePath())
+            .copy(size = newSize)
+          listing.write(newInfo)
+          updateUsage(newSize, committed = true)
+        }
+      }
+    }
+  }
+
+  /**
+   * A non-scientific approximation of how large an app state store will be given the size of the
+   * event log.
+   */
+  def approximateSize(eventLogSize: Long, isCompressed: Boolean): Long = {
+    if (isCompressed) {
+      // For compressed logs, assume that compression reduces the log size a lot, and the disk
+      // store will actually grow compared to the log size.
+      eventLogSize * 2
+    } else {
+      // For non-compressed logs, assume the disk store will end up at approximately 50% of the
+      // size of the logs. This is loosely based on empirical evidence.
+      eventLogSize / 2
+    }
+  }
+
+  /** Current free space. Considers space currently leased out too. */
+  def free(): Long = {
+    math.max(maxUsage - currentUsage.get(), 0L)
+  }
+
+  /** Current committed space. */
+  def committed(): Long = committedUsage.get()
+
+  private def deleteStore(path: File): Unit = {
+    FileUtils.deleteDirectory(path)
+    listing.delete(classOf[ApplicationStoreInfo], path.getAbsolutePath())
+  }
+
+  private def makeRoom(size: Long): Unit = {
+    if (free() < size) {
+      logDebug(s"Not enough free space, looking at candidates for deletion...")
+      val evicted = new ListBuffer[ApplicationStoreInfo]()
+      Utils.tryWithResource(
+        listing.view(classOf[ApplicationStoreInfo]).index("lastAccess").closeableIterator()
+      ) { iter =>
+        var needed = size
+        while (needed > 0 && iter.hasNext()) {
+          val info = iter.next()
+          val isActive = active.synchronized {
+            active.contains(info.appId -> info.attemptId)
+          }
+          if (!isActive) {
+            evicted += info
+            needed -= info.size
+          }
+        }
+      }
+
+      if (evicted.nonEmpty) {
+        val freed = evicted.map { info =>
+          logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.")
+          deleteStore(new File(info.path))
+          updateUsage(-info.size, committed = true)
+          info.size
+        }.sum
+
+        logInfo(s"Deleted ${evicted.size} store(s) to free ${Utils.bytesToString(freed)} " +
+          s"(target = ${Utils.bytesToString(size)}).")
+      } else {
+        logWarning(s"Unable to free any space to make room for ${Utils.bytesToString(size)}.")
+      }
+    }
+  }
+
+  private def appStorePath(appId: String, attemptId: Option[String]): File = {
+    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
+    new File(appStoreDir, fileName)
+  }
+
+  private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
+    val path = appStorePath(appId, attemptId)
+    val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
+      sizeOf(path))
+    listing.write(info)
+  }
+
+  private def updateUsage(delta: Long, committed: Boolean = false): Unit = {
+    val updated = currentUsage.addAndGet(delta)
+    if (updated < 0) {
+      throw new IllegalStateException(
+        s"Disk usage tracker went negative (now = $updated, delta = $delta)")
+    }
+    if (committed) {
+      val updatedCommitted = committedUsage.addAndGet(delta)
+      if (updatedCommitted < 0) {
+        throw new IllegalStateException(
+          s"Disk usage tracker went negative (now = $updatedCommitted, delta = $delta)")
+      }
+    }
+  }
+
+  /** Visible for testing. Return the size of a directory. */
+  private[history] def sizeOf(path: File): Long = FileUtils.sizeOf(path)
+
+  private[history] class Lease(val tmpPath: File, private val leased: Long) {
+
+    /**
+     * Commits a lease to its final location, and updates accounting information. This method
+     * marks the application as active, so its store is not available for eviction.
+     */
+    def commit(appId: String, attemptId: Option[String]): File = {
+      val dst = appStorePath(appId, attemptId)
+
+      active.synchronized {
+        require(!active.contains(appId -> attemptId),
+          s"Cannot commit lease for active application $appId / $attemptId")
+
+        if (dst.isDirectory()) {
+          val size = sizeOf(dst)
+          deleteStore(dst)
+          updateUsage(-size, committed = true)
+        }
+      }
+
+      updateUsage(-leased)
+
+      val newSize = sizeOf(tmpPath)
+      makeRoom(newSize)
+      tmpPath.renameTo(dst)
+
+      updateUsage(newSize, committed = true)
+      if (committedUsage.get() > maxUsage) {
+        val current = Utils.bytesToString(committedUsage.get())
+        val max = Utils.bytesToString(maxUsage)
+        logWarning(s"Commit of application $appId / $attemptId causes maximum disk usage to be " +
+          s"exceeded ($current > $max)")
+      }
+
+      updateAccessTime(appId, attemptId)
+
+      active.synchronized {
+        active(appId -> attemptId) = newSize
+      }
+      dst
+    }
+
+    /** Deletes the temporary directory created for the lease. */
+    def rollback(): Unit = {
+      updateUsage(-leased)
+      FileUtils.deleteDirectory(tmpPath)
+    }
+
+  }
+
+}
+
+private case class ApplicationStoreInfo(
+    @KVIndexParam path: String,
+    @KVIndexParam("lastAccess") lastAccess: Long,
+    appId: String,
+    attemptId: Option[String],
+    size: Long)
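Operationally, two things drive the disk manager: the sizing heuristic and the usage cap (`spark.history.store.maxDiskUsage`, added below in config.scala with a 10g default; it only takes effect when the history server is configured with a local store path, the optional config entry visible just above it in that file). A quick worked example mirroring `approximateSize()` above:

```scala
// Mirrors the heuristic above: compressed logs are assumed to roughly double
// when rebuilt into a LevelDB store, uncompressed logs to shrink to ~50%.
def approximateSize(eventLogSize: Long, isCompressed: Boolean): Long =
  if (isCompressed) eventLogSize * 2 else eventLogSize / 2

// A 100 MiB compressed log leases 200 MiB; a 100 MiB plain log leases 50 MiB.
assert(approximateSize(100L << 20, isCompressed = true) == (200L << 20))
assert(approximateSize(100L << 20, isCompressed = false) == (50L << 20))
```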
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
index 52dedc1a2ed41..22b6d49d8e2a4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.history
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 private[spark] object config {
 
@@ -39,4 +40,8 @@ private[spark] object config {
     .stringConf
     .createOptional
 
+  val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefaultString("10g")
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a77adc5ff3545..b3a5b1f1e05b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -352,14 +352,8 @@ private[spark] object EventLoggingListener extends Logging {
    */
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
     val in = new BufferedInputStream(fs.open(log))
-
-    // Compression codec is encoded as an extension, e.g. app_123.lzf
-    // Since we sanitize the app ID to not include periods, it is safe to split on it
-    val logName = log.getName.stripSuffix(IN_PROGRESS)
-    val codecName: Option[String] = logName.split("\\.").tail.lastOption
-
     try {
-      val codec = codecName.map { c =>
+      val codec = codecName(log).map { c =>
         codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
       }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
@@ -370,4 +364,11 @@ private[spark] object EventLoggingListener extends Logging {
     }
   }
 
+  def codecName(log: Path): Option[String] = {
+    // Compression codec is encoded as an extension, e.g. app_123.lzf
+    // Since we sanitize the app ID to not include periods, it is safe to split on it
+    val logName = log.getName.stripSuffix(IN_PROGRESS)
+    logName.split("\\.").tail.lastOption
+  }
+
 }
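Extracting `codecName()` lets the disk manager reuse the extension-based codec detection when sizing leases. A standalone model of the parsing rule, assuming `IN_PROGRESS` is the ".inprogress" suffix used by EventLoggingListener:

```scala
// The codec is the last extension, if any, after stripping ".inprogress".
def codecNameOf(fileName: String): Option[String] =
  fileName.stripSuffix(".inprogress").split("\\.").tail.lastOption

assert(codecNameOf("app_123") == None)                              // uncompressed
assert(codecNameOf("app_123.lzf") == Some("lzf"))                   // lzf-compressed
assert(codecNameOf("app_123.snappy.inprogress") == Some("snappy"))  // still being written
```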
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
new file mode 100644
index 0000000000000..4b1b921582e00
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+
+import org.mockito.AdditionalAnswers
+import org.mockito.Matchers.{any, anyBoolean, anyLong, eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.status.KVUtils
+import org.apache.spark.util.{ManualClock, Utils}
+import org.apache.spark.util.kvstore.KVStore
+
+class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  import config._
+
+  private val MAX_USAGE = 3L
+
+  private var testDir: File = _
+  private var store: KVStore = _
+
+  before {
+    testDir = Utils.createTempDir()
+    store = KVUtils.open(new File(testDir, "listing"), "test")
+  }
+
+  after {
+    store.close()
+    if (testDir != null) {
+      Utils.deleteRecursively(testDir)
+    }
+  }
+
+  private def mockManager(): HistoryServerDiskManager = {
+    val conf = new SparkConf().set(MAX_LOCAL_DISK_USAGE, MAX_USAGE)
+    val manager = spy(new HistoryServerDiskManager(conf, testDir, store, new ManualClock()))
+    doAnswer(AdditionalAnswers.returnsFirstArg[Long]()).when(manager)
+      .approximateSize(anyLong(), anyBoolean())
+    manager
+  }
+
+  test("leasing space") {
+    val manager = mockManager()
+
+    // Lease all available space.
+    val leaseA = manager.lease(1)
+    val leaseB = manager.lease(1)
+    val leaseC = manager.lease(1)
+    assert(manager.free() === 0)
+
+    // Revert one lease, get another one.
+    leaseA.rollback()
+    assert(manager.free() > 0)
+    assert(!leaseA.tmpPath.exists())
+
+    val leaseD = manager.lease(1)
+    assert(manager.free() === 0)
+
+    // Committing B should bring the "used" space up to 4, so there shouldn't be space left yet.
+    doReturn(2L).when(manager).sizeOf(meq(leaseB.tmpPath))
+    val dstB = leaseB.commit("app2", None)
+    assert(manager.free() === 0)
+    assert(manager.committed() === 2)
+
+    // Rollback C and D, now there should be 1 left.
+    leaseC.rollback()
+    leaseD.rollback()
+    assert(manager.free() === 1)
+
+    // Release app2 to make it available for eviction.
+    doReturn(2L).when(manager).sizeOf(meq(dstB))
+    manager.release("app2", None)
+    assert(manager.committed() === 2)
+
+    // Emulate an updated event log by replacing the store committed from lease B. Lease 1, and
+    // commit with size 3.
+    val leaseE = manager.lease(1)
+    doReturn(3L).when(manager).sizeOf(meq(leaseE.tmpPath))
+    val dstE = leaseE.commit("app2", None)
+    assert(dstE === dstB)
+    assert(dstE.exists())
+    doReturn(3L).when(manager).sizeOf(meq(dstE))
+    assert(!leaseE.tmpPath.exists())
+    assert(manager.free() === 0)
+    manager.release("app2", None)
+    assert(manager.committed() === 3)
+
+    // Try a big lease that should cause the released app to be evicted.
+    val leaseF = manager.lease(6)
+    assert(!dstB.exists())
+    assert(manager.free() === 0)
+    assert(manager.committed() === 0)
+
+    // Leasing when no free space is available should still be allowed.
+    manager.lease(1)
+    assert(manager.free() === 0)
+  }
+
+  test("tracking active stores") {
+    val manager = mockManager()
+
+    // Lease and commit space for appA, making it active.
+    val leaseA = manager.lease(2)
+    assert(manager.free() === 1)
+    doReturn(2L).when(manager).sizeOf(leaseA.tmpPath)
+    assert(manager.openStore("appA", None).isEmpty)
+    val dstA = leaseA.commit("appA", None)
+
+    // Create a new lease. Leases are always granted, but this shouldn't cause appA's store
+    // to be deleted.
+    val leaseB = manager.lease(2)
+    assert(dstA.exists())
+
+    // Trying to commit on top of an active application should fail.
+    intercept[IllegalArgumentException] {
+      leaseB.commit("appA", None)
+    }
+
+    leaseB.rollback()
+
+    // Close appA with an updated size, then create a new lease. Now the app's directory should be
+    // deleted.
+    doReturn(3L).when(manager).sizeOf(dstA)
+    manager.release("appA", None)
+    assert(manager.free() === 0)
+
+    val leaseC = manager.lease(1)
+    assert(!dstA.exists())
+    leaseC.rollback()
+
+    assert(manager.openStore("appA", None).isEmpty)
+  }
+
+  test("approximate size heuristic") {
+    val manager = new HistoryServerDiskManager(new SparkConf(false), testDir, store,
+      new ManualClock())
+    assert(manager.approximateSize(50L, false) < 50L)
+    assert(manager.approximateSize(50L, true) > 50L)
+  }
+
+}
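A property the suite exercises indirectly is eviction order: `makeRoom()` walks the "lastAccess" KVStore index oldest-first and skips stores that are currently active. A simplified model of that victim selection, reusing the `ApplicationStoreInfo` case class introduced above (`pickVictims` is a hypothetical helper for illustration; the deletion and accounting around it are elided):

```scala
// Given infos sorted by ascending lastAccess (the order the index yields),
// pick inactive stores until enough space would be reclaimed.
def pickVictims(
    byLastAccess: Seq[ApplicationStoreInfo],
    isActive: ApplicationStoreInfo => Boolean,
    needed: Long): Seq[ApplicationStoreInfo] = {
  var remaining = needed
  byLastAccess.filterNot(isActive).takeWhile { info =>
    val take = remaining > 0
    remaining -= info.size
    take
  }
}
```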