core/src/main/scala/org/apache/spark/SparkEnv.scala (3 changes: 2 additions & 1 deletion)
@@ -314,7 +314,8 @@ object SparkEnv extends Logging {
       UnifiedMemoryManager(conf, numUsableCores)
     }

-    val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
+    val blockTransferService =
+      new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)

     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
       BlockManagerMaster.DRIVER_ENDPOINT_NAME,
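For reference, SparkEnv now passes the hostname into the transfer service explicitly instead of letting the service resolve it on its own. A minimal sketch of that call shape, with illustrative names (`advertisedHost`, `TransferServiceWiringSketch`) that are not part of this patch:

```scala
package org.apache.spark.network.netty
// Sketch only: NettyBlockTransferService is private[spark] after this change, so the
// sketch sits inside a Spark package and would only compile in the Spark source tree.

import org.apache.spark.{SecurityManager, SparkConf}

object TransferServiceWiringSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sketch").setMaster("local[*]")
    val securityManager = new SecurityManager(conf)
    // In SparkEnv this is the `hostname` already in scope at the call site above;
    // here it is just an illustrative literal.
    val advertisedHost = "127.0.0.1"

    // The service is told which host to bind to and advertise, rather than resolving it itself.
    val service = new NettyBlockTransferService(conf, securityManager, advertisedHost, numCores = 1)
    println(service.hostName) // prints 127.0.0.1
  }
}
```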
NettyBlockTransferService.scala
@@ -39,7 +39,11 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at a time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
+private[spark] class NettyBlockTransferService(
Member Author: @rxin is it a mistake that NettyBlockTransferService was public?

Member Author: Never mind. Just found it's in network package

+    conf: SparkConf,
+    securityManager: SecurityManager,
+    override val hostName: String,
+    numCores: Int)
   extends BlockTransferService {

   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
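A side note on the signature change above: `hostName` is declared as `override val`, so it satisfies the abstract `hostName` member of `BlockTransferService` with a value fixed at construction time, rather than one resolved when the member is first called. A simplified sketch of that idiom, using hypothetical stand-in names rather than Spark's classes:

```scala
object OverrideValSketch extends App {
  // Hypothetical stand-ins for BlockTransferService and its Netty implementation.
  abstract class TransferServiceLike {
    def hostName: String // previously implemented by a method that resolved the local host
  }
  class ExplicitHostService(override val hostName: String) extends TransferServiceLike

  val svc = new ExplicitHostService("localhost")
  assert(svc.hostName == "localhost") // the constructor val answers the abstract def
}
```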
@@ -65,13 +69,13 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
     server = createServer(serverBootstrap.toList)
     appId = conf.getAppId
-    logInfo("Server created on " + server.getPort)
+    logInfo(s"Server created on ${hostName}:${server.getPort}")
   }

   /** Creates and binds the TransportServer, possibly trying multiple ports. */
   private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
     def startService(port: Int): (TransportServer, Int) = {
-      val server = transportContext.createServer(port, bootstraps.asJava)
+      val server = transportContext.createServer(hostName, port, bootstraps.asJava)
       (server, server.getPort)
     }

@@ -109,8 +113,6 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     }
   }

-  override def hostName: String = Utils.localHostName()
-
   override def port: Int = server.getPort

   override def uploadBlock(
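With the locally resolved `hostName` removed and the value supplied at construction instead, the block manager can advertise the same address the transfer service actually binds to, which is the point of SPARK-14437. A hedged sketch of that derivation (`idFor` and `BlockManagerIdSketch` are assumed names, not something added by this patch):

```scala
package org.apache.spark.storage
// Sketch only: BlockManagerId and NettyBlockTransferService are Spark-internal, so this
// would only compile inside the Spark source tree.

import org.apache.spark.network.netty.NettyBlockTransferService

object BlockManagerIdSketch {
  // Derive the advertised address from the service itself instead of Utils.localHostName().
  def idFor(executorId: String, transfer: NettyBlockTransferService): BlockManagerId =
    BlockManagerId(executorId, transfer.hostName, transfer.port)
}
```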
NettyBlockTransferSecuritySuite.scala
@@ -108,11 +108,11 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
     when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)

     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
+    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1)
     exec0.init(blockManager)

     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
+    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1)
     exec1.init(blockManager)

     val result = fetchBlock(exec0, exec1, "1", blockId) match {
NettyBlockTransferServiceSuite.scala
@@ -80,7 +80,7 @@ class NettyBlockTransferServiceSuite
.set("spark.blockManager.port", port.toString)
val securityManager = new SecurityManager(conf)
val blockDataManager = mock(classOf[BlockDataManager])
val service = new NettyBlockTransferService(conf, securityManager, numCores = 1)
val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1)
service.init(blockDataManager)
service
}
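The suite changes above and below all follow the same pattern: pass "localhost" explicitly so the service binds to a known interface rather than whatever Utils.localHostName() resolves to. A self-contained sketch of that test-side setup, assuming Mockito is on the classpath as in these suites:

```scala
package org.apache.spark.network.netty
// Sketch of the common test-side setup (Spark-internal APIs, so it lives in a Spark package).

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network.BlockDataManager
import org.mockito.Mockito.mock

object LocalhostServiceSketch {
  def makeService(): NettyBlockTransferService = {
    val conf = new SparkConf()
      .set("spark.app.id", "test-app")      // init() reads the application id from the conf
      .set("spark.blockManager.port", "0")  // 0 means pick any free ephemeral port
    val service =
      new NettyBlockTransferService(conf, new SecurityManager(conf), "localhost", numCores = 1)
    service.init(mock(classOf[BlockDataManager]))
    service // hostName is now "localhost"; port is whatever the server actually bound to
  }
}
```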
BlockManagerReplicationSuite.scala
@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
     conf.set("spark.memory.offHeap.size", maxMem.toString)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
BlockManagerSuite.scala
@@ -78,7 +78,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.memory.offHeap.size", maxMem.toString)
val serializer = new KryoSerializer(conf)
val transfer = transferService
.getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
.getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
val memManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(serializer, conf)
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
@@ -490,7 +490,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val blockManager = makeBlockManager(128, "exec", bmMaster)
     val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
     val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
-    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+    assert(locations.map(_.host).toSet === Set(localHost, localHost, otherHost))
   }

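One subtlety in the assertion change above: comparing with `.toSet` makes the check order-insensitive, but a Set also collapses duplicates, so `Set(localHost, localHost, otherHost)` is really just `Set(localHost, otherHost)` and the number of local replicas is no longer asserted. A tiny illustration:

```scala
// Set equality ignores both ordering and multiplicity.
assert(Set("a", "a", "b") == Set("a", "b"))         // duplicates collapse
assert(Seq("b", "a").toSet == Seq("a", "b").toSet)  // order no longer matters
```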
test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
@@ -852,7 +852,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
conf.set("spark.testing.memory", "1200")
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
project/MimaExcludes.scala (3 changes: 3 additions & 0 deletions)
@@ -614,6 +614,9 @@ object MimaExcludes {
     ) ++ Seq(
       // [SPARK-13430][ML] moved featureCol from LinearRegressionModelSummary to LinearRegressionSummary
       ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.regression.LinearRegressionSummary.this")
+    ) ++ Seq(
+      // [SPARK-14437][Core] Use the address that NettyBlockTransferService listens to create BlockManagerId
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockTransferService.this")
     ) ++ Seq(
       // [SPARK-13048][ML][MLLIB] keepLastCheckpoint option for LDA EM optimizer
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.this")
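For context on the exclude above: MiMa reports a binary-incompatible constructor change as a missing method named "this" on the affected class, so the filter targets exactly that signature, grouped under its JIRA like the neighbouring entries. A sketch of the same shape with a hypothetical class name, for illustration only:

```scala
import com.typesafe.tools.mima.core._

object MimaExcludeSketch {
  // Hypothetical class name: a removed or changed constructor surfaces as ".this".
  val exampleFilter =
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.example.SomeService.this")
}
```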
ReceivedBlockHandlerSuite.scala
@@ -266,7 +266,7 @@ class ReceivedBlockHandlerSuite
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)