Commit ff87a60

Author: Marcelo Vanzin (committed)
Merge branch 'master' into SPARK-6890
2 parents: 31d3ce8 + 51b306b

34 files changed: +840, -418 lines changed


R/pkg/R/RDD.R

Lines changed: 4 additions & 12 deletions
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)

  if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
    # This transformation is the first in its stage:
-   .Object@func <- func
+   .Object@func <- cleanClosure(func)
    .Object@prev_jrdd <- getJRDD(prev)
    .Object@env$prev_serializedMode <- prev@env$serializedMode
    # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
    pipelinedFunc <- function(split, iterator) {
      func(split, prev@func(split, iterator))
    }
-   .Object@func <- pipelinedFunc
+   .Object@func <- cleanClosure(pipelinedFunc)
    .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
    # Get the serialization mode of the parent RDD
    .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
            return(rdd@env$jrdd_val)
          }

-         computeFunc <- function(split, part) {
-           rdd@func(split, part)
-         }
-
          packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                       connection = NULL)

          broadcastArr <- lapply(ls(.broadcastNames),
                                 function(name) { get(name, .broadcastNames) })

-         serializedFuncArr <- serialize(computeFunc, connection = NULL)
+         serializedFuncArr <- serialize(rdd@func, connection = NULL)

          prev_jrdd <- rdd@prev_jrdd

@@ -551,11 +547,7 @@ setMethod("mapPartitions",
setMethod("lapplyPartitionsWithIndex",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
-           FUN <- cleanClosure(FUN)
-           closureCapturingFunc <- function(split, part) {
-             FUN(split, part)
-           }
-           PipelinedRDD(X, closureCapturingFunc)
+           PipelinedRDD(X, FUN)
          })

#' @rdname lapplyPartitionsWithIndex

R/pkg/R/pairRDD.R

Lines changed: 0 additions & 4 deletions
@@ -694,10 +694,6 @@ setMethod("cogroup",
            for (i in 1:rddsLen) {
              rdds[[i]] <- lapply(rdds[[i]],
                                  function(x) { list(x[[1]], list(i, x[[2]])) })
-             # TODO(hao): As issue [SparkR-142] mentions, the right value of i
-             # will not be captured into UDF if getJRDD is not invoked.
-             # It should be resolved together with that issue.
-             getJRDD(rdds[[i]]) # Capture the closure.
            }
            union.rdd <- Reduce(unionRDD, rdds)
            group.func <- function(vlist) {

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ import org.apache.spark.util.StatCounter
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
  /** Add up the elements in this RDD. */
  def sum(): Double = {
-   self.reduce(_ + _)
+   self.fold(0.0)(_ + _)
  }

  /**
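
Note on the change above: reduce has no identity element, so it fails on an empty RDD, while fold(0.0) simply returns the zero value. A minimal sketch, assuming a running SparkContext named sc:

  // Sketch only: the behavioral difference motivating sum() switching to fold.
  val empty = sc.parallelize(Seq.empty[Double])
  // empty.reduce(_ + _)  // throws UnsupportedOperationException: empty collection
  empty.fold(0.0)(_ + _)  // returns 0.0, which is what sum() should report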

core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,12 @@ import org.scalatest.FunSuite
import org.apache.spark._

class DoubleRDDSuite extends FunSuite with SharedSparkContext {
+  test("sum") {
+    assert(sc.parallelize(Seq.empty[Double]).sum() === 0.0)
+    assert(sc.parallelize(Seq(1.0)).sum() === 1.0)
+    assert(sc.parallelize(Seq(1.0, 2.0)).sum() === 3.0)
+  }
+
  // Verify tests on the histogram functionality. We test with both evenly
  // and non-evenly spaced buckets as the bucket lookup function changes.
  test("WorksOnEmpty") {

dev/run-tests-jenkins

Lines changed: 4 additions & 0 deletions
@@ -161,6 +161,10 @@ pr_message=""
# Ensure we save off the current HEAD to revert to
current_pr_head="`git rev-parse HEAD`"

+echo "HEAD: `git rev-parse HEAD`"
+echo "GHPRB: $ghprbActualCommit"
+echo "SHA1: $sha1"
+
# Run pull request tests
for t in "${PR_TESTS[@]}"; do
  this_test="${FWDIR}/dev/tests/${t}.sh"

dev/tests/pr_new_dependencies.sh

Lines changed: 4 additions & 4 deletions
@@ -39,12 +39,12 @@ CURR_CP_FILE="my-classpath.txt"
MASTER_CP_FILE="master-classpath.txt"

# First switch over to the master branch
-git checkout master &>/dev/null
+git checkout -f master
# Find and copy all pom.xml files into a *.gate file that we can check
# against through various `git` changes
find -name "pom.xml" -exec cp {} {}.gate \;
# Switch back to the current PR
-git checkout "${current_pr_head}" &>/dev/null
+git checkout -f "${current_pr_head}"

# Check if any *.pom files from the current branch are different from the master
difference_q=""
@@ -71,7 +71,7 @@ else
    sort > ${CURR_CP_FILE}

  # Checkout the master branch to compare against
-  git checkout master &>/dev/null
+  git checkout -f master

  ${MVN_BIN} clean package dependency:build-classpath -DskipTests 2>/dev/null | \
    sed -n -e '/Building Spark Project Assembly/,$p' | \
@@ -84,7 +84,7 @@ else
    rev | \
    sort > ${MASTER_CP_FILE}

-  DIFF_RESULTS="`diff my-classpath.txt master-classpath.txt`"
+  DIFF_RESULTS="`diff ${CURR_CP_FILE} ${MASTER_CP_FILE}`"

  if [ -z "${DIFF_RESULTS}" ]; then
    echo " * This patch does not change any dependencies."

examples/src/main/java/org/apache/spark/examples/ml/JavaDeveloperApiExample.java

Lines changed: 2 additions & 2 deletions
@@ -116,7 +116,7 @@ class MyJavaLogisticRegression
   */
  IntParam maxIter = new IntParam(this, "maxIter", "max number of iterations");

-  int getMaxIter() { return (Integer) get(maxIter); }
+  int getMaxIter() { return (Integer) getOrDefault(maxIter); }

  public MyJavaLogisticRegression() {
    setMaxIter(100);
@@ -211,7 +211,7 @@ public Vector predictRaw(Vector features) {
  public MyJavaLogisticRegressionModel copy() {
    MyJavaLogisticRegressionModel m =
        new MyJavaLogisticRegressionModel(parent_, fittingParamMap_, weights_);
-    Params$.MODULE$.inheritValues(this.paramMap(), this, m);
+    Params$.MODULE$.inheritValues(this.extractParamMap(), this, m);
    return m;
  }
}

examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala

Lines changed: 3 additions & 3 deletions
@@ -99,7 +99,7 @@ private trait MyLogisticRegressionParams extends ClassifierParams {
   * class since the maxIter parameter is only used during training (not in the Model).
   */
  val maxIter: IntParam = new IntParam(this, "maxIter", "max number of iterations")
-  def getMaxIter: Int = get(maxIter)
+  def getMaxIter: Int = getOrDefault(maxIter)
}

/**
@@ -174,11 +174,11 @@ private class MyLogisticRegressionModel(
   * Create a copy of the model.
   * The copy is shallow, except for the embedded paramMap, which gets a deep copy.
   *
-   * This is used for the defaul implementation of [[transform()]].
+   * This is used for the default implementation of [[transform()]].
   */
  override protected def copy(): MyLogisticRegressionModel = {
    val m = new MyLogisticRegressionModel(parent, fittingParamMap, weights)
-    Params.inheritValues(this.paramMap, this, m)
+    Params.inheritValues(extractParamMap(), this, m)
    m
  }
}

mllib/src/main/scala/org/apache/spark/ml/Estimator.scala

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
   */
  @varargs
  def fit(dataset: DataFrame, paramPairs: ParamPair[_]*): M = {
-    val map = new ParamMap().put(paramPairs: _*)
+    val map = ParamMap(paramPairs: _*)
    fit(dataset, map)
  }
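
Note on the change above: ParamMap's companion object provides a varargs apply, so ParamMap(paramPairs: _*) builds the same map as the older construct-then-put form. A small equivalence sketch:

  // Sketch only: both expressions produce a ParamMap holding the given pairs.
  import org.apache.spark.ml.param.{ParamMap, ParamPair}
  def buildParamMap(pairs: ParamPair[_]*): ParamMap = ParamMap(pairs: _*)
  // equivalent to: new ParamMap().put(pairs: _*)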

mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala

Lines changed: 5 additions & 5 deletions
@@ -84,7 +84,7 @@ class Pipeline extends Estimator[PipelineModel] {
  /** param for pipeline stages */
  val stages: Param[Array[PipelineStage]] = new Param(this, "stages", "stages of the pipeline")
  def setStages(value: Array[PipelineStage]): this.type = { set(stages, value); this }
-  def getStages: Array[PipelineStage] = get(stages)
+  def getStages: Array[PipelineStage] = getOrDefault(stages)

  /**
   * Fits the pipeline to the input dataset with additional parameters. If a stage is an
@@ -101,7 +101,7 @@ class Pipeline extends Estimator[PipelineModel] {
   */
  override def fit(dataset: DataFrame, paramMap: ParamMap): PipelineModel = {
    transformSchema(dataset.schema, paramMap, logging = true)
-    val map = this.paramMap ++ paramMap
+    val map = extractParamMap(paramMap)
    val theStages = map(stages)
    // Search for the last estimator.
    var indexOfLastEstimator = -1
@@ -138,7 +138,7 @@ class Pipeline extends Estimator[PipelineModel] {
  }

  override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
-    val map = this.paramMap ++ paramMap
+    val map = extractParamMap(paramMap)
    val theStages = map(stages)
    require(theStages.toSet.size == theStages.size,
      "Cannot have duplicate components in a pipeline.")
@@ -177,14 +177,14 @@ class PipelineModel private[ml] (

  override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
    // Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
-    val map = (fittingParamMap ++ this.paramMap) ++ paramMap
+    val map = fittingParamMap ++ extractParamMap(paramMap)
    transformSchema(dataset.schema, map, logging = true)
    stages.foldLeft(dataset)((cur, transformer) => transformer.transform(cur, map))
  }

  override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
    // Precedence of ParamMaps: paramMap > this.paramMap > fittingParamMap
-    val map = (fittingParamMap ++ this.paramMap) ++ paramMap
+    val map = fittingParamMap ++ extractParamMap(paramMap)
    stages.foldLeft(schema)((cur, transformer) => transformer.transformSchema(cur, map))
  }
}
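
Note on the changes above: the recurring rewrite of this.paramMap ++ paramMap to extractParamMap(paramMap), and of get to getOrDefault, follows the Params refactor that separates explicitly set values from declared defaults. getOrDefault(param) returns the set value if present and falls back to the default otherwise, and extractParamMap merges defaults, the instance's own values, and the caller-supplied map, with the caller's map taking precedence. A conceptual sketch of that precedence (not Spark's actual implementation):

  // Conceptual sketch only: later maps override earlier ones, so `extra` wins.
  import org.apache.spark.ml.param.ParamMap
  def mergedParams(defaults: ParamMap, set: ParamMap, extra: ParamMap): ParamMap =
    defaults ++ set ++ extra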
