Commit 45c41aa

dongjoon-hyun authored and shivaram committed
[SPARK-16053][R] Add spark_partition_id in SparkR
## What changes were proposed in this pull request?

This PR adds the `spark_partition_id` virtual column function in SparkR for API parity. The following example illustrates SparkR usage on a partitioned parquet table created by `spark.range(10).write.mode("overwrite").parquet("/tmp/t1")`:

```r
> collect(select(read.parquet('/tmp/t1'), c('id', spark_partition_id())))
   id SPARK_PARTITION_ID()
1   3                    0
2   4                    0
3   8                    1
4   9                    1
5   0                    2
6   1                    3
7   2                    4
8   5                    5
9   6                    6
10  7                    7
```

## How was this patch tested?

Pass the Jenkins tests (including the new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13768 from dongjoon-hyun/SPARK-16053.

(cherry picked from commit b0f2fb5)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
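As an aside, the docs added in this commit note that `spark_partition_id` is the SparkR counterpart of the `SPARK_PARTITION_ID()` SQL function. A minimal sketch of the SQL route to the same result, assuming an active SparkR 2.0 session and the `/tmp/t1` table from the example above:

```r
# Sketch (not part of the commit): the SQL-equivalent path to the same
# partition IDs, assuming a live SparkR session and /tmp/t1 from above.
df <- read.parquet("/tmp/t1")
createOrReplaceTempView(df, "t1")
collect(sql("SELECT id, SPARK_PARTITION_ID() FROM t1"))
```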
1 parent dfa9202 · commit 45c41aa

4 files changed: 27 additions & 0 deletions

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
```diff
@@ -260,6 +260,7 @@ exportMethods("%in%",
               "skewness",
               "sort_array",
               "soundex",
+              "spark_partition_id",
               "stddev",
               "stddev_pop",
               "stddev_samp",
```

R/pkg/R/functions.R

Lines changed: 21 additions & 0 deletions
```diff
@@ -1206,6 +1206,27 @@ setMethod("soundex",
             column(jc)
           })
 
+#' Return the partition ID as a column
+#'
+#' Return the partition ID of the Spark task as a SparkDataFrame column.
+#' Note that this is nondeterministic because it depends on data partitioning and
+#' task scheduling.
+#'
+#' This is equivalent to the SPARK_PARTITION_ID function in SQL.
+#'
+#' @rdname spark_partition_id
+#' @name spark_partition_id
+#' @export
+#' @examples
+#' \dontrun{select(df, spark_partition_id())}
+#' @note spark_partition_id since 2.0.0
+setMethod("spark_partition_id",
+          signature(x = "missing"),
+          function() {
+            jc <- callJStatic("org.apache.spark.sql.functions", "spark_partition_id")
+            column(jc)
+          })
+
 #' @rdname sd
 #' @name stddev
 setMethod("stddev",
```

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
```diff
@@ -1135,6 +1135,10 @@ setGeneric("sort_array", function(x, asc = TRUE) { standardGeneric("sort_array")
 #' @export
 setGeneric("soundex", function(x) { standardGeneric("soundex") })
 
+#' @rdname spark_partition_id
+#' @export
+setGeneric("spark_partition_id", function(x) { standardGeneric("spark_partition_id") })
+
 #' @rdname sd
 #' @export
 setGeneric("stddev", function(x) { standardGeneric("stddev") })
```

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 1 addition & 0 deletions
```diff
@@ -1059,6 +1059,7 @@ test_that("column functions", {
   c16 <- is.nan(c) + isnan(c) + isNaN(c)
   c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
   c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
+  c19 <- spark_partition_id()
 
   # Test if base::is.nan() is exposed
   expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
```
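The added `c19` line exercises construction of the column expression alongside the other column functions. A more end-to-end check one could run by hand, assuming a live SparkR session (hypothetical, not part of the commit's test suite), is sketched below:

```r
# Hypothetical manual check (not in the commit): after repartitioning to
# two partitions, every reported partition ID should be 0 or 1.
df <- repartition(createDataFrame(data.frame(id = 1:10)), 2L)
ids <- collect(select(df, spark_partition_id()))[[1]]
expect_true(all(ids %in% c(0, 1)))
```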
