Commit 671bc08

Felix Cheung authored and committed
[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request?

Add coalesce on DataFrame for down partitioning without shuffle, and coalesce on Column.

## How was this patch tested?

Manual tests and unit tests.

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16739 from felixcheung/rcoalesce.
1 parent c97f4e1 commit 671bc08

File tree: 11 files changed, +135 −18 lines changed
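The commit adds two user-facing coalesce methods to SparkR. A minimal usage sketch (assuming a running SparkR session; cars is the built-in R dataset that the commit's own tests use, and it has no NAs, so the Column call here only shows the call shape):

    sparkR.session()
    df <- as.DataFrame(cars, numPartitions = 5)
    getNumPartitions(coalesce(df, 1L))          # 1 -- SparkDataFrame method: shrink partitions, no shuffle
    firstNonNA <- coalesce(df$speed, df$dist)   # Column method: first non-NA input per row
    head(select(df, firstNonNA))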

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "coalesce",
               "collect",
               "colnames",
               "colnames<-",

R/pkg/R/DataFrame.R

Lines changed: 43 additions & 3 deletions
@@ -678,14 +678,53 @@ setMethod("storageLevel",
             storageLevelToString(callJMethod(x@sdf, "storageLevel"))
           })

+#' Coalesce
+#'
+#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
+#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
+#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
+#' the current partitions. If a larger number of partitions is requested, it will stay at the
+#' current number of partitions.
+#'
+#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
+#' this may result in your computation taking place on fewer nodes than
+#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+#' call \code{repartition}. This will add a shuffle step, but means the
+#' current upstream partitions will be executed in parallel (per whatever
+#' the current partitioning is).
+#'
+#' @param numPartitions the number of partitions to use.
+#'
+#' @family SparkDataFrame functions
+#' @rdname coalesce
+#' @name coalesce
+#' @aliases coalesce,SparkDataFrame-method
+#' @seealso \link{repartition}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- coalesce(df, 1L)
+#'}
+#' @note coalesce(SparkDataFrame) since 2.1.1
+setMethod("coalesce",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions) {
+            stopifnot(is.numeric(numPartitions))
+            sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
+            dataFrame(sdf)
+          })
+
 #' Repartition
 #'
 #' The following options for repartition are possible:
 #' \itemize{
-#'  \item{1.} {Return a new SparkDataFrame partitioned by
+#'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame hash partitioned by
 #'             the given columns into \code{numPartitions}.}
-#'  \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#'  \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
 #'             using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
 #' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
+#' @seealso \link{coalesce}
 #' @export
 #' @examples
 #'\dontrun{
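As the new roxygen block above states, asking coalesce for more partitions than the SparkDataFrame currently has is a no-op, while repartition shuffles and can grow the count. A short sketch of that contract, mirroring the commit's own test expectations:

    df <- as.DataFrame(cars, numPartitions = 5)
    getNumPartitions(coalesce(df, 3))      # 3  -- narrow dependency, no shuffle
    getNumPartitions(coalesce(df, 6))      # 5  -- stays at the current number of partitions
    getNumPartitions(repartition(df, 10))  # 10 -- repartition adds a shuffle but can grow the count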

R/pkg/R/RDD.R

Lines changed: 2 additions & 2 deletions
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
           signature(x = "RDD"),
           function(x, numPartitions) {
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-              coalesce(x, numPartitions, TRUE)
+              coalesceRDD(x, numPartitions, TRUE)
             } else {
               stop("Please, specify the number of partitions")
             }
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
 #' @rdname coalesce
 #' @aliases coalesce,RDD
 #' @noRd
-setMethod("coalesce",
+setMethod("coalesceRDD",
           signature(x = "RDD", numPartitions = "numeric"),
           function(x, numPartitions, shuffle = FALSE) {
             numPartitions <- numToInt(numPartitions)

R/pkg/R/functions.R

Lines changed: 24 additions & 2 deletions
@@ -286,6 +286,28 @@ setMethod("ceil",
             column(jc)
           })

+#' Returns the first column that is not NA
+#'
+#' Returns the first column that is not NA, or NA if all inputs are.
+#'
+#' @rdname coalesce
+#' @name coalesce
+#' @family normal_funcs
+#' @export
+#' @aliases coalesce,Column-method
+#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
+#' @note coalesce(Column) since 2.1.1
+setMethod("coalesce",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
+            column(jc)
+          })
+
 #' Though scala functions has "col" function, we don't expose it in SparkR
 #' because we don't want to conflict with the "col" function in the R base
 #' package and we also have "column" function exported which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
 #' Returns a Column based on the given column name
 #'
 #' Returns a Column based on the given column name.
-#
+#'
 #' @param x Character column name.
 #'
 #' @rdname column
 #' @name column
 #' @family normal_funcs
 #' @export
 #' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
 #' @note column since 1.6.0
 setMethod("column",
           signature(x = "character"),
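Any mix of Columns can be passed to the Column variant; a common pattern is a literal fallback as the last argument. A hypothetical illustration (the data and column names are made up; lit is SparkR's literal-Column constructor):

    df <- createDataFrame(data.frame(a = c("x", NA, NA),
                                     b = c(NA, "y", NA),
                                     stringsAsFactors = FALSE))
    head(select(df, coalesce(df$a, df$b, lit("fallback"))))
    # row-wise result: "x" (from a), "y" (from b), "fallback" -- NA only if every input is NA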

R/pkg/R/generics.R

Lines changed: 8 additions & 1 deletion
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
 # @rdname coalesce
 # @seealso repartition
 # @export
-setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
+setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })

 # @rdname checkpoint-methods
 # @export
@@ -406,6 +406,13 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })

+#' @rdname coalesce
+#' @param x a Column or a SparkDataFrame.
+#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
+#'        provided.
+#' @export
+setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })
+
 #' @rdname collect
 #' @export
 setGeneric("collect", function(x, ...) { standardGeneric("collect") })

R/pkg/inst/tests/testthat/test_rdd.R

Lines changed: 1 addition & 1 deletion
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
   expect_true(count >= 0 && count <= 4)

   # coalesce
-  r3 <- coalesce(rdd, 1)
+  r3 <- coalesceRDD(rdd, 1)
   expect_equal(getNumPartitionsRDD(r3), 1L)
   count <- length(collectPartition(r3, 0L))
   expect_equal(count, 20)

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 27 additions & 5 deletions
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
   objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
   df <- read.json(jsonPath)
   dfRDD <- toRDD(df)
-  saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+  saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
   objectIn <- objectFile(sc, objectPath)

   expect_is(objectIn, "RDD")
@@ -1236,7 +1236,7 @@ test_that("column functions", {
   c16 <- is.nan(c) + isnan(c) + isNaN(c)
   c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
   c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
-  c19 <- spark_partition_id()
+  c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
   c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")

   # Test if base::is.nan() is exposed
@@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", {
                ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)

   # repartition by column and number of partitions
-  actual <- repartition(df, 3L, col = df$"a")
+  actual <- repartition(df, 3, col = df$"a")

-  # since we cannot access the number of partitions from dataframe, checking
-  # that at least the dimensions are identical
+  # Checking that at least the dimensions are identical
   expect_identical(dim(df), dim(actual))
+  expect_equal(getNumPartitions(actual), 3L)

   # repartition by number of partitions
   actual <- repartition(df, 13L)
   expect_identical(dim(df), dim(actual))
+  expect_equal(getNumPartitions(actual), 13L)
+
+  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)

   # a test case with a column and dapply
   schema <- structType(structField("a", "integer"), structField("avg", "double"))
@@ -2515,6 +2518,25 @@ test_that("repartition by columns on DataFrame", {
   expect_equal(nrow(df1), 2)
 })

+test_that("coalesce, repartition, numPartitions", {
+  df <- as.DataFrame(cars, numPartitions = 5)
+  expect_equal(getNumPartitions(df), 5)
+  expect_equal(getNumPartitions(coalesce(df, 3)), 3)
+  expect_equal(getNumPartitions(coalesce(df, 6)), 5)
+
+  df1 <- coalesce(df, 3)
+  expect_equal(getNumPartitions(df1), 3)
+  expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
+  expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
+  expect_equal(getNumPartitions(coalesce(df1, 2)), 2)
+
+  df2 <- repartition(df1, 10)
+  expect_equal(getNumPartitions(df2), 10)
+  expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
+  expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
+  expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
+})
+
 test_that("gapply() and gapplyCollect() on a DataFrame", {
   df <- createDataFrame (
     list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
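A note on the expectations in the new test above: coalesce is a planning-time hint, not an immediate physical change, so stacked calls do not permanently ratchet the partition count down. The outermost request is evaluated against the underlying 5-partition source, which is why coalesce(df1, 4) yields 4 even though df1 reports 3 partitions, and why coalesce(df2, 13) and coalesce(df2, 7) both cap at 5 even though df2 reports 10.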

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 1 deletion
@@ -423,7 +423,8 @@ abstract class RDD[T: ClassTag](
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
-  * new partitions will claim 10 of the current partitions.
+  * new partitions will claim 10 of the current partitions. If a larger number
+  * of partitions is requested, it will stay at the current number of partitions.
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than

python/pyspark/sql/dataframe.py

Lines changed: 9 additions & 1 deletion
@@ -515,7 +515,15 @@ def coalesce(self, numPartitions):
         Similar to coalesce defined on an :class:`RDD`, this operation results in a
         narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
         there will not be a shuffle, instead each of the 100 new partitions will
-        claim 10 of the current partitions.
+        claim 10 of the current partitions. If a larger number of partitions is requested,
+        it will stay at the current number of partitions.
+
+        However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+        this may result in your computation taking place on fewer nodes than
+        you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+        you can call repartition(). This will add a shuffle step, but means the
+        current upstream partitions will be executed in parallel (per whatever
+        the current partitioning is).

         >>> df.coalesce(1).rdd.getNumPartitions()
         1

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 9 additions & 1 deletion
@@ -2432,7 +2432,15 @@ class Dataset[T] private[sql](
   * Returns a new Dataset that has exactly `numPartitions` partitions.
   * Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
   * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
-  * the 100 new partitions will claim 10 of the current partitions.
+  * the 100 new partitions will claim 10 of the current partitions. If a larger number of
+  * partitions is requested, it will stay at the current number of partitions.
+  *
+  * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
+  * this may result in your computation taking place on fewer nodes than
+  * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
+  * you can call repartition. This will add a shuffle step, but means the
+  * current upstream partitions will be executed in parallel (per whatever
+  * the current partitioning is).
   *
   * @group typedrel
   * @since 1.6.0
