Commit afa1313: merge with master
2 parents: 8559e4e + d7e43b6

459 files changed: 15487 additions & 6136 deletions


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",

R/pkg/R/DataFrame.R

Lines changed: 51 additions & 3 deletions
@@ -678,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })
 
+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'                      the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{
@@ -1764,6 +1804,10 @@ setClassUnion("numericOrcharacter", c("numeric", "character"))
 #' @note [[ since 1.4.0
 setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
           function(x, i) {
+            if (length(i) > 1) {
+              warning("Subset index has length > 1. Only the first index is used.")
+              i <- i[1]
+            }
             if (is.numeric(i)) {
               cols <- columns(x)
               i <- cols[[i]]
@@ -1777,6 +1821,10 @@ setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
 #' @note [[<- since 2.1.1
 setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
           function(x, i, value) {
+            if (length(i) > 1) {
+              warning("Subset index has length > 1. Only the first index is used.")
+              i <- i[1]
+            }
             if (is.numeric(i)) {
               cols <- columns(x)
               i <- cols[[i]]
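
Not part of the diff: a minimal SparkR usage sketch of the new SparkDataFrame coalesce next to repartition, assuming an active session; the file path and variable names are illustrative only.

library(SparkR)
sparkR.session()

df <- read.json("path/to/file.json")  # illustrative input path

# Narrow the data to one partition without a shuffle (narrow dependency).
df1 <- coalesce(df, 1L)

# repartition() adds a shuffle step but keeps the upstream partitions
# executing in parallel, which is usually preferable for drastic reductions.
df10 <- repartition(df, 10L)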

R/pkg/R/RDD.R

Lines changed: 2 additions & 2 deletions
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
           signature(x = "RDD"),
           function(x, numPartitions) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
             } else {
               stop("Please, specify the number of partitions")
             }
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
 #' @rdname coalesce
 #' @aliases coalesce,RDD
 #' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)

R/pkg/R/functions.R

Lines changed: 24 additions & 2 deletions
@@ -286,6 +286,28 @@ setMethod("ceil",
             column(jc)
           })
 
+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+            column(jc)
+          })
+
 #' Though scala functions has "col" function, we don't expose it in SparkR
 #' because we don't want to conflict with the "col" function in the R base
 #' package and we also have "column" function exported which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
 #' Returns a Column based on the given column name
 #'
 #' Returns a Column based on the given column name.
-#
+#'
 #' @param x Character column name.
 #'
 #' @rdname column
 #' @name column
 #' @family normal_funcs
 #' @export
 #' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
 #' @note column since 1.6.0
 setMethod("column",
           signature(x = "character"),
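
Not part of the diff: a small sketch of the Column-level coalesce, assuming a hypothetical SparkDataFrame with NA values; the column names and the lit(0) fallback are illustrative.

library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(a = c(NA, 2, NA), b = c(1, NA, NA)))

# For each row, take the first non-NA value among a, b, and a literal 0 fallback.
head(select(df, coalesce(df$a, df$b, lit(0))))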

R/pkg/R/generics.R

Lines changed: 10 additions & 3 deletions
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
 # @rdname coalesce
 # @seealso repartition
 # @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })
 
 # @rdname checkpoint-methods
 # @export
@@ -66,7 +66,7 @@ setGeneric("freqItems", function(x, cols, support = 0.01) { standardGeneric("fre
 # @rdname approxQuantile
 # @export
 setGeneric("approxQuantile",
-           function(x, col, probabilities, relativeError) {
+           function(x, cols, probabilities, relativeError) {
              standardGeneric("approxQuantile")
            })
 
@@ -406,6 +406,13 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })
 
+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#'        provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
 #' @rdname collect
 #' @export
 setGeneric("collect", function(x, ...) { standardGeneric("collect") })
@@ -1399,7 +1406,7 @@ setGeneric("spark.randomForest",
 
 #' @rdname spark.survreg
 #' @export
-setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") })
+setGeneric("spark.survreg", function(data, formula, ...) { standardGeneric("spark.survreg") })
 
 #' @rdname spark.svmLinear
 #' @export

R/pkg/R/mllib_classification.R

Lines changed: 9 additions & 4 deletions
@@ -207,6 +207,9 @@ function(object, path, overwrite = FALSE) {
 #' excepting that at most one value may be 0. The class with largest value p/t is predicted, where p
 #' is the original probability of that class and t is the class's threshold.
 #' @param weightCol The weight column name.
+#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
+#'                         or the number of partitions are large, this param could be adjusted to a larger size.
+#'                         This is an expert parameter. Default value should be good for most cases.
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.logit} returns a fitted logistic regression model.
 #' @rdname spark.logit
@@ -245,19 +248,21 @@ function(object, path, overwrite = FALSE) {
 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
                    tol = 1E-6, family = "auto", standardization = TRUE,
-                   thresholds = 0.5, weightCol = NULL) {
+                   thresholds = 0.5, weightCol = NULL, aggregationDepth = 2) {
             formula <- paste(deparse(formula), collapse = "")
 
-            if (is.null(weightCol)) {
-              weightCol <- ""
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
             }
 
             jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
                                 data@sdf, formula, as.numeric(regParam),
                                 as.numeric(elasticNetParam), as.integer(maxIter),
                                 as.numeric(tol), as.character(family),
                                 as.logical(standardization), as.array(thresholds),
-                                as.character(weightCol))
+                                weightCol, as.integer(aggregationDepth))
             new("LogisticRegressionModel", jobj = jobj)
           })
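
Not part of the diff: a sketch of spark.logit with the pass-through weightCol and the new aggregationDepth argument, loosely mirroring the test added later in this commit; the label/feature/weight values are illustrative.

label <- c(0.0, 0.0, 0.0, 1.0, 1.0)    # illustrative data
feature <- c(1.1, 2.2, 3.3, 4.4, 5.5)
weight <- c(2.0, 2.0, 2.0, 1.0, 1.0)
df <- createDataFrame(as.data.frame(cbind(label, feature, weight)))

# weightCol is now forwarded as-is (NULL when unset); aggregationDepth tunes
# treeAggregate on the JVM side and defaults to 2.
model <- spark.logit(df, label ~ feature, weightCol = "weight", aggregationDepth = 2)
head(select(predict(model, df), "prediction"))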

R/pkg/R/mllib_regression.R

Lines changed: 16 additions & 8 deletions
@@ -102,14 +102,16 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
             }
 
             formula <- paste(deparse(formula), collapse = "")
-            if (is.null(weightCol)) {
-              weightCol <- ""
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
             }
 
             # For known families, Gamma is upper-cased
             jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
                                 "fit", formula, data@sdf, tolower(family$family), family$link,
-                                tol, as.integer(maxIter), as.character(weightCol), regParam)
+                                tol, as.integer(maxIter), weightCol, regParam)
             new("GeneralizedLinearRegressionModel", jobj = jobj)
           })
 
@@ -305,13 +307,15 @@ setMethod("spark.isoreg", signature(data = "SparkDataFrame", formula = "formula"
           function(data, formula, isotonic = TRUE, featureIndex = 0, weightCol = NULL) {
             formula <- paste(deparse(formula), collapse = "")
 
-            if (is.null(weightCol)) {
-              weightCol <- ""
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
             }
 
             jobj <- callJStatic("org.apache.spark.ml.r.IsotonicRegressionWrapper", "fit",
                                 data@sdf, formula, as.logical(isotonic), as.integer(featureIndex),
-                                as.character(weightCol))
+                                weightCol)
             new("IsotonicRegressionModel", jobj = jobj)
           })
 
@@ -372,6 +376,10 @@ setMethod("write.ml", signature(object = "IsotonicRegressionModel", path = "char
 #' @param formula a symbolic description of the model to be fitted. Currently only a few formula
 #'                operators are supported, including '~', ':', '+', and '-'.
 #'                Note that operator '.' is not supported currently.
+#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
+#'                         or the number of partitions are large, this param could be adjusted to a larger size.
+#'                         This is an expert parameter. Default value should be good for most cases.
+#' @param ... additional arguments passed to the method.
 #' @return \code{spark.survreg} returns a fitted AFT survival regression model.
 #' @rdname spark.survreg
 #' @seealso survival: \url{https://cran.r-project.org/package=survival}
@@ -396,10 +404,10 @@ setMethod("write.ml", signature(object = "IsotonicRegressionModel", path = "char
 #' }
 #' @note spark.survreg since 2.0.0
 setMethod("spark.survreg", signature(data = "SparkDataFrame", formula = "formula"),
-          function(data, formula) {
+          function(data, formula, aggregationDepth = 2) {
             formula <- paste(deparse(formula), collapse = "")
             jobj <- callJStatic("org.apache.spark.ml.r.AFTSurvivalRegressionWrapper",
-                                "fit", formula, data@sdf)
+                                "fit", formula, data@sdf, as.integer(aggregationDepth))
             new("AFTSurvivalRegressionModel", jobj = jobj)
           })
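
Not part of the diff: a sketch of spark.survreg with the new aggregationDepth argument; the survival data below (time, event status, two covariates) and column names are illustrative.

data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
             list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
df <- createDataFrame(data, c("time", "status", "x", "sex"))

# aggregationDepth (>= 2) is coerced to integer and forwarded to the AFT wrapper; 2 is the default.
model <- spark.survreg(df, Surv(time, status) ~ x + sex, aggregationDepth = 2)
summary(model)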

R/pkg/R/stats.R

Lines changed: 17 additions & 8 deletions
@@ -138,9 +138,9 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
             collect(dataFrame(sct))
           })
 
-#' Calculates the approximate quantiles of a numerical column of a SparkDataFrame
+#' Calculates the approximate quantiles of numerical columns of a SparkDataFrame
 #'
-#' Calculates the approximate quantiles of a numerical column of a SparkDataFrame.
+#' Calculates the approximate quantiles of numerical columns of a SparkDataFrame.
 #' The result of this algorithm has the following deterministic bound:
 #' If the SparkDataFrame has N elements and if we request the quantile at probability p up to
 #' error err, then the algorithm will return a sample x from the SparkDataFrame so that the
@@ -149,15 +149,19 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
 #' This method implements a variation of the Greenwald-Khanna algorithm (with some speed
 #' optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670
 #' Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
+#' Note that rows containing any NA values will be removed before calculation.
 #'
 #' @param x A SparkDataFrame.
-#' @param col The name of the numerical column.
+#' @param cols A single column name, or a list of names for multiple columns.
 #' @param probabilities A list of quantile probabilities. Each number must belong to [0, 1].
 #'                      For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
 #' @param relativeError The relative target precision to achieve (>= 0). If set to zero,
 #'                      the exact quantiles are computed, which could be very expensive.
 #'                      Note that values greater than 1 are accepted but give the same result as 1.
-#' @return The approximate quantiles at the given probabilities.
+#' @return The approximate quantiles at the given probabilities. If the input is a single column name,
+#'         the output is a list of approximate quantiles in that column; If the input is
+#'         multiple column names, the output should be a list, and each element in it is a list of
+#'         numeric values which represents the approximate quantiles in corresponding column.
 #'
 #' @rdname approxQuantile
 #' @name approxQuantile
@@ -171,12 +175,17 @@ setMethod("freqItems", signature(x = "SparkDataFrame", cols = "character"),
 #' }
 #' @note approxQuantile since 2.0.0
 setMethod("approxQuantile",
-          signature(x = "SparkDataFrame", col = "character",
+          signature(x = "SparkDataFrame", cols = "character",
                     probabilities = "numeric", relativeError = "numeric"),
-          function(x, col, probabilities, relativeError) {
+          function(x, cols, probabilities, relativeError) {
            statFunctions <- callJMethod(x@sdf, "stat")
-            callJMethod(statFunctions, "approxQuantile", col,
-                        as.list(probabilities), relativeError)
+            quantiles <- callJMethod(statFunctions, "approxQuantile", as.list(cols),
+                                     as.list(probabilities), relativeError)
+            if (length(cols) == 1) {
+              quantiles[[1]]
+            } else {
+              quantiles
+            }
           })
 
 #' Returns a stratified sample without replacement
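
Not part of the diff: a sketch of the single-column vs. multi-column return shapes of approxQuantile, using a hypothetical two-column SparkDataFrame.

df <- createDataFrame(data.frame(a = c(1, 2, 3, 4), b = c(10, 20, 30, 40)))

# One column name: a plain list of quantiles for that column.
approxQuantile(df, "a", c(0.5), 0.0)

# Several column names: one list of quantiles per column, in the same order.
approxQuantile(df, c("a", "b"), c(0.25, 0.75), 0.0)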

R/pkg/inst/tests/testthat/test_mllib_classification.R

Lines changed: 9 additions & 1 deletion
@@ -211,7 +211,15 @@ test_that("spark.logit", {
   df <- createDataFrame(data)
   model <- spark.logit(df, label ~ feature)
   prediction <- collect(select(predict(model, df), "prediction"))
-  expect_equal(prediction$prediction, c("0.0", "0.0", "1.0", "1.0", "0.0"))
+  expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))
+
+  # Test prediction with weightCol
+  weight <- c(2.0, 2.0, 2.0, 1.0, 1.0)
+  data2 <- as.data.frame(cbind(label, feature, weight))
+  df2 <- createDataFrame(data2)
+  model2 <- spark.logit(df2, label ~ feature, weightCol = "weight")
+  prediction2 <- collect(select(predict(model2, df2), "prediction"))
+  expect_equal(sort(prediction2$prediction), c("0.0", "0.0", "0.0", "0.0", "0.0"))
 })
 
 test_that("spark.mlp", {

R/pkg/inst/tests/testthat/test_rdd.R

Lines changed: 1 addition & 1 deletion
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
   expect_true(count >= 0 && count <= 4)
 
   # coalesce
-  r3 <- coalesce(rdd, 1)
+  r3 <- coalesceRDD(rdd, 1)
   expect_equal(getNumPartitionsRDD(r3), 1L)
   count <- length(collectPartition(r3, 0L))
   expect_equal(count, 20)
