CoarseGrainedExecutorBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}

private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

override def onStart() {
-import scala.concurrent.ExecutionContext.Implicits.global
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[RegisteredExecutor.type](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-} onComplete {
+}(ThreadUtils.sameThread).onComplete {
+// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
}
case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
-}
+}(ThreadUtils.sameThread)
}

def extractLogUrls: Map[String, String] = {
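For readers unfamiliar with the pattern, the hunks above swap the global implicit ExecutionContext for ThreadUtils.sameThread, which runs cheap callbacks directly on the thread that completes the future. A minimal self-contained sketch of that idea follows; SameThreadSketch and sameThreadContext are illustrative names, not Spark code.

import java.util.concurrent.Executor

import scala.concurrent.{ExecutionContext, Promise}
import scala.util.{Failure, Success}

object SameThreadSketch {
  // Stand-in for ThreadUtils.sameThread: runs each callback on the thread
  // that completes the future, which is only appropriate for very cheap callbacks.
  val sameThreadContext: ExecutionContext = ExecutionContext.fromExecutor(new Executor {
    override def execute(command: Runnable): Unit = command.run()
  })

  def main(args: Array[String]): Unit = {
    val registration = Promise[String]()
    registration.future.onComplete {
      case Success(msg) => println("registered: " + msg)
      case Failure(e) => println("registration failed: " + e.getMessage)
    }(sameThreadContext)
    registration.success("ok") // the callback runs here, on the completing thread
  }
}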
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala (11 additions, 2 deletions)
@@ -19,8 +19,10 @@ package org.apache.spark.rdd

import java.util.concurrent.atomic.AtomicLong

+import org.apache.spark.util.ThreadUtils
+
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag

import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
val f = new ComplexFutureAction[Seq[T]]

f.run {
+// This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
+// is a cached thread pool.
val results = new ArrayBuffer[T](num)
val totalParts = self.partitions.length
var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
partsScanned += numPartsToTry
}
results.toSeq
-}
+}(AsyncRDDActions.futureExecutionContext)

f
}
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
(index, data) => Unit, Unit)
}
}
+
+private object AsyncRDDActions {
+val futureExecutionContext = ExecutionContext.fromExecutorService(
+ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
+}
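The new AsyncRDDActions object pins blocking futures to a bounded pool of daemon threads instead of the global implicit context. A rough, hypothetical approximation of such a pool, in the spirit of ThreadUtils.newDaemonCachedThreadPool; the helper and names below are not Spark's implementation.

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object DaemonPoolSketch {
  // Hypothetical stand-in for ThreadUtils.newDaemonCachedThreadPool(prefix, maxThreads):
  // a bounded pool of daemon threads that are released again after an idle timeout.
  def newDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + counter.incrementAndGet())
        t.setDaemon(true) // daemon threads never keep the JVM alive
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable], factory)
    pool.allowCoreThreadTimeOut(true)
    pool
  }

  // Blocking work goes to this dedicated pool, not to ExecutionContext.Implicits.global.
  val futureExecutionContext = ExecutionContext.fromExecutorService(
    newDaemonCachedThreadPool("sketch-future", 128))

  def main(args: Array[String]): Unit = {
    val f = Future { Thread.sleep(100); 42 }(futureExecutionContext)
    println(Await.result(f, 1.second))
    futureExecutionContext.shutdownNow()
  }
}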
core/src/main/scala/org/apache/spark/storage/BlockManager.scala (13 additions, 4 deletions)
@@ -21,8 +21,7 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
import scala.util.Random

@@ -77,6 +76,9 @@ private[spark] class BlockManager(

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

+private val futureExecutionContext = ExecutionContext.fromExecutorService(
+ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
+
// Actual storage of where blocks are kept
private var externalBlockStoreInitialized = false
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
@@ -266,11 +268,13 @@ private[spark] class BlockManager(
asyncReregisterLock.synchronized {
if (asyncReregisterTask == null) {
asyncReregisterTask = Future[Unit] {
+// This is a blocking action and should run in futureExecutionContext which is a cached
+// thread pool
reregister()
asyncReregisterLock.synchronized {
asyncReregisterTask = null
}
-}
+}(futureExecutionContext)
}
}
}
@@ -744,7 +748,11 @@ private[spark] class BlockManager(
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
-Future { replicate(blockId, bufferView, putLevel) }
+Future {
+// This is a blocking action and should run in futureExecutionContext which is a cached
+// thread pool
+replicate(blockId, bufferView, putLevel)
+}(futureExecutionContext)
case _ => null
}

@@ -1218,6 +1226,7 @@ private[spark] class BlockManager(
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
+futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}
}
BlockManagerMaster.scala
@@ -17,13 +17,14 @@

package org.apache.spark.storage

+import scala.collection.Iterable
+import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{ThreadUtils, RpcUtils}

private[spark]
class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
future.onFailure {
case e: Exception =>
-logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
-}
+logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
+}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -114,8 +115,8 @@ class BlockManagerMaster(
val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
future.onFailure {
case e: Exception =>
-logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
-}
+logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
+}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -128,8 +129,8 @@ class BlockManagerMaster(
future.onFailure {
case e: Exception =>
logWarning(s"Failed to remove broadcast $broadcastId" +
-s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
-}
+s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
+}(ThreadUtils.sameThread)
if (blocking) {
Await.result(future, timeout)
}
@@ -169,11 +170,17 @@ class BlockManagerMaster(
val response = driverEndpoint.
askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
val (blockManagerIds, futures) = response.unzip
-val result = Await.result(Future.sequence(futures), timeout)
-if (result == null) {
+implicit val sameThread = ThreadUtils.sameThread
+val cbf =
+implicitly[
+CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
+Option[BlockStatus],
+Iterable[Option[BlockStatus]]]]
+val blockStatus = Await.result(
+Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+if (blockStatus == null) {
throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
}
-val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
status.map { s => (blockManagerId, s) }
}.toMap
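The getBlockStatus change above has to pass both of Future.sequence's implicit parameters explicitly: once the ExecutionContext is given by hand, the CanBuildFrom must be supplied (or summoned) as well. A standalone sketch of the same mechanics on plain Scala futures, using the pre-2.13 collections API; SequenceSketch and sameThread are illustrative stand-ins, not Spark code.

import java.util.concurrent.Executor

import scala.collection.Iterable
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object SequenceSketch {
  // Same-thread context standing in for ThreadUtils.sameThread.
  val sameThread: ExecutionContext = ExecutionContext.fromExecutor(new Executor {
    override def execute(command: Runnable): Unit = command.run()
  })

  def main(args: Array[String]): Unit = {
    val futures: Iterable[Future[Option[Int]]] =
      Seq(Future.successful(Option(1)), Future.successful(Option(2)))

    // Future.sequence takes (implicit cbf, executor); passing the executor explicitly
    // means the CanBuildFrom has to be resolved explicitly too.
    val cbf = implicitly[
      CanBuildFrom[Iterable[Future[Option[Int]]], Option[Int], Iterable[Option[Int]]]]
    val combined = Future.sequence[Option[Int], Iterable](futures)(cbf, sameThread)

    println(Await.result(combined, 1.second)) // prints something like List(Some(1), Some(2))
  }
}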
BroadcastHashJoin.scala
@@ -78,5 +78,5 @@ case class BroadcastHashJoin(
object BroadcastHashJoin {

private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
-ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024))
+ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
}
ReceiverSupervisor.scala
@@ -18,14 +18,14 @@
package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch

import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
-import java.util.concurrent.CountDownLatch
-import scala.concurrent._
-import ExecutionContext.Implicits.global
+import org.apache.spark.util.ThreadUtils

/**
* Abstract class that is responsible for supervising a Receiver in the worker.
@@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor(
// Attach the executor to the receiver
receiver.attachExecutor(this)

+private val futureExecutionContext = ExecutionContext.fromExecutorService(
+ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
+
/** Receiver id */
protected val streamId = receiver.streamId

@@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor(
stoppingError = error.orNull
stopReceiver(message, error)
onStop(message, error)
+futureExecutionContext.shutdownNow()
stopLatch.countDown()
}

@@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor(
/** Restart receiver with delay */
def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
Future {
+// This is a blocking action so we should use "futureExecutionContext" which is a cached
+// thread pool.
logWarning("Restarting receiver with delay " + delay + " ms: " + message,
error.getOrElse(null))
stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
@@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor(
logInfo("Starting receiver again")
startReceiver()
logInfo("Receiver started again")
-}
+}(futureExecutionContext)
}

/** Check if receiver has been marked for stopping */
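The ReceiverSupervisor changes follow a lifecycle: blocking restart logic runs on a private pool rather than the global implicit context, and that pool is shut down when the component stops. A condensed, hypothetical sketch of that shape; RestartableWorker and its methods are illustrative, not Spark code, and a real version would use a daemon thread factory as in the pool sketch further up.

import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}

// Illustrative component, not Spark's ReceiverSupervisor.
class RestartableWorker {
  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    Executors.newCachedThreadPool())

  def restartWithDelay(delayMs: Long): Unit = {
    Future {
      // Blocking sequence: stop, wait out the delay, start again.
      stopWork()
      Thread.sleep(delayMs)
      startWork()
    }(futureExecutionContext)
    () // fire-and-forget, like restartReceiver
  }

  def stop(): Unit = {
    stopWork()
    futureExecutionContext.shutdownNow() // release the pool's threads with the component
  }

  private def startWork(): Unit = println("started")
  private def stopWork(): Unit = println("stopped")
}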