3 changes: 3 additions & 0 deletions .rat-excludes
@@ -85,4 +85,7 @@ org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet
LZ4BlockInputStream.java
<<<<<<< HEAD
=======
spark-deps-.*
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
8 changes: 8 additions & 0 deletions R/pkg/R/column.R
@@ -215,7 +215,11 @@ setMethod("%in%",

#' otherwise
#'
<<<<<<< HEAD
#' If values in the specified column are null, returns the value.
=======
#' If values in the specified column are null, returns the value.
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
#' Can be used in conjunction with `when` to specify a default value for expressions.
#'
#' @rdname otherwise
@@ -225,7 +229,11 @@ setMethod("%in%",
setMethod("otherwise",
signature(x = "Column", value = "ANY"),
function(x, value) {
<<<<<<< HEAD
value <- ifelse(class(value) == "Column", value@jc, value)
=======
value <- if (class(value) == "Column") { value@jc } else { value }
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
jc <- callJMethod(x@jc, "otherwise", value)
column(jc)
})
17 changes: 17 additions & 0 deletions R/pkg/R/functions.R
@@ -37,7 +37,11 @@ setMethod("lit", signature("ANY"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions",
"lit",
<<<<<<< HEAD
ifelse(class(x) == "Column", x@jc, x))
=======
if (class(x) == "Column") { x@jc } else { x })
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
column(jc)
})

@@ -2262,7 +2266,11 @@ setMethod("unix_timestamp", signature(x = "Column", format = "character"),
setMethod("when", signature(condition = "Column", value = "ANY"),
function(condition, value) {
condition <- condition@jc
<<<<<<< HEAD
value <- ifelse(class(value) == "Column", value@jc, value)
=======
value <- if (class(value) == "Column") { value@jc } else { value }
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
jc <- callJStatic("org.apache.spark.sql.functions", "when", condition, value)
column(jc)
})
@@ -2277,16 +2285,25 @@ setMethod("when", signature(condition = "Column", value = "ANY"),
#' @name ifelse
#' @seealso \link{when}
#' @export
<<<<<<< HEAD
#' @examples \dontrun{ifelse(df$a > 1 & df$b > 2, 0, 1)}
=======
#' @examples \dontrun{
#' ifelse(df$a > 1 & df$b > 2, 0, 1)
#' ifelse(df$a > 1, df$a, 1)
#' }
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
setMethod("ifelse",
signature(test = "Column", yes = "ANY", no = "ANY"),
function(test, yes, no) {
test <- test@jc
<<<<<<< HEAD
yes <- ifelse(class(yes) == "Column", yes@jc, yes)
no <- ifelse(class(no) == "Column", no@jc, no)
=======
yes <- if (class(yes) == "Column") { yes@jc } else { yes }
no <- if (class(no) == "Column") { no@jc } else { no }
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
jc <- callJMethod(callJStatic("org.apache.spark.sql.functions",
"when",
test, yes),
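Both sides of these R hunks delegate to the JVM through callJStatic/callJMethod; the incoming side only stops routing Column values through base R's ifelse(), which does not handle S4 Column objects cleanly. For reference, here is a minimal sketch of the equivalent chain in Spark's Scala API (the column names a and b match the tests below, but the object and method names are illustrative and this is not code from the PR):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, when}

object WhenOtherwiseSketch {
  // Sketch only: the when()/otherwise() chain the R wrappers build on the JVM side,
  // using Column values (df("a")) as well as literal branches.
  def flagColumns(df: DataFrame): DataFrame =
    df.select(
      when(df("a") > 1 && df("b") > 2, lit(0)).otherwise(lit(1)).as("flag"),
      when(df("a") > 1, df("a")).otherwise(lit(1)).as("aOrOne")
    )
}
```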
6 changes: 6 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -62,10 +62,13 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

<<<<<<< HEAD
=======
test_that("calling sparkRSQL.init returns existing SQL context", {
expect_equal(sparkRSQL.init(sc), sqlContext)
})

>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
test_that("infer types and check types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
@@ -1124,6 +1127,8 @@ test_that("when(), otherwise() and ifelse() on a DataFrame", {
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, 0, 1)))[, 1], c(1, 0))
})

<<<<<<< HEAD
=======
test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
l <- list(list(a = 1, b = 2), list(a = 3, b = 4))
df <- createDataFrame(sqlContext, l)
@@ -1132,6 +1137,7 @@ test_that("when(), otherwise() and ifelse() with column on a DataFrame", {
expect_equal(collect(select(df, ifelse(df$a > 1 & df$b > 2, lit(0), lit(1))))[, 1], c(1, 0))
})

>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
test_that("group by, agg functions", {
df <- read.json(sqlContext, jsonPath)
df1 <- agg(df, name = "max", age = "sum")
4 changes: 4 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -225,6 +225,9 @@ a.expandbutton {
background-color: #49535a !important;
color: white;
cursor:pointer;
<<<<<<< HEAD
}
=======
}

.table-head-clickable th a, .table-head-clickable th a:hover {
@@ -235,3 +238,4 @@ a.expandbutton {
color: #333;
text-decoration: none;
}
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -544,8 +544,12 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
"Please use spark.kryoserializer.buffer instead. The default value for " +
"spark.kryoserializer.buffer.mb was previously specified as '0.064'. Fractional values " +
<<<<<<< HEAD
"are no longer accepted. To specify the equivalent now, one may use '64k'.")
=======
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
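The conflict in this hunk only decides whether the deprecation table also registers spark.rpc as deprecated in 2.0. For reference, a stand-alone sketch of the registry pattern the surrounding code uses (the real DeprecatedConfig is private to SparkConf; the case class and object names below are illustrative only):

```scala
// Stand-alone sketch, not Spark's internal definition.
case class DeprecatedConfig(key: String, version: String, deprecationMessage: String)

object DeprecationTableSketch {
  private val configs = Seq(
    DeprecatedConfig("spark.kryoserializer.buffer.mb", "1.4",
      "Please use spark.kryoserializer.buffer instead."),
    DeprecatedConfig("spark.rpc", "2.0", "Not used any more.")
  )

  // Same Seq-to-Map expansion as the Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
  // line in this hunk: keyed lookup when a user sets a deprecated configuration name.
  val byKey: Map[String, DeprecatedConfig] = Map(configs.map(cfg => cfg.key -> cfg): _*)

  def warningFor(key: String): Option[String] =
    byKey.get(key).map(c => s"${c.key} is deprecated as of Spark ${c.version}: ${c.deprecationMessage}")
}
```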
38 changes: 38 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -836,7 +836,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
<<<<<<< HEAD
minPartitions).map(pair => pair._2.toString)
=======
minPartitions).map(pair => pair._2.toString).setName(path)
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
}

/**
@@ -874,18 +878,30 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
<<<<<<< HEAD
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
=======
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
classOf[Text],
classOf[Text],
updateConf,
<<<<<<< HEAD
minPartitions).setName(path).map(record => (record._1.toString, record._2.toString))
=======
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
}

/**
@@ -923,11 +939,19 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope {
assertNotStopped()
<<<<<<< HEAD
val job = new NewHadoopJob(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
=======
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
new BinaryFileRDD(
this,
classOf[StreamInputFormat],
@@ -1100,13 +1124,23 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
assertNotStopped()
<<<<<<< HEAD
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
=======
// The call to NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = NewHadoopJob.getInstance(conf)
// Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updatedConf = job.getConfiguration
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}

@@ -1369,7 +1403,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
}
<<<<<<< HEAD
val isDir = fs.getFileStatus(hadoopPath).isDir
=======
val isDir = fs.getFileStatus(hadoopPath).isDirectory
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
if (!isLocal && scheme == "file" && isDir) {
throw new SparkException(s"addFile does not support local directories when not running " +
"local mode.")
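Across the SparkContext.scala hunks, the incoming side consistently moves from the deprecated new Job(conf) constructor to Job.getInstance(conf), reads the per-job configuration via job.getConfiguration instead of the SparkHadoopUtil shim, and switches FileStatus.isDir to isDirectory. A minimal, self-contained sketch of that new-API usage (Hadoop's public org.apache.hadoop.mapreduce and fs classes; the object and method names are illustrative, not Spark internals):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

object NewHadoopApiSketch {
  // Job.getInstance copies the Configuration it is handed, so the input-path
  // settings below stay local to this job and do not leak into the caller's conf.
  def inputConf(base: Configuration, commaSeparatedPaths: String): Configuration = {
    val job = Job.getInstance(base)                         // replaces `new Job(conf)`
    FileInputFormat.setInputPaths(job, commaSeparatedPaths) // comma-separated files, as in SPARK-7155
    job.getConfiguration                                    // replaces the SparkHadoopUtil shim
  }

  // Hadoop 2 style: FileStatus.isDirectory instead of the deprecated isDir.
  def isDirectory(fs: FileSystem, path: Path): Boolean =
    fs.getFileStatus(path).isDirectory
}
```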
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -34,6 +34,10 @@ import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemor
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
<<<<<<< HEAD
import org.apache.spark.rpc.akka.AkkaRpcEnv
=======
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
import org.apache.spark.serializer.Serializer
@@ -96,7 +100,13 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
<<<<<<< HEAD
if (!rpcEnv.isInstanceOf[AkkaRpcEnv]) {
actorSystem.shutdown()
}
=======
actorSystem.shutdown()
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
rpcEnv.shutdown()

// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
@@ -245,11 +255,22 @@ object SparkEnv extends Logging {

val securityManager = new SecurityManager(conf)

<<<<<<< HEAD
// Create the ActorSystem for Akka and get the port it binds to.
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
val actorSystem: ActorSystem =
if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
} else {
=======
val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
// Create the ActorSystem for Akka and get the port it binds to.
val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
val actorSystem: ActorSystem = {
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
val actorSystemPort =
if (port == 0 || rpcEnv.address == null) {
port
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -25,7 +25,10 @@ import java.util.Date
import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
<<<<<<< HEAD
=======
import org.apache.hadoop.mapreduce.TaskType
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
@@ -38,7 +41,14 @@ import org.apache.spark.util.SerializableJobConf
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[spark]
<<<<<<< HEAD
class SparkHadoopWriter(jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
with Serializable {
=======
class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965

private val now = new Date()
private val conf = new SerializableJobConf(jobConf)
@@ -129,7 +139,11 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {

private def getJobContext(): JobContext = {
if (jobContext == null) {
<<<<<<< HEAD
jobContext = newJobContext(conf.value, jID.value)
=======
jobContext = new JobContextImpl(conf.value, jID.value)
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
}
jobContext
}
@@ -141,20 +155,27 @@ class SparkHadoopWriter(jobConf: JobConf) extends Logging with Serializable {
taskContext
}

<<<<<<< HEAD
=======
protected def newTaskAttemptContext(
conf: JobConf,
attemptId: TaskAttemptID): TaskAttemptContext = {
new TaskAttemptContextImpl(conf, attemptId)
}

>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
jobID = jobid
splitID = splitid
attemptID = attemptid

jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
<<<<<<< HEAD
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
=======
new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
}
}

Expand All @@ -172,9 +193,16 @@ object SparkHadoopWriter {
}
val outputPath = new Path(path)
val fs = outputPath.getFileSystem(conf)
<<<<<<< HEAD
if (outputPath == null || fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs)
=======
if (fs == null) {
throw new IllegalArgumentException("Incorrectly formatted output path")
}
outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
}
}
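The SparkHadoopWriter.scala hunks drop the SparkHadoopMapRedUtil mixin in favour of direct Hadoop 2 calls: JobContextImpl and TaskAttemptContextImpl are constructed explicitly, TaskID takes TaskType.MAP instead of the deprecated boolean flag, and the output path is qualified against the filesystem URI and working directory. A small sketch of those calls in isolation (illustrative object and method names, mirroring only the API usage visible in the diff):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{JobConf, JobContextImpl, JobID, TaskAttemptContextImpl, TaskAttemptID, TaskID}
import org.apache.hadoop.mapreduce.TaskType

object HadoopWriterApiSketch {
  // TaskType.MAP replaces the old `isMap = true` boolean constructor argument.
  def mapAttemptId(jobId: JobID, splitId: Int, attemptId: Int): TaskAttemptID =
    new TaskAttemptID(new TaskID(jobId, TaskType.MAP, splitId), attemptId)

  // Explicit construction replaces the newJobContext/newTaskAttemptContext helpers.
  def contexts(conf: JobConf, jobId: JobID, attempt: TaskAttemptID) =
    (new JobContextImpl(conf, jobId), new TaskAttemptContextImpl(conf, attempt))

  // makeQualified(URI, Path) replaces the deprecated makeQualified(FileSystem).
  def qualifiedOutputPath(fs: FileSystem, path: String): Path =
    new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
}
```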
core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -24,12 +24,21 @@ import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi

/**
<<<<<<< HEAD
* :: DeveloperApi ::
=======
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
<<<<<<< HEAD
@DeveloperApi
trait BroadcastFactory {
=======
private[spark] trait BroadcastFactory {
>>>>>>> 15bd73627e04591fd13667b4838c9098342db965

def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
