1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -151,6 +151,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"repartitionByRange",
"rollup",
"sample",
"sample_frac",
65 changes: 63 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -687,7 +687,7 @@ setMethod("storageLevel",
#' @rdname coalesce
#' @name coalesce
#' @aliases coalesce,SparkDataFrame-method
#' @seealso \link{repartition}
#' @seealso \link{repartition}, \link{repartitionByRange}
#' @examples
#'\dontrun{
#' sparkR.session()
@@ -723,7 +723,7 @@ setMethod("coalesce",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
#' @seealso \link{coalesce}
#' @seealso \link{coalesce}, \link{repartitionByRange}
#' @examples
#'\dontrun{
#' sparkR.session()
@@ -759,6 +759,67 @@ setMethod("repartition",
dataFrame(sdf)
})


#' Repartition by range
#'
#' The following options for repartition by range are possible:
#' \itemize{
#' \item{1.} {Return a new SparkDataFrame range partitioned by
#' the given columns into \code{numPartitions}.}
#' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#'
#' @param x a SparkDataFrame.
#' @param numPartitions the number of partitions to use.
#' @param col the column by which the range partitioning will be performed.
#' @param ... additional column(s) to be used in the range partitioning.
#'
#' @family SparkDataFrame functions
#' @rdname repartitionByRange
#' @name repartitionByRange
#' @aliases repartitionByRange,SparkDataFrame-method
#' @seealso \link{repartition}, \link{coalesce}
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- repartitionByRange(df, col = df$col1, df$col2)
#' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
#'}
#' @note repartitionByRange since 2.4.0
setMethod("repartitionByRange",
signature(x = "SparkDataFrame"),
function(x, numPartitions = NULL, col = NULL, ...) {
if (!is.null(numPartitions) && !is.null(col)) {
# number of partitions and columns both are specified
if (is.numeric(numPartitions) && class(col) == "Column") {
cols <- list(col, ...)
jcol <- lapply(cols, function(c) { c@jc })
sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
} else {
stop(paste("numPartitions and col must be numeric and Column; however, got",
class(numPartitions), "and", class(col)))
}
} else if (!is.null(col)) {
# only columns are specified
if (class(col) == "Column") {
cols <- list(col, ...)
jcol <- lapply(cols, function(c) { c@jc })
sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
[Review comment — Member] cool, some duplication but I think unavoidable for clarity
} else {
stop(paste("col must be Column; however, got", class(col)))
}
} else if (!is.null(numPartitions)) {
# only numPartitions is specified
stop("At least one partition-by column must be specified.")
} else {
stop("Please, specify a column(s) or the number of partitions with a column(s)")
}
dataFrame(sdf)
})
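
For quick orientation, here is a minimal usage sketch (not part of this diff) of the two call shapes the method accepts, assuming a running SparkR session; the data frame and columns are illustrative:

# Hedged usage sketch, assuming sparkR.session() has been called.
df <- createDataFrame(mtcars)

# Shape 2: range-partition by column(s) only; the partition count
# falls back to spark.sql.shuffle.partitions.
byCol <- repartitionByRange(df, col = df$mpg)

# Shape 1: range-partition by column(s) into an explicit number of partitions.
byBoth <- repartitionByRange(df, 3L, col = df$mpg, df$cyl)
getNumPartitions(byBoth)  # 3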

#' toJSON
#'
#' Converts a SparkDataFrame into a SparkDataFrame of JSON strings.
3 changes: 3 additions & 0 deletions R/pkg/R/generics.R
@@ -531,6 +531,9 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
#' @rdname repartition
setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })

#' @rdname repartitionByRange
setGeneric("repartitionByRange", function(x, ...) { standardGeneric("repartitionByRange") })

#' @rdname sample
setGeneric("sample",
function(x, withReplacement = FALSE, fraction, seed) {
45 changes: 45 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -3104,6 +3104,51 @@ test_that("repartition by columns on DataFrame", {
})
})

test_that("repartitionByRange on a DataFrame", {
# The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
# partitions to reduce the number of tasks and speed up the test. This is particularly
# slow on Windows because the R workers cannot be forked. See also SPARK-21693.
conf <- callJMethod(sparkSession, "conf")
shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
tryCatch({
df <- createDataFrame(mtcars)
expect_error(repartitionByRange(df, "haha", df$mpg),
"numPartitions and col must be numeric and Column.*")
expect_error(repartitionByRange(df),
".*specify a column.*or the number of partitions with a column.*")
expect_error(repartitionByRange(df, col = "haha"),
"col must be Column; however, got.*")
expect_error(repartitionByRange(df, 3),
"At least one partition-by column must be specified.")

# The order of rows should differ from that of a normal repartition.
actual <- repartitionByRange(df, 3, df$mpg)
expect_equal(getNumPartitions(actual), 3)
expect_false(identical(collect(actual), collect(repartition(df, 3, df$mpg))))

actual <- repartitionByRange(df, col = df$mpg)
expect_false(identical(collect(actual), collect(repartition(df, col = df$mpg))))

# They should have the same data.
actual <- collect(repartitionByRange(df, 3, df$mpg))
actual <- actual[order(actual$mpg), ]
expected <- collect(repartition(df, 3, df$mpg))
expected <- expected[order(expected$mpg), ]
expect_true(all(actual == expected))

actual <- collect(repartitionByRange(df, col = df$mpg))
actual <- actual[order(actual$mpg), ]
expected <- collect(repartition(df, col = df$mpg))
expected <- expected[order(expected$mpg), ]
expect_true(all(actual == expected))
},
finally = {
# Reset the conf back to its original value.
callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
})
})
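
To see what range partitioning actually does, a small hedged sketch (not part of this diff) tags each row with its partition id via SparkR's spark_partition_id(); under range partitioning, each partition should hold a contiguous, non-overlapping range of mpg values:

# Hedged sketch: inspect partition boundaries after repartitionByRange.
df <- createDataFrame(mtcars)
parts <- withColumn(repartitionByRange(df, 3L, df$mpg),
                    "pid", spark_partition_id())
# Rows sorted by partition id show contiguous mpg ranges per partition.
head(arrange(select(parts, "mpg", "pid"), "pid", "mpg"), 32L)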

test_that("coalesce, repartition, numPartitions", {
df <- as.DataFrame(cars, numPartitions = 5)
expect_equal(getNumPartitions(df), 5)