From 9c1439eaf3a459b799a043c424dd9fa044c99ad0 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 26 Sep 2016 15:03:33 +0900
Subject: [PATCH 01/12] Support other types for options

---
 R/pkg/R/DataFrame.R                       |  4 ++--
 R/pkg/R/SQLContext.R                      |  4 ++--
 R/pkg/R/utils.R                           | 16 ++++++++++++++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 17 +++++++++++++++++
 4 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 75861d5de709..21a1481638a1 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2638,7 +2638,7 @@ setMethod("write.df",
               source <- getDefaultSqlSource()
             }
             jmode <- convertToJSaveMode(mode)
-            options <- varargsToEnv(...)
+            options <- varargsToStrEnv(...)
             if (!is.null(path)) {
               options[["path"]] <- path
             }
@@ -2701,7 +2701,7 @@ setMethod("saveAsTable",
               source <- getDefaultSqlSource()
             }
             jmode <- convertToJSaveMode(mode)
-            options <- varargsToEnv(...)
+            options <- varargsToStrEnv(...)
 
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index baa87824beb9..e5173605abab 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -779,7 +779,7 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
          "in 'spark.sql.sources.default' configuration by default.")
   }
   sparkSession <- getSparkSession()
-  options <- varargsToEnv(...)
+  options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
@@ -842,7 +842,7 @@ loadDF <- function(x = NULL, ...) {
 #' @note createExternalTable since 1.4.0
 createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
   sparkSession <- getSparkSession()
-  options <- varargsToEnv(...)
+  options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index e69666453480..adaf75aa2dac 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -334,6 +334,22 @@ varargsToEnv <- function(...) {
   env
 }
 
+# Utility function to capture the varargs into an environment object, with all values
+# converted into strings.
+varargsToStrEnv <- function(...) {
+  env <- varargsToEnv(...)
+  for (name in names(env)) {
+    if (is.logical(env[[name]])) {
+        env[[name]] <- tolower(as.character(env[[name]]))
+    } else if (is.null(env[[name]])) {
+        env[[name]] <- env[[name]]
+    } else {
+        env[[name]] <- as.character(env[[name]])
+    }
+  }
+  env
+}
+
 getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                          "DISK_ONLY_2",
                                          "MEMORY_AND_DISK",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index f5ab601f274f..bfc7f1cb0321 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -256,6 +256,23 @@ test_that("read/write csv as DataFrame", {
   unlink(csvPath2)
 })
 
+test_that("Support other types for options", {
+  csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
+  mockLinesCsv <- c("year,make,model,comment,blank",
+                    "\"2012\",\"Tesla\",\"S\",\"No comment\",",
+                    "1997,Ford,E350,\"Go get one now they are going fast\",",
+                    "2015,Chevy,Volt",
+                    "NA,Dummy,Placeholder")
+  writeLines(mockLinesCsv, csvPath)
+
+  csvDf <- read.df(csvPath, "csv", header = "true", inferSchema = "true")
+  expected <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)
+  expect_equal(collect(csvDf), collect(expected))
+
+  expect_error(read.df(csvPath, "csv", header = TRUE, maxColumns = 3))
+  unlink(csvPath)
+})
+
 test_that("convert NAs to null type in DataFrames", {
   rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
   df <- createDataFrame(rdd, list("a", "b"))

From 837e321e7a3607e8a7355b51c14627dd6c1b239b Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 26 Sep 2016 16:23:40 +0900
Subject: [PATCH 02/12] Add mode and options for read/write.parquet,
 read/write.json, read/write.orc and read/write.text

---
 R/pkg/R/DataFrame.R                       | 24 ++++++++--
 R/pkg/R/SQLContext.R                      | 16 +++++--
 R/pkg/R/generics.R                        | 10 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 58 +++++++++++++++++++++++
 4 files changed, 96 insertions(+), 12 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 21a1481638a1..45bf581a62f3 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -743,8 +743,12 @@ setMethod("toJSON",
 #' @note write.json since 1.6.0
 setMethod("write.json",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
+            options <- varargsToStrEnv(...)
+            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "options", options)
+            write <- callJMethod(write, "mode", jmode)
             invisible(callJMethod(write, "json", path))
           })
 
@@ -771,8 +775,12 @@ setMethod("write.json",
 #' @note write.orc since 2.0.0
 setMethod("write.orc",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
+            options <- varargsToStrEnv(...)
+            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "options", options)
+            write <- callJMethod(write, "mode", jmode)
             invisible(callJMethod(write, "orc", path))
           })
 
@@ -800,8 +808,12 @@ setMethod("write.orc",
 #' @note write.parquet since 1.6.0
 setMethod("write.parquet",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
+            options <- varargsToStrEnv(...)
+            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "options", options)
+            write <- callJMethod(write, "mode", jmode)
             invisible(callJMethod(write, "parquet", path))
           })
 
@@ -841,8 +853,12 @@ setMethod("saveAsParquetFile",
 #' @note write.text since 2.0.0
 setMethod("write.text",
           signature(x = "SparkDataFrame", path = "character"),
-          function(x, path) {
+          function(x, path, mode = "error", ...) {
+            options <- varargsToStrEnv(...)
+            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "options", options)
+            write <- callJMethod(write, "mode", jmode)
             invisible(callJMethod(write, "text", path))
           })
 
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index e5173605abab..0fff9a79fe2f 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -341,11 +341,13 @@ setMethod("toDF", signature(x = "RDD"),
 #' @name read.json
 #' @method read.json default
 #' @note read.json since 1.6.0
-read.json.default <- function(path) {
+read.json.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the text file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "json", paths)
   dataFrame(sdf)
 }
@@ -410,11 +412,13 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
 #' @export
 #' @name read.orc
 #' @note read.orc since 2.0.0
-read.orc <- function(path) {
+read.orc <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the ORC file path
   path <- suppressWarnings(normalizePath(path))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "orc", path)
   dataFrame(sdf)
 }
@@ -430,11 +434,13 @@ read.orc <- function(path) {
 #' @name read.parquet
 #' @method read.parquet default
 #' @note read.parquet since 1.6.0
-read.parquet.default <- function(path) {
+read.parquet.default <- function(path, ...) {
+  options <- varargsToStrEnv(...)
   sparkSession <- getSparkSession()
   # Allow the user to have a more flexible definition of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "parquet", paths)
   dataFrame(sdf)
 }
@@ -479,11 +485,13 @@ parquetFile <- function(x, ...) {
 #' @name read.text
 #' @method read.text default
 #' @note read.text since 1.6.1
-read.text.default <- function(path) {
+read.text.default <- function(path, ...) {
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the text file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")
+  read <- callJMethod(read, "options", options)
   sdf <- callJMethod(read, "text", paths)
   dataFrame(sdf)
 }
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 90a02e277831..6d84ecb5b9a1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -651,15 +651,17 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
 
 #' @rdname write.json
 #' @export
-setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
+setGeneric("write.json", function(x, path, mode = NULL, ...) { standardGeneric("write.json") })
 
 #' @rdname write.orc
 #' @export
-setGeneric("write.orc", function(x, path) { standardGeneric("write.orc") })
+setGeneric("write.orc", function(x, path, mode = NULL, ...) { standardGeneric("write.orc") })
 
 #' @rdname write.parquet
 #' @export
-setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })
+setGeneric("write.parquet", function(x, path, mode = NULL, ...) {
+  standardGeneric("write.parquet")
+})
 
 #' @rdname write.parquet
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
 
 #' @rdname write.text
 #' @export
-setGeneric("write.text", function(x, path) { standardGeneric("write.text") })
+setGeneric("write.text", function(x, path, mode = NULL, ...) { standardGeneric("write.text") })
 
 #' @rdname schema
 #' @export
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index bfc7f1cb0321..b67d71bca639 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -514,6 +514,19 @@ test_that("read/write json files", {
   unlink(jsonPath3)
 })
 
+test_that("read/write json files - compression option", {
+  df <- read.df(jsonPath, "json")
+
+  jsonPath <- tempfile(pattern = "jsonPath", fileext = ".json")
+  write.json(df, jsonPath, compression = "gzip")
+  jsonDF <- read.json(jsonPath)
+  expect_is(jsonDF, "SparkDataFrame")
+  expect_equal(count(jsonDF), count(df))
+  expect_true(length(list.files(jsonPath, pattern = ".gz")) > 0)
+
+  unlink(jsonPath)
+})
+
 test_that("jsonRDD() on a RDD with json string", {
   sqlContext <- suppressWarnings(sparkRSQL.init(sc))
   rdd <- parallelize(sc, mockLines)
@@ -1803,6 +1816,21 @@ test_that("read/write ORC files", {
   unsetHiveContext()
 })
 
+test_that("read/write ORC files - compression option", {
+  setHiveContext(sc)
+  df <- read.df(jsonPath, "json")
+
+  orcPath2 <- tempfile(pattern = "orcPath2", fileext = ".orc")
+  write.orc(df, orcPath2, compression = "ZLIB")
+  orcDF <- read.orc(orcPath2)
+  expect_is(orcDF, "SparkDataFrame")
+  expect_equal(count(orcDF), count(df))
+  expect_true(length(list.files(orcPath, pattern = ".zlib.orc")) > 0)
+
+  unlink(orcPath2)
+  unsetHiveContext()
+})
+
 test_that("read/write Parquet files", {
   df <- read.df(jsonPath, "json")
   # Test write.df and read.df
@@ -1834,6 +1862,23 @@ test_that("read/write Parquet files", {
   unlink(parquetPath4)
 })
 
+test_that("read/write Parquet files - compression option/mode", {
+  df <- read.df(jsonPath, "json")
+  tempPath <- tempfile(pattern = "tempPath", fileext = ".parquet")
+
+  # Test write.df and read.df
+  write.parquet(df, tempPath, compression = "GZIP")
+  df2 <- read.parquet(tempPath)
+  expect_is(df2, "SparkDataFrame")
+  expect_equal(count(df2), 3)
+  expect_true(length(list.files(tempPath, pattern = ".gz.parquet")) > 0)
+
+  write.parquet(df, tempPath, mode = "overwrite")
+  df3 <- read.parquet(tempPath)
+  expect_is(df3, "SparkDataFrame")
+  expect_equal(count(df3), 3)
+})
+
 test_that("read/write text files", {
   # Test write.df and read.df
   df <- read.df(jsonPath, "text")
@@ -1855,6 +1900,19 @@ test_that("read/write text files", {
   unlink(textPath2)
 })
 
+test_that("read/write text files - compression option", {
+  df <- read.df(jsonPath, "text")
+
+  textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+  write.text(df, textPath, compression = "GZIP")
+  textDF <- read.text(textPath)
+  expect_is(textDF, "SparkDataFrame")
+  expect_equal(count(textDF), count(df))
+  expect_true(length(list.files(textPath, pattern = ".gz")) > 0)
+
+  unlink(textPath)
+})
+
 test_that("describe() and summarize() on a DataFrame", {
   df <- read.json(jsonPath)
   stats <- describe(df, "age")

From 08b8795c4d3f23bae4a60e32b61989ccc237670c Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 26 Sep 2016 16:31:21 +0900
Subject: [PATCH 03/12] Fix indentation

---
 R/pkg/R/utils.R | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index adaf75aa2dac..cfd430fa9929 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -340,11 +340,11 @@ varargsToStrEnv <- function(...) {
   env <- varargsToEnv(...)
   for (name in names(env)) {
     if (is.logical(env[[name]])) {
-        env[[name]] <- tolower(as.character(env[[name]]))
+      env[[name]] <- tolower(as.character(env[[name]]))
     } else if (is.null(env[[name]])) {
-        env[[name]] <- env[[name]]
+      env[[name]] <- env[[name]]
     } else {
-        env[[name]] <- as.character(env[[name]])
+      env[[name]] <- as.character(env[[name]])
     }
   }
   env

From c8baba5e8ffbbcdf33eb5546be8ee8f86a5cd206 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 26 Sep 2016 17:05:40 +0900
Subject: [PATCH 04/12] Fix utils and add the tests for utils

---
 R/pkg/R/utils.R                        | 14 +++++++++-----
 R/pkg/inst/tests/testthat/test_utils.R |  7 +++++++
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index cfd430fa9929..2463aa4bf6ac 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -339,12 +339,16 @@ varargsToEnv <- function(...) {
 varargsToStrEnv <- function(...) {
   env <- varargsToEnv(...)
   for (name in names(env)) {
-    if (is.logical(env[[name]])) {
-      env[[name]] <- tolower(as.character(env[[name]]))
-    } else if (is.null(env[[name]])) {
-      env[[name]] <- env[[name]]
+    value <- env[[name]]
+    if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
+      stop("value[", value, "] in key[", name, "] is not convertible to string.")
+    }
+    if (is.logical(value)) {
+      env[[name]] <- tolower(as.character(value))
+    } else if (is.null(value)) {
+      env[[name]] <- value
     } else {
-      env[[name]] <- as.character(env[[name]])
+      env[[name]] <- as.character(value)
     }
   }
   env
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 69ed5549168b..87dced471ffe 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -217,4 +217,11 @@ test_that("rbindRaws", {
 
 })
 
+test_that("varargsToStrEnv", {
+  strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
+  env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
+  expect_equal(strenv, env)
+  expect_error(varargsToStrEnv(a = list(1, "a")))
+})
+
 sparkR.session.stop()

From 1ac3c7b2f6580a332b71326792bc54610f5e8b98 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 26 Sep 2016 20:11:53 +0900
Subject: [PATCH 05/12] Fix orc tests

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index b67d71bca639..6d8cfad5c1f9 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1825,7 +1825,7 @@ test_that("read/write ORC files - compression option", {
   orcDF <- read.orc(orcPath2)
   expect_is(orcDF, "SparkDataFrame")
   expect_equal(count(orcDF), count(df))
-  expect_true(length(list.files(orcPath, pattern = ".zlib.orc")) > 0)
+  expect_true(length(list.files(orcPath2, pattern = ".zlib.orc")) > 0)
 
   unlink(orcPath2)
   unsetHiveContext()

From 28df54b68a3b5f25190d9577cfc358e1d548aba8 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 26 Sep 2016 21:31:22 +0900
Subject: [PATCH 06/12] Do not reuse env objects

---
 R/pkg/R/utils.R | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 2463aa4bf6ac..ce391101cf18 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -337,9 +337,10 @@ varargsToEnv <- function(...) {
 # Utility function to capture the varargs into an environment object, with all values
 # converted into strings.
 varargsToStrEnv <- function(...) {
-  env <- varargsToEnv(...)
-  for (name in names(env)) {
-    value <- env[[name]]
+  pairs <- list(...)
+  env <- new.env()
+  for (name in names(pairs)) {
+    value <- pairs[[name]]
     if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
       stop("value[", value, "] in key[", name, "] is not convertible to string.")
     }

From 20cda71ff5c31d327552bfab2d8c60aa6296cdb1 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 27 Sep 2016 22:28:08 +0900
Subject: [PATCH 07/12] Address comments

---
 R/pkg/R/DataFrame.R                    | 46 +++++++++++++++-----------
 R/pkg/R/SQLContext.R                   |  4 +++
 R/pkg/R/utils.R                        |  3 ++-
 R/pkg/inst/tests/testthat/test_utils.R |  4 +++-
 4 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 45bf581a62f3..8196bd554b3f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,6 +55,19 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
   .Object
 })
 
+#' Set options/mode and then return the write object
+#' @noRd
+setWriteOptions <- function(write, path = NULL, mode = 'error', ...) {
+  options <- varargsToStrEnv(...)
+  if (!is.null(path)) {
+    options[["path"]] <- path
+  }
+  jmode <- convertToJSaveMode(mode)
+  write <- callJMethod(write, "mode", jmode)
+  write <- callJMethod(write, "options", options)
+  write
+}
+
 #' @export
 #' @param sdf A Java object reference to the backing Scala DataFrame
 #' @param isCached TRUE if the SparkDataFrame is cached
@@ -727,6 +740,8 @@ setMethod("toJSON",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @rdname write.json
@@ -744,11 +759,8 @@ setMethod("toJSON",
 setMethod("write.json",
           signature(x = "SparkDataFrame", path = "character"),
           function(x, path, mode = "error", ...) {
-            options <- varargsToStrEnv(...)
-            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "options", options)
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "json", path))
           })
 
@@ -759,6 +771,8 @@ setMethod("write.json",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @aliases write.orc,SparkDataFrame,character-method
@@ -771,11 +790,8 @@ setMethod("write.json",
 setMethod("write.orc",
           signature(x = "SparkDataFrame", path = "character"),
           function(x, path, mode = "error", ...) {
-            options <- varargsToStrEnv(...)
-            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "options", options)
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "orc", path))
           })
 
@@ -791,6 +802,8 @@ setMethod("write.orc",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @rdname write.parquet
@@ -809,11 +822,8 @@ setMethod("write.orc",
 setMethod("write.parquet",
           signature(x = "SparkDataFrame", path = "character"),
           function(x, path, mode = "error", ...) {
-            options <- varargsToStrEnv(...)
-            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "options", options)
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "parquet", path))
           })
 
@@ -837,6 +847,8 @@ setMethod("saveAsParquetFile",
 #'
 #' @param x A SparkDataFrame
 #' @param path The directory where the file is saved
+#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
 #' @aliases write.text,SparkDataFrame,character-method
@@ -854,11 +866,8 @@ setMethod("saveAsParquetFile",
 setMethod("write.text",
           signature(x = "SparkDataFrame", path = "character"),
           function(x, path, mode = "error", ...) {
-            options <- varargsToStrEnv(...)
-            jmode <- convertToJSaveMode(mode)
             write <- callJMethod(x@sdf, "write")
-            write <- callJMethod(write, "options", options)
-            write <- callJMethod(write, "mode", jmode)
+            write <- setWriteOptions(write, mode = mode, ...)
             invisible(callJMethod(write, "text", path))
           })
 
@@ -2653,15 +2662,12 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
-            jmode <- convertToJSaveMode(mode)
-            options <- varargsToStrEnv(...)
             if (!is.null(path)) {
               options[["path"]] <- path
             }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
-            write <- callJMethod(write, "mode", jmode)
-            write <- callJMethod(write, "options", options)
+            write <- setWriteOptions(write, path = path, mode = mode, ...)
             write <- handledCallJMethod(write, "save")
           })
 
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 0fff9a79fe2f..d5e009b858d4 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -328,6 +328,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' It goes through the entire dataset once to determine the schema.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.json
 #' @export
@@ -407,6 +408,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
 #' Loads an ORC file, returning the result as a SparkDataFrame.
 #'
 #' @param path Path of file to read.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.orc
 #' @export
@@ -428,6 +430,7 @@ read.orc <- function(path, ...) {
 #' Loads a Parquet file, returning the result as a SparkDataFrame.
 #'
 #' @param path path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.parquet
 #' @export
@@ -473,6 +476,7 @@ parquetFile <- function(x, ...) {
 #' Each line in the text file is a new row in the resulting SparkDataFrame.
 #'
 #' @param path Path of file to read. A vector of multiple paths is allowed.
+#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.text
 #' @export
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index ce391101cf18..a29af7156243 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -342,7 +342,8 @@ varargsToStrEnv <- function(...) {
   for (name in names(pairs)) {
     value <- pairs[[name]]
     if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
-      stop("value[", value, "] in key[", name, "] is not convertible to string.")
+      stop(paste0("Unsupported type for ", name, " : ", class(value),
+                  ". Supported types are logical, numeric, character and null."))
     }
     if (is.logical(value)) {
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 87dced471ffe..bd3fc2b0cca9 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -221,7 +221,9 @@ test_that("varargsToStrEnv", {
   strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
   env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
   expect_equal(strenv, env)
-  expect_error(varargsToStrEnv(a = list(1, "a")))
+  expect_error(varargsToStrEnv(a = list(1, "a")),
+               paste0("Unsupported type for a : list. Supported types are logical, ",
+                      "numeric, character and null."))
 })

From ba772b3e7ff28f7f5d638e138dfc16cac03ced97 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 27 Sep 2016 22:29:58 +0900
Subject: [PATCH 08/12] Lint fix

---
 R/pkg/R/DataFrame.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 8196bd554b3f..99e710891ba1 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -57,7 +57,7 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
 
 #' Set options/mode and then return the write object
 #' @noRd
-setWriteOptions <- function(write, path = NULL, mode = 'error', ...) {
+setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
   options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path

From 8e104152e00d0859a39a1cf3427639e85304be1f Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 27 Sep 2016 22:32:04 +0900
Subject: [PATCH 09/12] Consistent location

---
 R/pkg/R/SQLContext.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index d5e009b858d4..d279a35d8f11 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -438,8 +438,8 @@ read.orc <- function(path, ...) {
 #' @method read.parquet default
 #' @note read.parquet since 1.6.0
 read.parquet.default <- function(path, ...) {
-  options <- varargsToStrEnv(...)
   sparkSession <- getSparkSession()
+  options <- varargsToStrEnv(...)
   # Allow the user to have a more flexible definition of the Parquet file path
   paths <- as.list(suppressWarnings(normalizePath(path)))
   read <- callJMethod(sparkSession, "read")

From 63b55f204af82474d8bafcceed0e8b875e66157a Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 27 Sep 2016 23:39:31 +0900
Subject: [PATCH 10/12] Fix doc

---
 R/pkg/R/SQLContext.R | 1 -
 1 file changed, 1 deletion(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index d279a35d8f11..0d6a229e6345 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -430,7 +430,6 @@ read.orc <- function(path, ...) {
 #' Loads a Parquet file, returning the result as a SparkDataFrame.
 #'
 #' @param path path of file to read. A vector of multiple paths is allowed.
-#' @param ... additional external data source specific named properties.
 #' @return SparkDataFrame
 #' @rdname read.parquet
 #' @export

From f268305a26dcfff50a97ea7f8622b8b4aa905247 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 4 Oct 2016 10:03:37 +0900
Subject: [PATCH 11/12] Address comments ("null" to "NULL" and change arguments
 in generics.R)

---
 R/pkg/R/generics.R                     | 8 ++++----
 R/pkg/R/utils.R                        | 2 +-
 R/pkg/inst/tests/testthat/test_utils.R | 2 +-
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 6d84ecb5b9a1..810aea901774 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -651,15 +651,15 @@ setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
 
 #' @rdname write.json
 #' @export
-setGeneric("write.json", function(x, path, mode = NULL, ...) { standardGeneric("write.json") })
+setGeneric("write.json", function(x, path, ...) { standardGeneric("write.json") })
 
 #' @rdname write.orc
 #' @export
-setGeneric("write.orc", function(x, path, mode = NULL, ...) { standardGeneric("write.orc") })
+setGeneric("write.orc", function(x, path, ...) { standardGeneric("write.orc") })
 
 #' @rdname write.parquet
 #' @export
-setGeneric("write.parquet", function(x, path, mode = NULL, ...) {
+setGeneric("write.parquet", function(x, path, ...) {
   standardGeneric("write.parquet")
 })
 
 #' @rdname write.parquet
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
 
 #' @rdname write.text
 #' @export
-setGeneric("write.text", function(x, path, mode = NULL, ...) { standardGeneric("write.text") })
+setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
 
 #' @rdname schema
 #' @export
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index a29af7156243..fa8bb0f79ce8 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -343,7 +343,7 @@ varargsToStrEnv <- function(...) {
     value <- pairs[[name]]
     if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
       stop(paste0("Unsupported type for ", name, " : ", class(value),
-                  ". Supported types are logical, numeric, character and null."))
+                  ". Supported types are logical, numeric, character and NULL."))
     }
     if (is.logical(value)) {
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index bd3fc2b0cca9..a20254e9b3fa 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -223,7 +223,7 @@ test_that("varargsToStrEnv", {
   expect_equal(strenv, env)
   expect_error(varargsToStrEnv(a = list(1, "a")),
               paste0("Unsupported type for a : list. Supported types are logical, ",
-                      "numeric, character and null."))
+                      "numeric, character and NULL."))
 })
 
 sparkR.session.stop()

From eeb7db5cce149710fe71d69710594160fd578b91 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 6 Oct 2016 20:43:02 +0900
Subject: [PATCH 12/12] Remove duplicated path checking

---
 R/pkg/R/DataFrame.R | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 99e710891ba1..801d2ed4e750 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2662,9 +2662,6 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
-            if (!is.null(path)) {
-              options[["path"]] <- path
-            }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
             write <- setWriteOptions(write, path = path, mode = mode, ...)
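
Example usage of the APIs this series adds (illustrative only, not part of the
patches: `df` and the temporary paths are placeholder names, and an active
SparkR session built with this series applied is assumed):

    library(SparkR)
    sparkR.session()

    df <- createDataFrame(data.frame(make = c("Tesla", "Ford"),
                                     year = c(2012, 1997)))

    # Options may now be passed as logical or numeric values, not only as
    # strings; varargsToStrEnv converts them before they reach the JVM side.
    csvPath <- tempfile(pattern = "sparkr-example", fileext = ".csv")
    write.df(df, csvPath, "csv", header = TRUE)
    df2 <- read.df(csvPath, "csv", header = TRUE, inferSchema = TRUE)

    # write.json/write.orc/write.parquet/write.text now accept a save mode and
    # data source options such as compression:
    jsonPath <- tempfile(pattern = "sparkr-example", fileext = ".json")
    write.json(df, jsonPath, mode = "overwrite", compression = "gzip")
    jsonDF <- read.json(jsonPath)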
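
The conversion rule itself can be exercised directly through the internal
helper (a sketch based on the final utils.R and test_utils.R above; the
commented results are the expected conversions, not captured output):

    # Logical values are lower-cased, numerics become their string form,
    # character values pass through, and NULL is kept as-is:
    env <- SparkR:::varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
    # a -> "1", b -> "1.1", c -> "true", d -> "abcd"

    # Anything else, e.g. a list, is rejected:
    SparkR:::varargsToStrEnv(a = list(1, "a"))
    # Error: Unsupported type for a : list. Supported types are logical,
    # numeric, character and NULL.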