diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7745387dbceba..35985c20ee7c3 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -140,7 +140,8 @@ case class ExceptionFailure(
       e: Throwable,
       accumUpdates: Seq[AccumulableInfo],
       preserveCause: Boolean) {
-    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e),
+    this(e.getClass.getName, e.getMessage, e.getStackTrace,
+      if (Utils.dumpStackTrace(e)) Utils.exceptionString(e) else null,
       if (preserveCause) Some(new ThrowableSerializationWrapper(e)) else None, accumUpdates)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 85fda2a736d46..7782304c98e77 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -453,7 +453,14 @@ private[spark] class Executor(
           // Attempt to exit cleanly by informing the driver of our failure.
           // If anything goes wrong (or this was a fatal exception), we will delegate to
           // the default uncaught exception handler, which will terminate the Executor.
-          logError(s"Exception in $taskName (TID $taskId)", t)
+          val errorMessage = s"Exception in $taskName (TID $taskId) (SW: t=${t.getClass})"
+          if (Utils.dumpStackTrace(t)) {
+            logError(errorMessage, t)
+          } else if (isTraceEnabled) {
+            logWarning(errorMessage, t)
+          } else {
+            logWarning(errorMessage + ": " + t.toString)
+          }
 
           // Collect latest accumulator values to report back to the driver
           val accums: Seq[AccumulatorV2[_, _]] =
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index ab783554f0b2c..cf91fcf2faaf9 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -56,7 +56,12 @@ private[memory] class ExecutionMemoryPool(
   private val memoryForTask = new mutable.HashMap[Long, Long]()
 
   override def memoryUsed: Long = lock.synchronized {
-    return memoryForTask.values.sum
+    var sum = 0L
+    val iter = memoryForTask.valuesIterator
+    while (iter.hasNext) {
+      sum += iter.next()
+    }
+    return sum
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 4c6b639015a90..e9886386373c0 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -44,7 +44,7 @@ private[memory] class StorageMemoryPool(
   private[this] var _memoryUsed: Long = 0L
 
   override def memoryUsed: Long = lock.synchronized {
-    _memoryUsed
+    return _memoryUsed
   }
 
   private var _memoryStore: MemoryStore = _
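The two memory-pool hunks above trade the idiomatic memoryForTask.values.sum for a hand-rolled iterator loop, presumably because memoryUsed runs under the pool lock on a hot path and .values.sum builds an intermediate Iterable and folds over it. A minimal, self-contained sketch of the equivalence between the two forms (the object name and sample map contents are hypothetical, for illustration only):

    import scala.collection.mutable

    object SumPatternSketch {
      def main(args: Array[String]): Unit = {
        val memoryForTask = mutable.HashMap(1L -> 100L, 2L -> 250L)

        // Idiomatic but allocating: materializes a values view, then folds.
        val viaSum = memoryForTask.values.sum

        // The pattern adopted by the diff: a single pass over valuesIterator,
        // with no intermediate collection.
        var sum = 0L
        val iter = memoryForTask.valuesIterator
        while (iter.hasNext) {
          sum += iter.next()
        }

        assert(sum == viaSum) // both yield 350
      }
    }
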
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b03cfe4f0dc49..eb30d4866a118 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -94,7 +94,7 @@ private[spark] class TaskSchedulerImpl(
   val nextTaskId = new AtomicLong(0)
 
   // IDs of the tasks running on each executor
-  private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
+  protected val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size)
@@ -169,10 +169,8 @@ private[spark] class TaskSchedulerImpl(
     waitBackendReady()
   }
 
-  override def submitTasks(taskSet: TaskSet) {
-    val tasks = taskSet.tasks
-    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
-    this.synchronized {
+  protected def getTaskSetManagerForSubmit(taskSet: TaskSet): TaskSetManager = {
+    {
       val manager = createTaskSetManager(taskSet, maxTaskFailures)
       val stage = taskSet.stageId
       val stageTaskSets =
@@ -185,6 +183,15 @@ private[spark] class TaskSchedulerImpl(
         throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
           s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
       }
+      manager
+    }
+  }
+
+  override def submitTasks(taskSet: TaskSet) {
+    val tasks = taskSet.tasks
+    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
+    this.synchronized {
+      val manager = getTaskSetManagerForSubmit(taskSet)
       schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
 
       if (!isLocal && !hasReceivedTask) {
@@ -249,7 +256,7 @@ private[spark] class TaskSchedulerImpl(
       s" ${manager.parent.name}")
   }
 
-  private def resourceOfferSingleTaskSet(
+  protected def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
       shuffledOffers: Seq[WorkerOffer],
@@ -346,18 +353,30 @@ private[spark] class TaskSchedulerImpl(
     return tasks
   }
 
+  protected[scheduler] def getTaskSetManager(tid: Long): Option[TaskSetManager] =
+    taskIdToTaskSetManager.get(tid)
+
+  protected def getExecutorAndManager(tid: Long): Option[(() => String, TaskSetManager)] = {
+    taskIdToTaskSetManager.get(tid) match {
+      case Some(taskSet) =>
+        val getExecId = () => taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
+          "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+        Some(getExecId -> taskSet)
+      case None => None
+    }
+  }
+
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
     var reason: Option[ExecutorLossReason] = None
     synchronized {
       try {
-        taskIdToTaskSetManager.get(tid) match {
-          case Some(taskSet) =>
+        getExecutorAndManager(tid) match {
+          case Some((getExecId, taskSet)) =>
             if (state == TaskState.LOST) {
               // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
               // where each executor corresponds to a single task, so mark the executor as failed.
-              val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
-                "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
+              val execId = getExecId()
               if (executorIdToRunningTaskIds.contains(execId)) {
                 reason = Some(
                   SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
@@ -406,7 +425,7 @@ private[spark] class TaskSchedulerImpl(
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
       accumUpdates.flatMap { case (id, updates) =>
         val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
-        taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+        getTaskSetManager(id).map { taskSetMgr =>
           (id, taskSetMgr.stageId, taskSetMgr.taskSet.stageAttemptId, accInfos)
         }
       }
@@ -534,7 +553,7 @@ private[spark] class TaskSchedulerImpl(
   /**
    * Cleans up the TaskScheduler's state for tracking the given task.
    */
-  private def cleanupTaskState(tid: Long): Unit = {
+  protected def cleanupTaskState(tid: Long): Unit = {
     taskIdToTaskSetManager.remove(tid)
     taskIdToExecutorId.remove(tid).foreach { executorId =>
       executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
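The TaskSchedulerImpl hunks above are visibility widenings (private to protected) plus the extraction of getTaskSetManagerForSubmit, getTaskSetManager and getExecutorAndManager; together they turn the scheduler's task-set bookkeeping into overridable hooks. A rough sketch of how a fork-specific scheduler might build on them (CustomTaskScheduler and its extra logic are hypothetical; only the overridden signatures come from the diff):

    package org.apache.spark.scheduler

    import org.apache.spark.SparkContext

    // Assumes placement inside the org.apache.spark.scheduler package,
    // since TaskSchedulerImpl itself is private[spark].
    class CustomTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {

      override protected def getTaskSetManagerForSubmit(
          taskSet: TaskSet): TaskSetManager = {
        // Reuse the parent's duplicate-stage checks, then apply extra policy.
        val manager = super.getTaskSetManagerForSubmit(taskSet)
        logInfo(s"Accepted task set ${taskSet.id} for stage ${taskSet.stageId}")
        manager
      }

      override protected def cleanupTaskState(tid: Long): Unit = {
        // Hook point: release any fork-specific per-task state, then defer.
        super.cleanupTaskState(tid)
      }
    }
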
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 26b4a40d05dd6..7707e446659a7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -267,7 +267,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def checkTaskSizeLimit(task: TaskDescription, taskSize: Int): Boolean = {
     if (taskSize > maxRpcMessageSize) {
-      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
+      scheduler.getTaskSetManager(task.taskId).foreach { taskSetMgr =>
         try {
           var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
             "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 308bbbc734ab9..3edbdb2f02546 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2159,6 +2159,28 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  @tailrec
+  private def getSQLException(t: Throwable): Option[java.sql.SQLException] = t match {
+    case null => None
+    case se: java.sql.SQLException => Some(se)
+    case _ => getSQLException(t.getCause)
+  }
+
+  /**
+   * Return true if the stack trace for the exception should be dumped and false otherwise.
+   */
+  def dumpStackTrace(e: Throwable): Boolean = {
+    // skip stack traces for SQLExceptions for syntax errors, constraint violations etc.
+    getSQLException(e) match {
+      case Some(se) =>
+        val state = se.getSQLState
+        !state.startsWith("23") && // constraint violations etc.
+          !state.startsWith("42") && // syntax and authorization errors
+          !state.startsWith("X0Z02") // transaction conflict exception
+      case None => true
+    }
+  }
+
   private implicit class Lock(lock: LockInfo) {
     def lockString: String = {
       lock match {
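Taken together, the new Utils helpers decide, per exception, whether the executor logs a full stack trace or a one-line warning: user-level SQL errors (SQLState classes 23 and 42, plus the X0Z02 conflict state) are treated as expected and skipped, while everything else keeps its trace. A self-contained sketch of that classification outside of Spark (DumpStackTraceSketch is a hypothetical stand-in for the Utils methods):

    import java.sql.SQLException
    import scala.annotation.tailrec

    object DumpStackTraceSketch {
      // Walk the cause chain looking for a SQLException, as in the diff.
      @tailrec
      def getSQLException(t: Throwable): Option[SQLException] = t match {
        case null => None
        case se: SQLException => Some(se)
        case _ => getSQLException(t.getCause)
      }

      def dumpStackTrace(e: Throwable): Boolean = getSQLException(e) match {
        case Some(se) =>
          val state = se.getSQLState
          !state.startsWith("23") && !state.startsWith("42") &&
            !state.startsWith("X0Z02")
        case None => true // non-SQL failures keep their stack traces
      }

      def main(args: Array[String]): Unit = {
        // Syntax error (SQLState 42X01): skip the stack trace.
        println(dumpStackTrace(new SQLException("syntax error", "42X01"))) // false
        // Constraint violation wrapped in another exception: still detected.
        println(dumpStackTrace(new RuntimeException(
          new SQLException("duplicate key", "23505")))) // false
        // Ordinary runtime failure: dump the full trace.
        println(dumpStackTrace(new IllegalStateException("boom"))) // true
      }
    }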