core/src/main/scala/org/apache/spark/deploy/master/Master.scala

@@ -41,7 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}

 private[spark] class Master(
     host: String,
@@ -755,12 +755,13 @@ private[spark] class Master(
   }
 }

-private[spark] object Master {
+private[spark] object Master extends Logging {
   val systemName = "sparkMaster"
   private val actorName = "Master"
   val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r

   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
     val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}

 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -365,8 +365,9 @@ private[spark] class Worker(
   }
 }

-private[spark] object Worker {
+private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
       args.memory, args.masters, args.workDir)
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}

 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
@@ -97,10 +97,12 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 }

-private[spark] object CoarseGrainedExecutorBackend {
+private[spark] object CoarseGrainedExecutorBackend extends Logging {
   def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
     workerUrl: Option[String]) {

+    SignalLogger.register(log)
+
     SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
       Utils.checkHost(hostname)
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

@@ -25,8 +25,8 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}

 import org.apache.spark.{Logging, TaskState}
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.{SignalLogger, Utils}

 private[spark] class MesosExecutorBackend
   extends MesosExecutor
@@ -93,8 +93,9 @@ private[spark] class MesosExecutorBackend
 /**
  * Entry point for Mesos executor.
  */
-private[spark] object MesosExecutorBackend {
+private[spark] object MesosExecutorBackend extends Logging {
   def main(args: Array[String]) {
+    SignalLogger.register(log)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       MesosNativeLibrary.load()
       // Create a new Executor and start it running
core/src/main/scala/org/apache/spark/util/SignalLogger.scala (new file, 60 additions, 0 deletions)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import org.apache.commons.lang.SystemUtils
import org.slf4j.Logger
import sun.misc.{Signal, SignalHandler}

/**
* Used to log signals received. This can be very useful in debugging crashes or kills.
*
* Inspired by Colin Patrick McCabe's similar class from Hadoop.
*/
private[spark] object SignalLogger {

  private var registered = false

  /** Register a signal handler to log signals on UNIX-like systems. */
  def register(log: Logger): Unit = synchronized {
    if (SystemUtils.IS_OS_UNIX) {
      require(!registered, "Can't re-install the signal handlers")
      registered = true

      val signals = Seq("TERM", "HUP", "INT")
      for (signal <- signals) {
        try {
          new SignalLoggerHandler(signal, log)
        } catch {
          case e: Exception => log.warn("Failed to register signal handler " + signal, e)
        }
      }
      log.info("Registered signal handlers for [" + signals.mkString(", ") + "]")
    }
  }
}

private sealed class SignalLoggerHandler(name: String, log: Logger) extends SignalHandler {

  val prevHandler = Signal.handle(new Signal(name), this)

  override def handle(signal: Signal): Unit = {
    log.error("RECEIVED SIGNAL " + signal.getNumber() + ": SIG" + signal.getName())
    prevHandler.handle(signal)
  }
}
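
Every entry point touched by this patch uses the new class the same way: the companion object mixes in Logging and calls SignalLogger.register(log) as the first statement of main (or run), before any real work starts. A minimal self-contained sketch of that pattern follows; MyDaemon is a hypothetical entry point used only for illustration, not part of this patch.

import org.apache.spark.Logging
import org.apache.spark.util.SignalLogger

// Hypothetical daemon entry point; mirrors the pattern this patch applies to
// Master, Worker, the executor backends, and the YARN ApplicationMaster.
private[spark] object MyDaemon extends Logging {
  def main(args: Array[String]) {
    // Register first, so a SIGTERM/SIGHUP/SIGINT received at any later point
    // is logged before the JVM goes down.
    SignalLogger.register(log)
    // ... parse arguments and start the daemon's actual work here ...
  }
}

Note that SignalLoggerHandler chains to the handler it displaced: after logging the signal at ERROR level it calls prevHandler.handle(signal), so the default JVM behavior for these signals (normally shutdown) is preserved. On Linux, where SIGTERM is signal 15, a kill -TERM should therefore leave a line like "RECEIVED SIGNAL 15: SIGTERM" as the process's last words in the log.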
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SignalLogger, Utils}

 /**
  * An application master that runs the users driver program and allocates executors.
@@ -409,7 +409,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

 }

-object ApplicationMaster {
+object ApplicationMaster extends Logging {
   // Number of times to wait for the allocator loop to complete.
   // Each loop iteration waits for 100ms, so maximum of 3 seconds.
   // This is to ensure that we have reasonable number of containers before we start
@@ -487,6 +487,7 @@ object ApplicationMaster {
   }

   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SignalLogger, Utils}


 /**
@@ -363,7 +363,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,

 }

-object ApplicationMaster {
+object ApplicationMaster extends Logging {
   // Number of times to wait for the allocator loop to complete.
   // Each loop iteration waits for 100ms, so maximum of 3 seconds.
   // This is to ensure that we have reasonable number of containers before we start
@@ -455,6 +455,7 @@ object ApplicationMaster {
   }

   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       new ApplicationMaster(args).run()