Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,11 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
numCores: Int,
isLocal: Boolean,
actorSystem: ActorSystem = null): SparkEnv = {
create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
numUsableCores = numCores)
}

/**
Expand All @@ -184,7 +186,8 @@ object SparkEnv extends Logging {
isDriver: Boolean,
isLocal: Boolean,
listenerBus: LiveListenerBus = null,
defaultActorSystem: ActorSystem = null): SparkEnv = {
defaultActorSystem: ActorSystem = null,
numUsableCores: Int = 0): SparkEnv = {

// Listener bus is only used on the driver
if (isDriver) {
Expand Down Expand Up @@ -276,7 +279,7 @@ object SparkEnv extends Logging {
val blockTransferService =
conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf, securityManager)
new NettyBlockTransferService(conf, securityManager, numUsableCores)
case "nio" =>
new NioBlockTransferService(conf, securityManager)
}
Expand All @@ -287,7 +290,8 @@ object SparkEnv extends Logging {

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
numUsableCores)

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
private val transportContext: TransportContext = {
val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
val (hostname, _) = Utils.parseHostPort(hostPort)
executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
actorSystem)

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[spark] class Executor(
executorId: String,
slaveHostname: String,
properties: Seq[(String, String)],
numCores: Int,
isLocal: Boolean = false,
actorSystem: ActorSystem = null)
extends Logging
Expand Down Expand Up @@ -83,7 +84,7 @@ private[spark] class Executor(
if (!isLocal) {
val port = conf.getInt("spark.executor.port", 0)
val _env = SparkEnv.createExecutorEnv(
conf, executorId, slaveHostname, port, isLocal, actorSystem)
conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
_env.blockManager.initialize(conf.getAppId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.executor

import java.nio.ByteBuffer

import scala.collection.JavaConversions._

import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
Expand Down Expand Up @@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)

// Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
val cpusPerTask = executorInfo.getResourcesList
.find(_.getName == "cpus")
.map(_.getScalar.getValue.toInt)
.getOrElse(0)
val executorId = executorInfo.getExecutorId.getValue

logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
executorInfo.getExecutorId.getValue,
executorId,
slaveInfo.getHostname,
properties)
properties,
cpusPerTask)
}

override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
/**
* A BlockTransferService that uses Netty to fetch a set of blocks at a time.
*/
class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
extends BlockTransferService {

// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
private val transportConf = SparkTransportConf.fromSparkConf(conf)
private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)

private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,22 @@ package org.apache.spark.network.netty
import org.apache.spark.SparkConf
import org.apache.spark.network.util.{TransportConf, ConfigProvider}

/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
*/
object SparkTransportConf {
def fromSparkConf(conf: SparkConf): TransportConf = {
/**
* Utility for creating a [[TransportConf]] from a [[SparkConf]].
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
*/
def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
if (numUsableCores > 0) {
// Only set if serverThreads/clientThreads not already set.
conf.set("spark.shuffle.io.serverThreads",
conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
conf.set("spark.shuffle.io.clientThreads",
conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
}
new TransportConf(new ConfigProvider {
override def get(name: String): String = conf.get(name)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private[spark] class LocalActor(
private val localExecutorHostname = "localhost"

val executor = new Executor(
localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

override def receiveWithLogging = {
case ReviveOffers =>
Expand Down
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private[spark] class BlockManager(
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
securityManager: SecurityManager)
securityManager: SecurityManager,
numUsableCores: Int)
extends BlockDataManager with Logging {

val diskBlockManager = new DiskBlockManager(this, conf)
Expand Down Expand Up @@ -121,8 +122,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
securityManager.isAuthenticationEnabled())
val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
} else {
blockTransferService
}
Expand Down Expand Up @@ -174,9 +175,10 @@ private[spark] class BlockManager(
mapOutputTracker: MapOutputTracker,
shuffleManager: ShuffleManager,
blockTransferService: BlockTransferService,
securityManager: SecurityManager) = {
securityManager: SecurityManager,
numUsableCores: Int) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
var rpcHandler: ExternalShuffleBlockHandler = _

override def beforeAll() {
val transportConf = SparkTransportConf.fromSparkConf(conf)
val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
rpcHandler = new ExternalShuffleBlockHandler(transportConf)
val transportContext = new TransportContext(transportConf, rpcHandler)
server = transportContext.createServer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)

val securityManager0 = new SecurityManager(conf0)
val exec0 = new NettyBlockTransferService(conf0, securityManager0)
val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
exec0.init(blockManager)

val securityManager1 = new SecurityManager(conf1)
val exec1 = new NettyBlockTransferService(conf1, securityManager1)
val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
exec1.init(blockManager)

val result = fetchBlock(exec0, exec1, "1", blockId) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr)
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
store.initialize("app-id")
allStores += store
store
Expand Down Expand Up @@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
failableStore.initialize("app-id")
allStores += failableStore // so that this gets stopped after test
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NioBlockTransferService(conf, securityMgr)
val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr)
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")
manager
}
Expand Down Expand Up @@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Use Java serializer so we can create an unserializable error.
val transfer = new NioBlockTransferService(conf, securityMgr)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
0)

// The put should fail since a1 is not serializable.
class UnserializableClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public TransportClient createClient(String remoteHost, int remotePort) throws IO
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());

// Use pooled buffers to reduce temporary buffer allocation
bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));

final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();

Expand Down Expand Up @@ -190,34 +191,4 @@ public void close() {
workerGroup = null;
}
}

/**
 * Builds a pooled ByteBuf allocator whose thread-local caches are all sized to zero.
 * The caches are turned off because buffers are allocated on the event loop thread but
 * released on the executor thread; in that situation the caches only postpone buffer
 * recycling and increase memory usage.
 */
private PooledByteBufAllocator createPooledByteBufAllocator() {
  // Read each setting into a named local, in the same order the constructor consumes them.
  final boolean useDirectBufs =
    conf.preferDirectBufs() && PlatformDependent.directBufferPreferred();
  final int heapArenaCount = getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA");
  final int directArenaCount = getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA");
  final int pageSize = getPrivateStaticField("DEFAULT_PAGE_SIZE");
  final int maxOrder = getPrivateStaticField("DEFAULT_MAX_ORDER");

  // Zero for tinyCacheSize, smallCacheSize and normalCacheSize disables thread-local caching.
  final int disabledCacheSize = 0;
  return new PooledByteBufAllocator(
    useDirectBufs,
    heapArenaCount,
    directArenaCount,
    pageSize,
    maxOrder,
    disabledCacheSize,
    disabledCacheSize,
    disabledCacheSize);
}

/**
 * Reads one of Netty's private static int defaults from the allocator class by reflection.
 * Any failure while looking up or reading the field is rethrown as a RuntimeException.
 */
private int getPrivateStaticField(String name) {
  try {
    final Class<?> allocatorClass = PooledByteBufAllocator.DEFAULT.getClass();
    final Field defaultField = allocatorClass.getDeclaredField(name);
    // The field is private, so access checks must be lifted before reading it.
    defaultField.setAccessible(true);
    // Static field: no receiver instance is needed.
    return defaultField.getInt(null);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ private void init(int portToBind) {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;

PooledByteBufAllocator allocator = new PooledByteBufAllocator(
conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.network.util;

import java.lang.reflect.Field;
import java.util.concurrent.ThreadFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
Expand All @@ -32,6 +34,7 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.internal.PlatformDependent;

/**
* Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
Expand Down Expand Up @@ -103,4 +106,40 @@ public static String getRemoteAddress(Channel channel) {
}
return "<unknown remote>";
}

/**
 * Create a pooled ByteBuf allocator, optionally with the thread-local caches disabled.
 * Disabling the caches is useful when ByteBufs are allocated by the event loop thread but
 * released by a different (executor) thread: in that case the thread-local caches only delay
 * the recycling of buffers, leading to larger memory usage.
 *
 * @param allowDirectBufs whether direct buffers may be used. They are only actually preferred
 *                        when Netty also reports direct buffers as preferred on this platform.
 *                        When false, the number of direct arenas is 0.
 * @param allowCache if true, Netty's default tiny/small/normal thread-local cache sizes are
 *                   used; if false, all three cache sizes are set to 0.
 * @param numCores upper bound on the number of heap and direct arenas. A non-positive value
 *                 means "not set", in which case all available processors are used.
 */
public static PooledByteBufAllocator createPooledByteBufAllocator(
    boolean allowDirectBufs,
    boolean allowCache,
    int numCores) {
  // Treat every non-positive value as "unset" (not only 0) so that a negative core count can
  // never turn into a negative arena count below.
  final int usableCores =
    numCores > 0 ? numCores : Runtime.getRuntime().availableProcessors();
  return new PooledByteBufAllocator(
    allowDirectBufs && PlatformDependent.directBufferPreferred(),
    // Never create more arenas than Netty's own default, nor more than the usable cores.
    Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), usableCores),
    Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? usableCores : 0),
    getPrivateStaticField("DEFAULT_PAGE_SIZE"),
    getPrivateStaticField("DEFAULT_MAX_ORDER"),
    allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
    allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
    allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
  );
}

/**
 * Reads one of Netty's private static int defaults from the allocator class by reflection.
 * Any failure while looking up or reading the field is rethrown as a RuntimeException.
 */
private static int getPrivateStaticField(String name) {
  try {
    final Class<?> allocatorClass = PooledByteBufAllocator.DEFAULT.getClass();
    final Field defaultField = allocatorClass.getDeclaredField(name);
    // The field is private, so access checks must be lifted before reading it.
    defaultField.setAccessible(true);
    // Static field: no receiver instance is needed.
    return defaultField.getInt(null);
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche

blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
blockManagerSize, conf, mapOutputTracker, shuffleManager,
new NioBlockTransferService(conf, securityMgr), securityMgr)
new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")

tempDirectory = Files.createTempDir()
Expand Down