12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -52,7 +52,7 @@ private class CleanupTaskWeakReference(
* to be processed when the associated object goes out of scope of the application. Actual
* cleanup is performed in a separate daemon thread.
*/
private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private[spark] class ContextCleaner(sc: SparkContext) extends Logging with Lifecycle {

private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
with SynchronizedBuffer[CleanupTaskWeakReference]
@@ -90,24 +90,22 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
"spark.cleaner.referenceTracking.blocking.shuffle", false)

@volatile private var stopped = false

/** Attach a listener object to get information of when objects are cleaned. */
def attachListener(listener: CleanerListener) {
listeners += listener
}

def conf = sc.conf

/** Start the cleaner. */
def start() {
override protected def doStart() {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
}

/** Stop the cleaner. */
def stop() {
stopped = true
}
override protected def doStop() { }

/** Register a RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]) {
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -46,12 +46,14 @@ private[spark] class HttpServer(
securityManager: SecurityManager,
requestedPort: Int = 0,
serverName: String = "HTTP server")
extends Logging {
extends Logging with Lifecycle {

private var server: Server = null
private var port: Int = requestedPort

def start() {
def conf = securityManager.sparkConf

override protected def doStart() {
if (server != null) {
throw new ServerStateException("Server is already started")
} else {
@@ -137,7 +139,7 @@
sh
}

def stop() {
override protected def doStop() {
if (server == null) {
throw new ServerStateException("Server is already stopped")
} else {
72 changes: 72 additions & 0 deletions core/src/main/scala/org/apache/spark/Lifecycle.scala
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

trait Lifecycle extends Service {

import State._

protected var state_ = Uninitialized

def conf: SparkConf

def uninitialized = state_ == Uninitialized

def initialized = state_ == Initialized

def started = state_ == Started

def stopped = state_ == Stopped

def state: State.State = state_

def initialize(): Unit = synchronized {
if (!uninitialized) {
throw new SparkException(s"Can't move to initialized state when $state_")
}
doInitialize
state_ = Initialized
}

override def start(): Unit = synchronized {
if (uninitialized) initialize()
if (started) {
throw new SparkException(s"Can't move to started state when $state_")
}
doStart()
state_ = Started
}

override def stop(): Unit = synchronized {
if (!started) {
throw new SparkException(s"Can't move to stopped state when $state_")
}
doStop
state_ = Stopped
}

override def close(): Unit = synchronized {
stop()
}

protected def doInitialize(): Unit = {}

protected def doStart(): Unit

protected def doStop(): Unit
}
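
For orientation, the sketch below shows how a component could plug into the new trait. The HeartbeatService class is hypothetical and not part of this change; it only illustrates the contract: supply a conf, implement doStart/doStop, optionally override doInitialize, and let Lifecycle enforce the Uninitialized -> Initialized -> Started -> Stopped transitions.

import org.apache.spark.{Lifecycle, SparkConf}

// Hypothetical component, shown only to illustrate the Lifecycle contract.
class HeartbeatService(val conf: SparkConf) extends Lifecycle {

  private var thread: Thread = _

  // Optional hook: runs once, the first time start() is invoked.
  override protected def doInitialize(): Unit = {
    // e.g. read polling intervals from conf here
  }

  override protected def doStart(): Unit = {
    thread = new Thread("heartbeat") {
      override def run(): Unit = { /* periodic work would go here */ }
    }
    thread.setDaemon(true)
    thread.start()
  }

  override protected def doStop(): Unit = {
    if (thread != null) thread.interrupt()
  }
}

// Usage: transitions are validated by Lifecycle, so misuse fails fast.
// val svc = new HeartbeatService(new SparkConf())
// svc.start()   // initialize() runs implicitly, then doStart(); state == Started
// svc.start()   // would throw SparkException("Can't move to started state when Started")
// svc.stop()    // doStop() runs, then state == Stopped
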
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -140,7 +140,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* can take place.
*/

private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
private[spark] class SecurityManager(val sparkConf: SparkConf) extends Logging {

// key used to store the spark secret in the Hadoop UGI
private val sparkSecretLookupKey = "sparkCookie"
60 changes: 60 additions & 0 deletions core/src/main/scala/org/apache/spark/Service.scala
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

trait Service extends java.io.Closeable {
Author comment for @rxin: note that Service inherits the java.io.Closeable interface here.


/**
* Service states
*/
object State extends Enumeration {

/**
* Constructed but not initialized
*/
val Uninitialized = Value(0, "Uninitialized")

/**
* Initialized but not started or stopped
*/
val Initialized = Value(1, "Initialized")

/**
* Started and not stopped
*/
val Started = Value(2, "Started")

/**
* Stopped; no further state transitions are permitted
*/
val Stopped = Value(3, "Stopped")

type State = Value
}

def conf: SparkConf

def initialize(): Unit

def start(): Unit

def stop(): Unit

def state: State.State

}
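
A brief note on the Closeable inheritance flagged in the comment above: since Service extends java.io.Closeable and Lifecycle implements close() by delegating to stop(), a started component can be released by generic resource-handling code that knows nothing about Spark. A minimal sketch, assuming a hypothetical helper that is not part of this change:

import java.io.Closeable

object CloseableUtil {
  // Run a block against any Closeable resource and always release it afterwards.
  def withCloseable[C <: Closeable, T](resource: C)(body: C => T): T = {
    try body(resource) finally resource.close()
  }
}

// e.g. CloseableUtil.withCloseable(startedHistoryServer) { server => /* use it while it is up */ }
// where startedHistoryServer is any Lifecycle component already in the Started state;
// close() has the same effect as stop().
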
22 changes: 18 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -60,7 +60,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
* this config overrides the default configs as well as system properties.
*/

class SparkContext(config: SparkConf) extends Logging {
class SparkContext(config: SparkConf) extends Logging with Lifecycle {

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -154,9 +154,8 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map(), Map())

private[spark] val conf = config.clone()
val conf = config.clone()
conf.validateSettings()

/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
* changed at runtime.
@@ -987,8 +986,23 @@ class SparkContext(config: SparkConf) extends Logging {
addedJars.clear()
}

override def start() {
if (stopped) {
throw new SparkException("SparkContext has already been stopped")
}
super.start()
}

override protected def doStart() {}

start()

/** Shut down the SparkContext. */
def stop() {
override def stop() {
if (started) super.stop()
}

override protected def doStop() {
postApplicationEnd()
ui.stop()
// Do this only if not stopped already - best case effort.
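
Taken together, the SparkContext changes above make construction drive the context to the Started state, make stop() safe to call more than once, and forbid restarting a stopped context. A small sketch of the resulting behavior, assuming a local master; illustrative only, not part of the diff:

import org.apache.spark.{SparkConf, SparkContext}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("lifecycle-demo"))
    assert(sc.started)   // the constructor calls start(), which runs doStart() and records Started

    sc.stop()            // doStop() runs; the state moves to Stopped
    sc.stop()            // safe no-op: the overridden stop() delegates only while started
    // sc.start()        // would throw SparkException("SparkContext has already been stopped")
  }
}
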
@@ -26,7 +26,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.{Lifecycle, Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -44,8 +44,8 @@ private[spark] class AppClient(
masterUrls: Array[String],
appDescription: ApplicationDescription,
listener: AppClientListener,
conf: SparkConf)
extends Logging {
val conf: SparkConf)
extends Logging with Lifecycle {

val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -186,12 +186,12 @@

}

def start() {
override protected def doStart() {
// Just launch an actor; it will call back into the listener.
actor = actorSystem.actorOf(Props(new ClientActor))
}

def stop() {
override protected def doStop() {
if (actor != null) {
try {
val timeout = AkkaUtils.askTimeout(conf)
@@ -41,7 +41,7 @@ import org.apache.spark.util.SignalLogger
* EventLoggingListener.
*/
class HistoryServer(
conf: SparkConf,
val conf: SparkConf,
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
@@ -101,15 +101,14 @@
}
}

initialize()

/**
* Initialize the history server.
*
* This starts a background thread that periodically synchronizes information displayed on
* this UI with the event logs in the provided base directory.
*/
def initialize() {
override def doInitialize() {
attachPage(new HistoryPage(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

@@ -125,8 +124,8 @@
}

/** Stop the server and close the file system. */
override def stop() {
super.stop()
override protected def doStop() {
super.doStop()
provider.stop()
}

@@ -33,10 +33,10 @@ class MasterWebUI(val master: Master, requestedPort: Int)
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

initialize()
def conf = master.conf

/** Initialize all components of the server. */
def initialize() {
override def doInitialize() {
attachPage(new ApplicationPage(this))
attachPage(new HistoryNotFoundPage(this))
attachPage(new MasterPage(this))
@@ -40,10 +40,10 @@ class WorkerWebUI(

val timeout = AkkaUtils.askTimeout(worker.conf)

initialize()
def conf = worker.conf

/** Initialize all components of the server. */
def initialize() {
override def doInitialize() {
val logPage = new LogPage(this)
attachPage(logPage)
attachPage(new WorkerPage(this))
@@ -50,11 +50,13 @@ private[spark] class ConsoleSink(val property: Properties, val registry: MetricR
.convertRatesTo(TimeUnit.SECONDS)
.build()

override def start() {
def conf = securityMgr.sparkConf

override protected def doStart() {
reporter.start(pollPeriod, pollUnit)
}

override def stop() {
override protected def doStop() {
reporter.stop()
}

@@ -59,11 +59,13 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))

override def start() {
def conf = securityMgr.sparkConf

override protected def doStart() {
reporter.start(pollPeriod, pollUnit)
}

override def stop() {
override protected def doStop() {
reporter.stop()
}

@@ -74,11 +74,13 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
.prefixedWith(prefix)
.build(graphite)

override def start() {
def conf = securityMgr.sparkConf

override protected def doStart() {
reporter.start(pollPeriod, pollUnit)
}

override def stop() {
override protected def doStop() {
reporter.stop()
}
