diff --git a/core/src/main/scala/org/apache/spark/deploy/history/DiskStoreManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/DiskStoreManager.scala
new file mode 100644
index 0000000000000..215582be26584
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/DiskStoreManager.scala
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.kvstore.KVStore
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.util.Clock
+
+/**
+ * A class used to keep track of disk usage by the SHS, allowing application data to be "evicted"
+ * from disk when usage exceeds a configurable threshold.
+ *
+ * The goal of the class is not to guarantee that usage will never exceed the threshold; because of
+ * how application data is written, disk usage may temporarily go higher. But, eventually, it
+ * should fall back under the threshold.
+ *
+ * @param conf Spark configuration.
+ * @param path Path where to store application data.
+ * @param listing The listing store, used to persist usage data.
+ * @param clock Clock instance to use.
+ */
+private class DiskStoreManager(
+    conf: SparkConf,
+    path: File,
+    listing: KVStore,
+    clock: Clock) extends Logging {
+
+  import config._
+
+  private val appStoreDir = new File(path, "apps")
+  if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) {
+    throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
+  }
+
+  private val tmpStoreDir = new File(path, "temp")
+  if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
+    throw new IllegalArgumentException(s"Failed to create temp directory ($tmpStoreDir).")
+  }
+
+  private val eventLogSizeRatio = conf.get(EVENT_TO_STORE_SIZE_RATIO)
+  private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE)
+  private val currentUsage = new AtomicLong(0L)
+  private val active = new HashMap[(String, Option[String]), Long]()
+
+  def initialize(): Unit = {
+    updateUsage(sizeOf(appStoreDir))
+
+    // Clean up any temporary stores during start up. This assumes that they're leftover from other
+    // instances and are not useful.
+    tmpStoreDir.listFiles().foreach(FileUtils.deleteQuietly)
+
+    // Go through the recorded store directories and remove any that may have been removed by
+    // external code.
+    val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
+      !new File(info.path).exists()
+    }.toSeq
+
+    orphans.foreach { info =>
+      listing.delete(info.getClass(), info.path)
+    }
+  }
+
+  /**
+   * Lease some space from the store. The leased space is calculated as a fraction of the given
+   * event log size; this is an approximation, and doesn't mean the application store cannot
+   * outgrow the lease.
+   *
+   * If there's not enough space for the lease, other applications might be evicted to make room.
+   * This method always returns a lease, meaning that it's possible for local disk usage to grow
+   * past the configured threshold if there aren't enough idle applications to evict.
+   *
+   * While the lease is active, the data is written to a temporary location, so `openStore()`
+   * will still return `None` for the application.
+   */
+  def lease(eventLogSize: Long): Lease = {
+    val needed = approximateSize(eventLogSize)
+    makeRoom(needed)
+
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
+    updateUsage(needed)
+    new Lease(tmp, needed)
+  }
+
+  /**
+   * Returns whether there's enough free space to create a store for an application event log.
+   * This uses an approximation of the expected size of an application store for a given event
+   * log, since there's no way to really know that relationship up front.
+   */
+  def hasFreeSpace(eventLogSize: Long): Boolean = {
+    approximateSize(eventLogSize) <= free()
+  }
+
+  /**
+   * Returns the location of an application store if it's still available. Marks the store as
+   * being used so that it's not evicted when running out of designated space.
+   */
+  def openStore(appId: String, attemptId: Option[String]): Option[File] = {
+    val storePath = active.synchronized {
+      val path = appStorePath(appId, attemptId)
+      if (path.isDirectory()) {
+        active(appId -> attemptId) = sizeOf(path)
+        Some(path)
+      } else {
+        None
+      }
+    }
+
+    storePath.foreach { _ =>
+      updateAccessTime(appId, attemptId)
+    }
+
+    storePath
+  }
+
+  /**
+   * Tell the disk manager that the store for the given application is not being used anymore.
+   *
+   * @param delete Whether to delete the store from disk.
+   */
+  def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = {
+    // Because LevelDB may modify the structure of the store files even when just reading, update
+    // the accounting for this application when it's closed.
+    val oldSizeOpt = active.synchronized {
+      active.remove(appId -> attemptId)
+    }
+
+    oldSizeOpt.foreach { oldSize =>
+      val path = appStorePath(appId, attemptId)
+      updateUsage(-oldSize)
+      if (path.isDirectory()) {
+        if (delete) {
+          FileUtils.deleteDirectory(path)
+        } else {
+          updateUsage(sizeOf(path))
+        }
+      }
+    }
+  }
+
+  /**
+   * A non-scientific approximation of how big an app state store will be given the size of the
+   * event log. By default it's 30% of the event log size.
+   */
+  private def approximateSize(eventLogSize: Long): Long = {
+    math.ceil(eventLogSizeRatio * eventLogSize).toLong
+  }
+
+  /** Current free space. Considers space currently leased out too.
+   */
+  private def free(): Long = {
+    math.max(maxUsage - currentUsage.get(), 0L)
+  }
+
+  /**
+   * Evicts the least recently used application stores, skipping any that are in use, until at
+   * least `size` bytes are free, if possible.
+   */
+  private def makeRoom(size: Long): Unit = {
+    if (free() < size) {
+      logDebug("Not enough free space, looking at candidates for deletion...")
+      val evicted = new ListBuffer[ApplicationStoreInfo]()
+      val iter = listing.view(classOf[ApplicationStoreInfo]).index("lastAccess").closeableIterator()
+      try {
+        var needed = size
+        while (needed > 0 && iter.hasNext()) {
+          val info = iter.next()
+          val isActive = active.synchronized {
+            active.contains(info.appId -> info.attemptId)
+          }
+          if (!isActive) {
+            evicted += info
+            needed -= info.size
+          }
+        }
+      } finally {
+        iter.close()
+      }
+
+      evicted.foreach { info =>
+        logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.")
+        FileUtils.deleteDirectory(new File(info.path))
+        listing.delete(info.getClass(), info.path)
+      }
+      logDebug(s"Deleted a total of ${evicted.size} app stores.")
+    }
+  }
+
+  private def appStorePath(appId: String, attemptId: Option[String]): File = {
+    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
+    new File(appStoreDir, fileName)
+  }
+
+  private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
+    val path = appStorePath(appId, attemptId)
+    val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
+      sizeOf(path))
+    listing.write(info)
+  }
+
+  private def updateUsage(delta: Long): Unit = {
+    val updated = currentUsage.addAndGet(delta)
+    if (updated < 0) {
+      throw new IllegalStateException(
+        s"Disk usage tracker went negative (now = $updated, delta = $delta)")
+    }
+  }
+
+  /** Visible for testing. Return the size of a directory. */
+  private[history] def sizeOf(path: File): Long = FileUtils.sizeOf(path)
+
+  class Lease(
+      val path: File,
+      private val leased: Long) {
+
+    /**
+     * Commits a lease to its final location, and updates accounting information. Optionally
+     * marks the application as active, so that it's not eligible for eviction if data needs
+     * to be cleaned up.
+     */
+    def commit(appId: String, attemptId: Option[String], activate: Boolean = false): File = {
+      val dst = appStorePath(appId, attemptId)
+
+      active.synchronized {
+        require(!active.contains(appId -> attemptId),
+          s"Cannot commit lease for active application $appId / $attemptId")
+
+        if (dst.isDirectory()) {
+          val size = sizeOf(dst)
+          FileUtils.deleteDirectory(dst)
+          updateUsage(-size)
+        }
+      }
+
+      updateUsage(-leased)
+
+      val newSize = sizeOf(path)
+      makeRoom(newSize)
+      if (!path.renameTo(dst)) {
+        throw new IllegalStateException(s"Failed to rename store directory $path to $dst.")
+      }
+      updateUsage(newSize)
+      updateAccessTime(appId, attemptId)
+
+      if (activate) {
+        active.synchronized {
+          active(appId -> attemptId) = newSize
+        }
+      }
+      dst
+    }
+
+    /** Deletes the temporary directory created for the lease.
+     */
+    def rollback(): Unit = {
+      updateUsage(-leased)
+      FileUtils.deleteDirectory(path)
+    }
+
+  }
+
+}
+
+private case class ApplicationStoreInfo(
+    @KVIndexParam path: String,
+    @KVIndexParam("lastAccess") lastAccess: Long,
+    appId: String,
+    attemptId: Option[String],
+    size: Long)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 82c9a638bd827..b1755463c39bd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,9 +18,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
-import java.nio.file.Files
-import java.nio.file.attribute.PosixFilePermissions
-import java.util.{Date, ServiceLoader, UUID}
+import java.util.{Date, UUID}
 import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -44,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.kvstore._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
-import org.apache.spark.status.{AppStatePlugin, AppStateStore, ElementTrackingStore}
+import org.apache.spark.status.{AppStatePlugin, AppStateStore}
 import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1
@@ -132,11 +130,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val storePath = new File(conf.get(LOCAL_STORE_DIR))
   require(storePath.isDirectory(), s"Configured store directory ($storePath) does not exist.")
 
+  private val listingPath = new File(storePath, "listing.ldb")
   private val listing = {
     val metadata = new KVStoreMetadata(CURRENT_VERSION, AppStateStore.CURRENT_VERSION,
       logDir.toString())
     try {
-      open(new File(storePath, "listing.ldb"), metadata)
+      open(listingPath, metadata)
     } catch {
       case e: Exception =>
         // If there's an error, remove the listing database and any existing UI database, since
         // they may contain incompatible information.
         logWarning(s"Error while opening existing listing database, creating new one.", e)
         storePath.listFiles().foreach(Utils.deleteRecursively)
-        open(new File(storePath, "listing.ldb"), metadata)
+        open(listingPath, metadata)
     }
   }
 
+  private val storeManager = new DiskStoreManager(conf, storePath, listing, clock)
   private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
 
   /**
@@ -218,6 +218,8 @@
   }
 
   private def startPolling(): Unit = {
+    storeManager.initialize()
+
     // Validate the log directory.
     val path = new Path(logDir)
     try {
@@ -295,27 +297,27 @@
       attempt.adminAclsGroups.getOrElse(""))
     secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-    val path = uiStorePath(appId, attemptId)
-    if (!path.exists()) {
-      throw new IllegalStateException(
-        s"Application entry for $appId / $attemptId found, but UI not available.")
-    }
+    val path = storeManager.openStore(appId, attemptId).getOrElse(
+      rebuildAppStore(appId, attemptId, attempt.logPath))
 
-    // Create the UI under a lock so that a valid disk store is used, in case the update thread
-    // is writing a new disk store for the application (see replaceStore()).
-    val loadedUI = synchronized {
-      val store = AppStateStore.loadStore(conf, path)
+    val store = AppStateStore.loadStore(conf, path)
+    val loadedUI = try {
       val ui = SparkUI.create(None, store, conf, secManager, app.info.name,
         HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
         attempt.info.startTime.getTime())
       AppStatePlugin.loadPlugins().foreach { plugin =>
        plugin.setupUI(ui)
       }
-      val loaded = LoadedAppUI(ui)
-      activeUIs((appId, attemptId)) = loaded
-      loaded
+      LoadedAppUI(ui)
+    } catch {
+      case e: Exception =>
+        store.close()
+        throw e
     }
 
+    synchronized {
+      activeUIs((appId, attemptId)) = loadedUI
+    }
     Some(loadedUI)
   }
 
@@ -366,6 +368,7 @@
       } finally {
         loadedUI.lock.writeLock().unlock()
       }
+      storeManager.release(appId, attemptId)
     }
   }
 
@@ -507,37 +510,36 @@
    */
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val logPath = fileStatus.getPath()
-    logInfo(s"Replaying log path: $logPath")
-
     val bus = new ReplayListenerBus()
     val listener = new AppListingListener(fileStatus, clock)
     bus.addListener(listener)
 
-    // Write the UI data to a temp location.
-    val tempUiPath = createTempDir("uistore")
-    val store = AppStateStore.createStore(tempUiPath, conf, bus)
-
-    val appCompleted = isApplicationCompleted(fileStatus)
-    val logInput = EventLoggingListener.openEventLog(logPath, fs)
-    try {
-      bus.replay(logInput, logPath.toString, !appCompleted)
-      store.close()
-    } catch {
-      case e: Exception =>
-        store.close()
-        Utils.deleteRecursively(tempUiPath)
-        throw e
-    } finally {
-      logInput.close()
+    val filter: ReplayEventsFilter = { eventString =>
+      eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+        eventString.startsWith(APPL_END_EVENT_PREFIX) ||
+        eventString.startsWith(ENV_UPDATE_EVENT_PREFIX)
     }
 
+    replay(fileStatus, bus, eventsFilter = filter)
+
     listener.applicationInfo.foreach { app =>
       addListing(app)
-      replaceStore(app.info.id, app.attempts.head.info.attemptId, tempUiPath)
+
+      val appId = app.info.id
+      val attemptId = app.attempts.head.info.attemptId
+      // If there's an active UI for the application, invalidate it, close its store, and delete
+      // the disk data so that future requests will load an updated UI.
+      synchronized {
+        activeUIs.remove(appId -> attemptId).foreach { loadedUI =>
+          loadedUI.invalidate()
+          loadedUI.ui.store.close()
+          storeManager.release(appId, attemptId, delete = true)
+        }
+      }
     }
 
-    Utils.deleteRecursively(tempUiPath)
+
     listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
-    logInfo(s"Finished parsing $logPath")
   }
 
   /**
@@ -597,10 +599,10 @@
    */
   private def replay(
       eventLog: FileStatus,
-      appCompleted: Boolean,
       bus: ReplayListenerBus,
       eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
     val logPath = eventLog.getPath()
+    val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
    // and when we read the file here.  That is OK -- it may result in an unnecessary refresh
@@ -610,17 +612,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // after it's created, so we get a file size that is no bigger than what is actually read.
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
-      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+      bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
     } finally {
       logInput.close()
+      logInfo(s"Finished parsing $logPath")
     }
   }
 
   /**
-   * Return true when the application has completed.
+   * Rebuilds the application state store from its event log.
    */
-  private def isApplicationCompleted(entry: FileStatus): Boolean = {
-    !entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
+  private def rebuildAppStore(appId: String, attemptId: Option[String], logName: String): File = {
+    val path = new Path(logDir, logName)
+    val status = fs.getFileStatus(path)
+    val lease = storeManager.lease(status.getLen())
+    try {
+      val bus = new ReplayListenerBus()
+      val store = AppStateStore.createStore(lease.path, conf, bus)
+      try {
+        replay(status, bus)
+      } finally {
+        store.close()
+      }
+
+      lease.commit(appId, attemptId, activate = true)
+    } catch {
+      case e: Exception =>
+        lease.rollback()
+        throw e
+    }
   }
 
   /**
@@ -693,38 +713,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.write(newAppInfo)
   }
 
-  private def createTempDir(name: String): File = {
-    val perms = PosixFilePermissions.fromString("rwx------")
-    Files.createTempDirectory(storePath.toPath(), name,
-      PosixFilePermissions.asFileAttribute(perms)).toFile()
-  }
-
-  private def uiStorePath(appId: String, attemptId: Option[String]): File = {
-    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
-    new File(storePath, fileName)
-  }
-
-  private def replaceStore(appId: String, attemptId: Option[String], newStore: File): Unit = {
-    val uiStore = uiStorePath(appId, attemptId)
-
-    synchronized {
-      // If there's an active UI for the application, invalidate it and close its store, so that
-      // we can replace it with the updated one.
-      activeUIs.remove((appId, attemptId)).foreach { loadedUI =>
-        loadedUI.invalidate()
-        loadedUI.ui.store.close()
-      }
-
-      if (uiStore.exists()) {
-        Utils.deleteRecursively(uiStore)
-      }
-
-      if (!newStore.renameTo(uiStore)) {
-        throw new IOException(s"Failed to rename UI store from $newStore to $uiStore.")
-      }
-    }
-  }
-
   /** For testing. Returns internal data about a single attempt.
    */
  private[history] def getAttempt(appId: String, attemptId: Option[String]): KVStoreAttemptInfo = {
     load(appId).attempts.find(_.info.attemptId == attemptId).getOrElse(
@@ -742,6 +730,8 @@ private[history] object FsHistoryProvider {
 
   private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
 
+  private val ENV_UPDATE_EVENT_PREFIX = "{\"Event\":\"SparkListenerEnvironmentUpdate\","
+
   private val CURRENT_VERSION = 1L
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
index 9ca07e3d63271..fb9a238c4df91 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.history
 import java.util.concurrent.TimeUnit
 
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 private[spark] object config {
 
@@ -37,4 +38,12 @@ private[spark] object config {
     .stringConf
     .createWithDefault("/var/lib/spark-history")
 
+  val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage")
+    .bytesConf(ByteUnit.BYTE)
+    .createWithDefaultString("10g")
+
+  val EVENT_TO_STORE_SIZE_RATIO = ConfigBuilder("spark.history.store.eventLogSizeRatio")
+    .doubleConf
+    .createWithDefault(0.3D)
+
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/DiskStoreManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/DiskStoreManagerSuite.scala
new file mode 100644
index 0000000000000..9f904074aa77e
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/history/DiskStoreManagerSuite.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.io.File
+
+import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.kvstore.KVStore
+import org.apache.spark.status.KVUtils
+import org.apache.spark.util.{ManualClock, Utils}
+
+class DiskStoreManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  import config._
+
+  private val clock = new ManualClock()
+  private var testDir: File = _
+  private var store: KVStore = _
+  private var manager: DiskStoreManager = _
+
+  before {
+    if (testDir != null) {
+      Utils.deleteRecursively(testDir)
+    }
+    testDir = Utils.createTempDir()
+    store = KVUtils.open(new File(testDir, "listing"), "test")
+
+    val conf = new SparkConf()
+      .set(MAX_LOCAL_DISK_USAGE, 3L)
+      .set(EVENT_TO_STORE_SIZE_RATIO, 1.0D)
+    manager = spy(new DiskStoreManager(conf, testDir, store, clock))
+    doReturn(0L).when(manager).sizeOf(any(classOf[File]))
+  }
+
+  after {
+    store.close()
+  }
+
+  test("leasing space") {
+    // Lease all available space.
+    val lease1 = manager.lease(1)
+    val lease2 = manager.lease(1)
+    val lease3 = manager.lease(1)
+    assert(!manager.hasFreeSpace(1))
+
+    // Revert one lease, get another one.
+    lease1.rollback()
+    assert(manager.hasFreeSpace(1))
+    assert(!lease1.path.exists())
+
+    val lease4 = manager.lease(1)
+    assert(!manager.hasFreeSpace(1))
+
+    // Committing 2 should bring the "used" space up to 4, so there shouldn't be space left yet.
+    doReturn(2L).when(manager).sizeOf(meq(lease2.path))
+    val dst2 = lease2.commit("app2", None)
+    assert(!manager.hasFreeSpace(1))
+
+    // Roll back 3 and 4; now there should be 1 left.
+    lease3.rollback()
+    lease4.rollback()
+    assert(manager.hasFreeSpace(1))
+    assert(!manager.hasFreeSpace(2))
+
+    // Take a new lease and commit it with size 3, replacing the previously committed lease 2.
+    val lease5 = manager.lease(1)
+    doReturn(2L).when(manager).sizeOf(meq(dst2))
+    doReturn(3L).when(manager).sizeOf(meq(lease5.path))
+    lease5.commit("app2", None)
+    assert(dst2.exists())
+    assert(!lease5.path.exists())
+    assert(!manager.hasFreeSpace(1))
+
+    // Try a big lease that should cause the committed app to be evicted.
+    val lease6 = manager.lease(6)
+    assert(!dst2.exists())
+    assert(!manager.hasFreeSpace(1))
+  }
+
+  test("tracking active stores") {
+    // Lease and commit space for app 1, making it active.
+    val lease1 = manager.lease(2)
+    assert(!manager.hasFreeSpace(2))
+    doReturn(2L).when(manager).sizeOf(lease1.path)
+    assert(manager.openStore("app1", None).isEmpty)
+    val dst1 = lease1.commit("app1", None, activate = true)
+
+    // Create a new lease. Leases are always granted, but this shouldn't cause app1's store
+    // to be deleted.
+    val lease2 = manager.lease(2)
+    assert(dst1.exists())
+
+    // Trying to commit on top of an active application should fail.
+    intercept[IllegalArgumentException] {
+      lease2.commit("app1", None)
+    }
+
+    lease2.rollback()
+
+    // Close app1 with an updated size, then create a new lease. Now the app's directory should be
+    // deleted.
+    doReturn(3L).when(manager).sizeOf(dst1)
+    manager.release("app1", None)
+    assert(!manager.hasFreeSpace(1))
+
+    val lease3 = manager.lease(1)
+    assert(!dst1.exists())
+    lease3.rollback()
+
+    assert(manager.openStore("app1", None).isEmpty)
+  }
+
+}
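
Note for reviewers: pieced together from rebuildAppStore() and getAppUI() in this patch, the intended caller lifecycle of DiskStoreManager is sketched below. This is illustrative only, not part of the change: storeFor is a hypothetical helper, the event-log replay step is elided to a comment, and since DiskStoreManager is package-private the sketch assumes it lives in org.apache.spark.deploy.history.

package org.apache.spark.deploy.history

import java.io.File

object DiskStoreManagerUsage {

  // Hypothetical helper mirroring FsHistoryProvider: reuse the on-disk store for an
  // application when one exists, otherwise rebuild it under a lease and commit the result.
  def storeFor(manager: DiskStoreManager, appId: String, eventLogSize: Long): File = {
    manager.openStore(appId, None).getOrElse {
      // Lease space proportional to the event log size; makeRoom() may evict idle
      // (non-active) application stores to satisfy the lease.
      val lease = manager.lease(eventLogSize)
      try {
        // ... replay the event log into a KVStore rooted at lease.path ...
        // commit() moves lease.path to its final location; activate = true pins the
        // store so it cannot be evicted while a UI is using it.
        lease.commit(appId, None, activate = true)
      } catch {
        case e: Exception =>
          // On failure, rollback() deletes the temporary directory and returns the
          // leased space to the accounting.
          lease.rollback()
          throw e
      }
    }
  }
}

When the UI for the application is later detached, the caller is expected to invoke manager.release(appId, None) so the store becomes eligible for eviction again. Total usage is bounded (softly) by spark.history.store.maxDiskUsage, and lease sizes are estimated from the event log via spark.history.store.eventLogSizeRatio.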