diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala index a370526c46f3..9b2affab5bc1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala @@ -24,12 +24,11 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import scala.collection.JavaConverters._ import scala.util.control.NonFatal -import com.codahale.metrics.{Counter, MetricRegistry, Timer} +import com.codahale.metrics.{Counter, Counting, Metric, Timer} import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification} import org.eclipse.jetty.servlet.FilterHolder import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.Source import org.apache.spark.ui.SparkUI import org.apache.spark.util.Clock @@ -388,7 +387,7 @@ private[history] final case class CacheKey(appId: String, attemptId: Option[Stri * Metrics of the cache * @param prefix prefix to register all entries under */ -private[history] class CacheMetrics(prefix: String) extends Source { +private[history] class CacheMetrics(prefix: String) extends HistoryMetricSource(prefix) { /* metrics: counters and timers */ val lookupCount = new Counter() @@ -410,34 +409,25 @@ private[history] class CacheMetrics(prefix: String) extends Source { ("update.triggered.count", updateTriggeredCount)) /** all metrics, including timers */ - private val allMetrics = counters ++ Seq( + private val allMetrics: Seq[(String, Metric with Counting)] = counters ++ Seq( ("load.timer", loadTimer), ("update.probe.timer", updateProbeTimer)) /** * Name of metric source */ - override val sourceName = "ApplicationCache" - - override val metricRegistry: MetricRegistry = new MetricRegistry + override val sourceName = "application.cache" /** * Startup actions. * This includes registering metrics with [[metricRegistry]] */ private def init(): Unit = { - allMetrics.foreach { case (name, metric) => - metricRegistry.register(MetricRegistry.name(prefix, name), metric) - } + register(allMetrics) } - override def toString: String = { - val sb = new StringBuilder() - counters.foreach { case (name, counter) => - sb.append(name).append(" = ").append(counter.getCount).append('\n') - } - sb.toString() - } + init() + } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala index d7d82800b8b5..ab7ec930603d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala @@ -17,11 +17,14 @@ package org.apache.spark.deploy.history +import java.util.concurrent.atomic.AtomicBoolean import java.util.zip.ZipOutputStream import scala.xml.Node -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkFirehoseListener} +import org.apache.spark.metrics.source.Source +import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI private[spark] case class ApplicationAttemptInfo( @@ -74,6 +77,8 @@ private[history] case class LoadedAppUI( private[history] abstract class ApplicationHistoryProvider { + private val started = new AtomicBoolean(false) + /** * Returns the count of application event logs that the provider is currently still processing. 
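As context for the `CacheMetrics` change above, here is a minimal stand-alone sketch (not part of the patch) of how Dropwizard counters and timers end up registered under a dotted prefix such as `application.cache`; the object name `CachePrefixSketch` is illustrative only.

```scala
import com.codahale.metrics.{Counter, Metric, MetricRegistry, Timer}

object CachePrefixSketch {
  // Metrics are registered under a prefix, so the cache's lookup counter
  // surfaces as "application.cache.lookup.count" in the registry.
  val registry = new MetricRegistry()
  val lookupCount = new Counter()
  val loadTimer = new Timer()

  Seq[(String, Metric)](
    ("lookup.count", lookupCount),
    ("load.timer", loadTimer)
  ).foreach { case (name, metric) =>
    registry.register(MetricRegistry.name("application.cache", name), metric)
  }
}
```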
* History Server UI can use this to indicate to a user that the application listing on the UI @@ -98,6 +103,19 @@ private[history] abstract class ApplicationHistoryProvider { return 0; } + /** + * Bind to the History Server: threads should be started here; exceptions may be raised + * Start the provider: threads should be started here; exceptions may be raised + * if the history provider cannot be started. + * The base implementation contains a re-entrancy check and should + * be invoked first. + * @return the metric information for registration + */ + def start(): Option[Source] = { + require(!started.getAndSet(true), "History provider already started") + None + } + /** * Returns a list of applications available for the history server to show. * @@ -145,3 +163,15 @@ private[history] abstract class ApplicationHistoryProvider { */ def getEmptyListingHtml(): Seq[Node] = Seq.empty } + +/** + * A simple counter of events. + * There is no concurrency support here: all events must come in sequentially. + */ +private[history] class EventCountListener extends SparkFirehoseListener { + var eventCount = 0L + + override def onEvent(event: SparkListenerEvent): Unit = { + eventCount += 1 + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 9012736bc274..d890c10beedc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -25,6 +25,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.mutable import scala.xml.Node +import com.codahale.metrics.{Counter, MetricRegistry, Timer} import com.google.common.io.ByteStreams import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -35,6 +36,7 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.Source import org.apache.spark.scheduler._ import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.ui.SparkUI @@ -128,6 +130,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) + /** filesystem metrics: visible for test access */ + private[history] val metrics = new FsHistoryProviderMetrics(this, "history.provider") + /** * Return a runnable that performs the given operation on the event logs. * This operation is expected to be executed periodically. @@ -163,6 +168,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** + * Start the provider: threads should be started here; exceptions may be raised + * if the history provider cannot be started. + * The base implementation contains a re-entrancy check and should + * be invoked first. 
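A small sketch of the `EventCountListener` in use, assuming it is exercised from the same `org.apache.spark.deploy.history` package (the class is package-private); the two listener events are arbitrary examples chosen only to show that every event type bumps the counter.

```scala
import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart}

object EventCountSketch {
  def main(args: Array[String]): Unit = {
    val listener = new EventCountListener()
    // Any SparkListenerEvent increments the count; the concrete types do not matter.
    listener.onEvent(SparkListenerApplicationStart("app", Some("app-1"), 0L, "user", None))
    listener.onEvent(SparkListenerApplicationEnd(1L))
    assert(listener.eventCount == 2)
  }
}
```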
+ * @return the metric information for registration + */ + override def start(): Option[Source] = { + super.start() + Some(metrics) + } + private[history] def startSafeModeCheckThread( errorHandler: Option[Thread.UncaughtExceptionHandler]): Thread = { // Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait @@ -239,41 +256,52 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getLastUpdatedTime(): Long = lastScanTime.get() override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = { - try { - applications.get(appId).flatMap { appInfo => - appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => - val replayBus = new ReplayListenerBus() - val ui = { - val conf = this.conf.clone() - val appSecManager = new SecurityManager(conf) - SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, - HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) - // Do not call ui.bind() to avoid creating a new server for each application - } + metrics.appUILoadCount.inc() + time(metrics.appUiLoadTimer, Some(metrics.appUITotalLoadTime)) { + try { + applications.get(appId).flatMap { appInfo => + appInfo.attempts.find(_.attemptId == attemptId).flatMap { attempt => + val replayBus = new ReplayListenerBus() + val ui = { + val conf = this.conf.clone() + val appSecManager = new SecurityManager(conf) + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, + HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime) + // Do not call ui.bind() to avoid creating a new server for each application + } + + val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) + + val (appListener, count) = replay(fileStatus, + isApplicationCompleted(fileStatus), replayBus) + metrics.appUIEventCount.inc(count) + + if (appListener.appId.isDefined) { + ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) + // make sure to set admin acls before view acls so they are properly picked up + val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") + ui.getSecurityManager.setAdminAcls(adminAcls) + ui.getSecurityManager.setViewAcls(attempt.sparkUser, + appListener.viewAcls.getOrElse("")) + val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + + appListener.adminAclsGroups.getOrElse("") + ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) + ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) + Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) + } else { + None + } - val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath)) - - val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus) - - if (appListener.appId.isDefined) { - ui.getSecurityManager.setAcls(HISTORY_UI_ACLS_ENABLE) - // make sure to set admin acls before view acls so they are properly picked up - val adminAcls = HISTORY_UI_ADMIN_ACLS + "," + appListener.adminAcls.getOrElse("") - ui.getSecurityManager.setAdminAcls(adminAcls) - ui.getSecurityManager.setViewAcls(attempt.sparkUser, appListener.viewAcls.getOrElse("")) - val adminAclsGroups = HISTORY_UI_ADMIN_ACLS_GROUPS + "," + - appListener.adminAclsGroups.getOrElse("") - ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) - ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) - } else { - None } - } + } catch { + case e: 
FileNotFoundException => + metrics.appUILoadNotFoundCount.inc() + None + case other: Exception => + metrics.appUILoadFailureCount.inc() + throw other } - } catch { - case e: FileNotFoundException => None } } @@ -310,77 +338,87 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * applications that haven't been updated since last time the logs were checked. */ private[history] def checkForLogs(): Unit = { - try { - val newLastScanTime = getNewLastScanTime() - logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) - .getOrElse(Seq[FileStatus]()) - // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList - .filter { entry => - try { - val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) - !entry.isDirectory() && - // FsHistoryProvider generates a hidden file which can't be read. Accidentally - // reading a garbage file is safe, but we would log an error which can be scary to - // the end-user. - !entry.getPath().getName().startsWith(".") && - prevFileSize < entry.getLen() - } catch { - case e: AccessControlException => - // Do not use "logInfo" since these messages can get pretty noisy if printed on - // every poll. - logDebug(s"No permission to read $entry, ignoring.") - false + metrics.updateCount.inc() + metrics.updateLastAttempted.touch() + time(metrics.updateTimer) { + try { + val newLastScanTime = getNewLastScanTime() + logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) + .getOrElse(Seq[FileStatus]()) + // scan for modified applications, replay and merge them + val logInfos: Seq[FileStatus] = statusList + .filter { entry => + try { + val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L) + !entry.isDirectory() && + // FsHistoryProvider generates a hidden file which can't be read. Accidentally + // reading a garbage file is safe, but we would log an error which can be scary to + // the end-user. + !entry.getPath().getName().startsWith(".") && + prevFileSize < entry.getLen() + } catch { + case e: AccessControlException => + // Do not use "logInfo" since these messages can get pretty noisy if printed on + // every poll. + logDebug(s"No permission to read $entry, ignoring.") + false + } + } + .flatMap { entry => Some(entry) } + .sortWith { case (entry1, entry2) => + entry1.getModificationTime() >= entry2.getModificationTime() } - } - .flatMap { entry => Some(entry) } - .sortWith { case (entry1, entry2) => - entry1.getModificationTime() >= entry2.getModificationTime() - } if (logInfos.nonEmpty) { logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}") } - var tasks = mutable.ListBuffer[Future[_]]() - - try { - for (file <- logInfos) { - tasks += replayExecutor.submit(new Runnable { - override def run(): Unit = mergeApplicationListing(file) - }) - } - } catch { - // let the iteration over logInfos break, since an exception on - // replayExecutor.submit (..) indicates the ExecutorService is unable - // to take any more submissions at this time - - case e: Exception => - logError(s"Exception while submitting event log for replay", e) - } - - pendingReplayTasksCount.addAndGet(tasks.size) + var tasks = mutable.ListBuffer[Future[_]]() - tasks.foreach { task => try { - // Wait for all tasks to finish. 
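The `time(...)` helper introduced later in this patch follows the standard Dropwizard pattern of a `Timer` context plus an optional counter that accumulates the total duration. A self-contained sketch of that pattern, with illustrative names:

```scala
import com.codahale.metrics.{Counter, Timer}

object TimedScanSketch {
  val updateTimer = new Timer()
  val totalScanTime = new Counter()

  // Times the body with the Timer and adds the elapsed time to the Counter.
  def timedScan[T](body: => T): T = {
    val ctx = updateTimer.time()
    try {
      body
    } finally {
      // Timer.Context.stop() returns the elapsed time in nanoseconds.
      totalScanTime.inc(ctx.stop())
    }
  }
}
```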
This makes sure that checkForLogs - // is not scheduled again while some tasks are already running in - // the replayExecutor. - task.get() + for (file <- logInfos) { + tasks += replayExecutor.submit(new Runnable { + override def run(): Unit = + time(metrics.historyMergeTimer, Some(metrics.historyTotalMergeTime)) { + mergeApplicationListing(file) + } + }) + } } catch { - case e: InterruptedException => - throw e + // let the iteration over logInfos break, since an exception on + // replayExecutor.submit (..) indicates the ExecutorService is unable + // to take any more submissions at this time + case e: Exception => - logError("Exception while merging application listings", e) - } finally { - pendingReplayTasksCount.decrementAndGet() + logError(s"Exception while submitting event log for replay", e) } - } - lastScanTime.set(newLastScanTime) - } catch { - case e: Exception => logError("Exception in checking for event log updates", e) + pendingReplayTasksCount.addAndGet(tasks.size) + + tasks.foreach { task => + try { + // Wait for all tasks to finish. This makes sure that checkForLogs + // is not scheduled again while some tasks are already running in + // the replayExecutor. + task.get() + } catch { + case e: InterruptedException => + throw e + case e: Exception => + logError("Exception while merging application listings", e) + } finally { + pendingReplayTasksCount.decrementAndGet() + } + } + + lastScanTime.set(newLastScanTime) + metrics.updateLastSucceeded.setValue(newLastScanTime) + } catch { + case e: Exception => logError( + "Exception in checking for event log updates", e) + metrics.updateFailureCount.inc() + } } } @@ -460,7 +498,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // won't change whenever HistoryServer restarts and reloads the file. val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis() - val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter) + val (appListener, count) = replay(fileStatus, appCompleted, + new ReplayListenerBus(), eventsFilter) + metrics.historyEventCount.inc(count) // Without an app ID, new logs will render incorrectly in the listing page, so do not list or // try to show their UI. @@ -629,12 +669,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) * an `ApplicationEventListener` instance with event data captured from the replay. * `ReplayEventsFilter` determines what events are replayed and can therefore limit the * data captured in the returned `ApplicationEventListener` instance. + * + * @param eventLog reference to the event log to play back. + * @param appCompleted has the application completed? + * @param bus event bus to play events to + * @param eventsFilter filter for events + * @return the event listener and the number of processed events + * */ private def replay( eventLog: FileStatus, appCompleted: Boolean, bus: ReplayListenerBus, - eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = { + eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): (ApplicationEventListener, Long) = { val logPath = eventLog.getPath() logInfo(s"Replaying log path: $logPath") // Note that the eventLog may have *increased* in size since when we grabbed the filestatus, @@ -645,10 +692,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // after it's created, so we get a file size that is no bigger than what is actually read. 
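The metrics classes added further down derive per-event averages lazily and guard against division by zero; a sketch of that guard using the `LambdaLongGauge` defined later in this patch (assumes it runs with package access, and the field names are illustrative):

```scala
import com.codahale.metrics.Counter

object AverageGaugeSketch {
  val totalMergeTimeNanos = new Counter()
  val mergedEventCount = new Counter()

  // If no events have been counted yet, report 0 rather than dividing by zero.
  private def average(value: Long, count: Long): Long =
    if (count > 0) value / count else 0

  // Evaluated on every read, so the average stays current without extra bookkeeping.
  val timePerEvent = new LambdaLongGauge(() =>
    average(totalMergeTimeNanos.getCount, mergedEventCount.getCount))
}
```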
val logInput = EventLoggingListener.openEventLog(logPath, fs) try { + var countListener = new EventCountListener() val appListener = new ApplicationEventListener bus.addListener(appListener) + bus.addListener(countListener) bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter) - appListener + (appListener, countListener.eventCount) } finally { logInput.close() } @@ -729,6 +778,116 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) prevFileSize < latest.fileSize } } + + /** + * Time a closure, returning its output. + * The timer is updated with the duration, and if a counter is supplied, it's count + * is incremented by the duration. + * @param timer timer + * @param counter counter: an optional counter of the duration + * @param fn function + * @tparam T type of return value of time + * @return the result of the function. + */ + private def time[T](timer: Timer, counter: Option[Counter] = None)(fn: => T): T = { + val timeCtx = timer.time() + try { + fn + } finally { + val duration = timeCtx.stop() + counter.foreach(_.inc(duration)) + } + } +} + +/** + * Metrics integration: the various counters of activity. + */ +private[history] class FsHistoryProviderMetrics(owner: FsHistoryProvider, prefix: String) + extends HistoryMetricSource(prefix) { + + /** + * Function to return an average; if the count is 0, so is the average. + * @param value value to average + * @param count event count to divide by + * @return the average, or 0 if the counter is itself 0 + */ + private def average(value: Long, count: Long): Long = { + if (count> 0) value / count else 0 + } + + override val sourceName = "history.fs" + + private val name = MetricRegistry.name(sourceName) + + /** Number of updates. */ + val updateCount = new Counter() + + /** Number of update failures. */ + val updateFailureCount = new Counter() + + /** Number of events replayed as listing merge. */ + val historyEventCount = new Counter() + + /** Timer of listing merges. */ + val historyMergeTimer = new Timer() + + /** Total time to merge all histories. */ + val historyTotalMergeTime = new Counter() + + /** Average time to process an event in the history merge operation. */ + val historyEventMergeTime = new LambdaLongGauge(() => + average(historyTotalMergeTime.getCount, historyEventCount.getCount)) + + /** Number of events replayed. */ + val appUIEventCount = new Counter() + + /** Update duration timer. */ + val updateTimer = new Timer() + + private val clock = new SystemClock + + /** Time the last update was attempted. */ + val updateLastAttempted = new TimestampGauge(clock) + + /** Time the last update succeded. */ + val updateLastSucceeded = new TimestampGauge(clock) + + /** Number of App UI load operations. */ + val appUILoadCount = new Counter() + + /** Number of App UI load operations that failed due to a load/parse/replay problem. */ + val appUILoadFailureCount = new Counter() + + /** Number of App UI load operations that failed due to an unknown file. */ + val appUILoadNotFoundCount = new Counter() + + /** Statistics on time to load app UIs. */ + val appUiLoadTimer = new Timer() + + /** Total load time of all App UIs. 
*/ + val appUITotalLoadTime = new Counter() + + /** Average time to load a single event in the App UI */ + val appUIEventReplayTime = new LambdaLongGauge(() => + average(appUITotalLoadTime.getCount, appUIEventCount.getCount)) + + register(Seq( + ("history.merge.event.count", historyEventCount), + ("history.merge.event.time", historyEventMergeTime), + ("history.merge.duration", historyTotalMergeTime), + ("update.count", updateCount), + ("update.failure.count", updateFailureCount), + ("update.last.attempted", updateLastAttempted), + ("update.last.succeeded", updateLastSucceeded), + ("appui.load.count", appUILoadCount), + ("appui.load.duration", appUITotalLoadTime), + ("appui.load.failure.count", appUILoadFailureCount), + ("appui.load.not-found.count", appUILoadNotFoundCount), + ("appui.event.count", appUIEventCount), + ("appui.event.replay.time", appUIEventReplayTime), + ("update.timer", updateTimer), + ("history.merge.timer", historyMergeTimer))) } private[history] object FsHistoryProvider { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryMetricSource.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryMetricSource.scala new file mode 100644 index 000000000000..361089bef4f9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryMetricSource.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import scala.collection.JavaConverters._ + +import com.codahale.metrics.{Counter, Gauge, Metric, MetricFilter, MetricRegistry, Timer} + +import org.apache.spark.metrics.source.Source +import org.apache.spark.util.Clock + +/** + * An abstract implementation of the metrics [[Source]] trait with some common operations for + * retrieving entries; the `toString()` operation dumps all counters and gauges. + */ +private[history] abstract class HistoryMetricSource(val prefix: String) extends Source { + + override val metricRegistry = new MetricRegistry() + + /** + * Register a sequence of metrics + * @param metrics sequence of metrics to register + */ + def register(metrics: Seq[(String, Metric)]): Unit = { + metrics.foreach { case (name, metric) => + metricRegistry.register(fullname(name), metric) + } + } + + /** + * Create the full name of a metric by prepending the prefix to the name + * @param name short name + * @return the full name to use in registration + */ + def fullname(name: String): String = { + MetricRegistry.name(prefix, name) + } + + /** + * Dump the counters and gauges. + * @return a string for logging and diagnostics -not for parsing by machines. 
+ */ + override def toString: String = { + val sb = new StringBuilder(s"Metrics for $sourceName:\n") + def robustAppend(s : => Long) = { + try { + sb.append(s) + } catch { + case e: Exception => + sb.append(s"(exception: $e)") + } + } + + sb.append(" Counters\n") + + metricRegistry.getCounters.asScala.foreach { case (name, counter) => + sb.append(" ").append(name).append(" = ") + .append(counter.getCount).append('\n') + } + sb.append(" Gauges\n") + metricRegistry.getGauges.asScala.foreach { case (name, gauge) => + sb.append(" ").append(name).append(" = ") + try { + sb.append(gauge.getValue) + } catch { + case e: Exception => + sb.append(s"(exception: $e)") + } + sb.append('\n') + } + sb.toString() + } + + /** + * Get a named counter. + * @param counterName name of the counter + * @return the counter, if found + */ + def getCounter(counterName: String): Option[Counter] = { + val full = fullname(counterName) + Option(metricRegistry.getCounters(new MetricByName(full)).get(full)) + } + + /** + * Get a gauge of an unknown numeric type. + * @param gaugeName name of the gauge + * @return gauge, if found + */ + def getGauge(gaugeName: String): Option[Gauge[_]] = { + val full = fullname(gaugeName) + Option(metricRegistry.getGauges(new MetricByName(full)).get(full)) + } + + /** + * Get a Long gauge. + * @param gaugeName name of the gauge + * @return gauge, if found + * @throws ClassCastException if the gauge is found but of the wrong type + */ + def getLongGauge(gaugeName: String): Option[Gauge[Long]] = { + getGauge(gaugeName).asInstanceOf[Option[Gauge[Long]]] + } + + /** + * Get a timer. + * @param timerName name of the timer + * @return the timer, if found. + */ + def getTimer(timerName: String): Option[Timer] = { + val full = fullname(timerName) + Option(metricRegistry.getTimers(new MetricByName(full)).get(full)) + } + + /** + * A filter for metrics by name; include the prefix in the name. + * @param fullname full name of metric + */ + private class MetricByName(fullname: String) extends MetricFilter { + def matches(metricName: String, metric: Metric): Boolean = metricName == fullname + } +} + +/** + * A Long gauge from a lambda expression; the expression is evaluated + * whenever the metrics are queried. + * @param expression expression which generates the value. + */ +private[history] class LambdaLongGauge(expression: (() => Long)) extends Gauge[Long] { + override def getValue: Long = expression() +} + +/** + * A timestamp is a gauge which is set to a point in time + * as measured in millseconds since the epoch began. + */ +private[history] class TimestampGauge(clock: Clock) extends Gauge[Long] { + var time = 0L + + /** Current value. */ + override def getValue: Long = time + + /** Set a new value. */ + def setValue(t: Long): Unit = { + time = t + } + + /** Set to the current system time. 
*/ + def touch(): Unit = { + setValue(clock.getTimeMillis()) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 54f39f7620e5..e13e86c47dde 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -30,6 +30,8 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ @@ -63,6 +65,11 @@ class HistoryServer( // application private val appCache = new ApplicationCache(this, retainedApplications, new SystemClock()) + private[history] val metricsSystem = MetricsSystem.createMetricsSystem("history", + conf, securityManager) + + private[history] var metricsRegistry = metricsSystem.getMetricRegistry + // and its metrics, for testing as well as monitoring val cacheMetrics = appCache.metrics @@ -110,6 +117,14 @@ class HistoryServer( appCache.getSparkUI(appKey) } + val historyMetrics = new HistoryMetrics(this, "history.server") + + /** + * Provider metrics are None until the provider is started, and only after that + * point if the provider returns any. + */ + var providerMetrics: Option[Source] = None + initialize() /** @@ -131,16 +146,50 @@ class HistoryServer( attachHandler(contextHandler) } - /** Bind to the HTTP server behind this web interface. */ + /** + * Startup Actions. + * 1. Call `start()` on the provider (and maybe get some metrics back). + * 2. Start the metrics. + * 3. Bind to the HTTP server behind this web interface. + */ override def bind() { + providerMetrics = provider.start() + startMetrics() super.bind() } - /** Stop the server and close the file system. */ + /** + * Start up the metrics. + * This includes registering any metrics defined in `providerMetrics`; the provider + * needs its `start()` method to be invoked to get these metric *prior to this method + * being invoked*. + */ + private def startMetrics(): Unit = { + // hook up metrics + metricsSystem.registerSource(historyMetrics) + metricsSystem.registerSource(appCache.metrics) + providerMetrics.foreach(metricsSystem.registerSource) + metricsSystem.start() + metricsSystem.getServletHandlers.foreach(attachHandler) + } + + /** + * Stop the server. + * And: + * 1. Stop the application cache. + * 2. Stop the history provider. + * 3. Stop the metrics system. + */ override def stop() { - super.stop() - provider.stop() - appCache.stop() + try { + super.stop() + } finally { + appCache.stop() + if (provider != null) { + provider.stop() + } + metricsSystem.stop() + } } /** Attach a reconstructed UI to this server. Only valid after bind(). */ @@ -249,6 +298,15 @@ class HistoryServer( } } +/** + * History system metrics independent of providers go in here. + * @param owner owning instance + */ +private[history] class HistoryMetrics(val owner: HistoryServer, prefix: String) + extends HistoryMetricSource(prefix) { + override val sourceName = "history" +} + /** * The recommended way of starting and stopping a HistoryServer is through the scripts * start-history-server.sh and stop-history-server.sh. 
The path to a base log directory, diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 1d494500cdb5..3f91d9edba16 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -211,6 +211,8 @@ private[spark] class MetricsSystem private ( } } } + + private[spark] def getMetricRegistry(): MetricRegistry = registry } private[spark] object MetricsSystem { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index ec580a44b8e7..92f0b35d184c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -26,6 +26,7 @@ import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.concurrent.duration._ import scala.language.postfixOps +import com.codahale.metrics.MetricRegistry import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.hdfs.DistributedFileSystem import org.json4s.jackson.JsonMethods._ @@ -45,9 +46,11 @@ import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging { private var testDir: File = null + private var metrics: MetricRegistry = _ before { testDir = Utils.createTempDir(namePrefix = s"a b%20c+d") + metrics = new MetricRegistry() } after { @@ -66,9 +69,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc new File(logPath) } + /** + * Create and configure a new history provider + * @return a filesystem history provider ready for use + */ + private def createHistoryProvider(): FsHistoryProvider = { + val provider = new FsHistoryProvider(createTestConf()) + provider.start() + provider + } + + /** + * Create and configure a new history provider + * @return a filesystem history provider ready for use + */ + private def createHistoryProvider(clock: Clock): FsHistoryProvider = { + val provider = new FsHistoryProvider(createTestConf(), clock) + provider.start() + provider + } + test("Parse application logs") { val clock = new ManualClock(12345678) - val provider = new FsHistoryProvider(createTestConf(), clock) + val provider = createHistoryProvider(clock) // Write a new-style application log. 
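A hypothetical use of the new `MetricsSystem.getMetricRegistry()` accessor and the `metricsRegistry` field on `HistoryServer`: enumerating the metric names that a test or diagnostic tool can assert against (assumes it runs inside the `org.apache.spark.deploy.history` package, since the field is package-private).

```scala
import scala.collection.JavaConverters._

// Prints every registered metric name that belongs to the history sources,
// e.g. "history.fs.update.count" or "application.cache.lookup.count".
def dumpHistoryMetricNames(server: HistoryServer): Unit = {
  server.metricsRegistry.getNames.asScala
    .filter(name => name.startsWith("history") || name.startsWith("application.cache"))
    .foreach(println)
}
```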
val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -145,14 +168,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc ) logFile2.setReadable(false, false) - val provider = new FsHistoryProvider(createTestConf()) + val provider = createHistoryProvider() updateAndCheck(provider) { list => list.size should be (1) } } test("history file is renamed from inprogress to completed") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = createHistoryProvider() val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -186,7 +209,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-5582: empty log directory") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = createHistoryProvider() val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, true, None, @@ -202,7 +225,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("apps with multiple attempts with order") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = createHistoryProvider() val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, true, None, @@ -347,7 +370,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("Event log copy") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = createHistoryProvider() val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) writeFile(log, true, None, @@ -382,7 +405,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc } test("SPARK-8372: new logs with no app ID are ignored") { - val provider = new FsHistoryProvider(createTestConf()) + val provider = createHistoryProvider() // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryMetricSourceSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryMetricSourceSuite.scala new file mode 100644 index 000000000000..04276f019c5d --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryMetricSourceSuite.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history + +import scala.collection.JavaConverters._ + +import com.codahale.metrics.{Counter, Timer} +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.util.ManualClock + +class HistoryMetricSourceSuite extends SparkFunSuite with Matchers { + + test("LambdaLongGauge") { + assert(3L === new LambdaLongGauge(() => 3L).getValue) + } + + test("TimestampGauge lifecycle") { + val clock = new ManualClock(1) + val ts = new TimestampGauge(clock) + assert (0L === ts.getValue) + ts.touch() + assert (1L === ts.getValue) + clock.setTime(1000) + assert (1L === ts.getValue) + ts.touch() + assert (1000 === ts.getValue) + } + + test("HistoryMetricSource registration and lookup") { + val threeGauge = new LambdaLongGauge(() => 3L) + val clock = new ManualClock(1) + val ts = new TimestampGauge(clock) + val timer = new Timer + val counter = new Counter() + val source = new TestMetricSource + source.register(Seq( + ("three", threeGauge), + ("timestamp", ts), + ("timer", timer), + ("counter", counter))) + logInfo(source.toString) + + val registry = source.metricRegistry + val counters = registry.getCounters.asScala + counters.size should be (1) + assert(counter === counters("t.counter")) + assert(counter === source.getCounter("counter").get) + + val gauges = registry.getGauges.asScala + gauges.size should be(2) + + assert(ts === source.getLongGauge("timestamp").get) + assert(threeGauge === source.getLongGauge("three").get) + assert(timer === source.getTimer("timer").get) + + } + + test("Handle failing Gauge.getValue in toString()") { + var zero = 0L + val trouble = new LambdaLongGauge(() => 1L/zero) + intercept[ArithmeticException](trouble.getValue) + val source = new TestMetricSource + source.register(Seq(("trouble", trouble))) + val s = source.toString + logInfo(s) + s should include("ArithmeticException") + } + + private class TestMetricSource extends HistoryMetricSource("t") { + override def sourceName: String = "TestMetricSource" + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 764156c3edc4..f702e9d76b55 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -26,7 +26,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpSe import scala.concurrent.duration._ import scala.language.postfixOps -import com.codahale.metrics.Counter +import com.codahale.metrics.{Counter, Gauge} import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.{FileUtils, IOUtils} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -43,6 +43,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.selenium.WebBrowser import org.apache.spark._ +import org.apache.spark.metrics.source.Source import org.apache.spark.ui.SparkUI import org.apache.spark.ui.jobs.UIData.JobUIData import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -419,15 +420,33 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers server.initialize() server.bind() val port = server.boundPort - val metrics = server.cacheMetrics + val cacheMetrics = server.cacheMetrics + val historyMetrics = server.historyMetrics + val providerMetrics = server.providerMetrics.get.asInstanceOf[HistoryMetricSource] + + // assert that a metric counter evaluates 
as expected; if not dump the whole metrics instance + def assertCounterEvaluates( + source: Source, + name: String, + counter: Counter, + evaluation: (Long => Boolean)): Unit = { + val value = counter.getCount + if (!evaluation(value)) { + // this is here because Scalatest loses stack depth + fail(s"Wrong $name value: $value in metrics\n$source") + } + } - // assert that a metric has a value; if not dump the whole metrics instance - def assertMetric(name: String, counter: Counter, expected: Long): Unit = { - val actual = counter.getCount - if (actual != expected) { + // assert that a long metric gauge evaluates as expected; if not dump the whole metrics instance + def assertGaugeEvaluates( + source: Source, + name: String, + gauge: Gauge[Long], + evaluation: (Long => Boolean)): Unit = { + val value = gauge.getValue + if (!evaluation(value)) { // this is here because Scalatest loses stack depth - fail(s"Wrong $name value - expected $expected but got $actual" + - s" in metrics\n$metrics") + fail(s"Wrong $name value: $value in metrics\n$source") } } @@ -441,7 +460,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix") } - val historyServerRoot = new URL(s"http://localhost:$port/") + // check that a dynamically calculated average returns 0 and not a division by zero error. + val replayTime = "appui.event.replay.time" + assertGaugeEvaluates(providerMetrics, replayTime, + providerMetrics.getLongGauge(replayTime).get, _ == 0) // start initial job val d = sc.parallelize(1 to 10) @@ -516,7 +538,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers getNumJobs("") should be (1) getNumJobs("/jobs") should be (1) getNumJobsRestful() should be (1) - assert(metrics.lookupCount.getCount > 1, s"lookup count too low in $metrics") + assertCounterEvaluates(cacheMetrics, "lookup count", + cacheMetrics.lookupCount, _ > 1) // dump state before the next bit of test, which is where update // checking really gets stressed @@ -564,6 +587,24 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers assert(jobcount === getNumJobs("/jobs")) + // print out the metrics. This forces a run through all the counters and gauges + // the evaluation is done outside the log statement to guarantee that the evaluation + // always takes place + val metricsDump = s"$historyMetrics\n$providerMetrics\n$cacheMetrics" + logInfo(s"Metrics:\n$metricsDump") + // make some assertions about internal state of providers via the metrics + val loadcount = "appui.load.count" + assert(metricsDump.contains(loadcount), + s"No $loadcount in metrics dump <$metricsDump>") + assertCounterEvaluates(providerMetrics, loadcount, + providerMetrics.getCounter(loadcount).get, _ > 0) + assertGaugeEvaluates(providerMetrics, replayTime, + providerMetrics.getLongGauge(replayTime).get, _ > 0) + + val evictionCount = cacheMetrics.fullname("eviction.count") + assert(metricsDump.contains(evictionCount), + s"No $evictionCount in metrics dump <$metricsDump>") + // no need to retain the test dir now the tests complete logDir.deleteOnExit();
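Beyond the in-process assertions above, the servlet handlers attached in `startMetrics()` also make the new counters and gauges reachable over HTTP when the default metrics configuration is in effect. A sketch of that lookup; the port (the History Server default, 18080) and the `/metrics/json` path are assumptions based on the default settings, not part of this patch.

```scala
import java.net.URL
import scala.io.Source

object MetricsEndpointSketch {
  def main(args: Array[String]): Unit = {
    // Dumps the JSON rendering of every registered metric, including the
    // history.fs, history.server and application.cache entries added here.
    val json = Source.fromURL(new URL("http://localhost:18080/metrics/json")).mkString
    println(json)
  }
}
```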