Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.r

import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetSocketAddress, ServerSocket}
import java.net.{InetAddress, InetSocketAddress, ServerSocket}
import java.util.concurrent.TimeUnit

import io.netty.bootstrap.ServerBootstrap
Expand Down Expand Up @@ -65,7 +65,7 @@ private[spark] class RBackend {
}
})

channelFuture = bootstrap.bind(new InetSocketAddress(0))
channelFuture = bootstrap.bind(new InetSocketAddress("localhost", 0))
channelFuture.syncUninterruptibly()
channelFuture.channel().localAddress().asInstanceOf[InetSocketAddress].getPort()
}
Expand Down Expand Up @@ -101,7 +101,7 @@ private[spark] object RBackend extends Logging {
try {
// bind to random port
val boundPort = sparkRBackend.init()
val serverSocket = new ServerSocket(0, 1)
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort()

// tell the R process via temporary file
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.api.r

import java.io._
import java.net.ServerSocket
import java.net.{InetAddress, ServerSocket}
import java.util.{Map => JMap}

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -55,7 +55,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
val parentIterator = firstParent[T].iterator(partition, context)

// we expect two connections
val serverSocket = new ServerSocket(0, 2)
val serverSocket = new ServerSocket(0, 2, InetAddress.getByName("localhost"))
val listenPort = serverSocket.getLocalPort()

// The stdout/stderr is shared by multiple tasks, because we use one daemon
Expand Down Expand Up @@ -414,7 +414,7 @@ private[r] object RRDD {
synchronized {
if (daemonChannel == null) {
// we expect one connections
val serverSocket = new ServerSocket(0, 1)
val serverSocket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))
val daemonPort = serverSocket.getLocalPort
errThread = createRProcess(rLibDir, daemonPort, "daemon.R")
// the socket used to send out the input of task
Expand Down