4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
@@ -92,7 +92,9 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"write.df")
"write.df",
"write.json",
"write.parquet")

exportClasses("Column")

51 changes: 45 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -596,30 +596,69 @@ setMethod("toJSON",
RDD(jrdd, serializedMode = "string")
})

#' saveAsParquetFile
#' write.json
#'
#' Save the contents of a DataFrame as a JSON file (one object per line). Files written out
#' with this method can be read back in as a DataFrame using read.json().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname write.json
#' @name write.json
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' write.json(df, "/tmp/sparkr-tmp/")
#'}
setMethod("write.json",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "json", path))
})

#' write.parquet
#'
#' Save the contents of a DataFrame as a Parquet file, preserving the schema. Files written out
#' with this method can be read back in as a DataFrame using parquetFile().
#' with this method can be read back in as a DataFrame using read.parquet().
#'
#' @param x A SparkSQL DataFrame
#' @param path The directory where the file is saved
#'
#' @family DataFrame functions
#' @rdname saveAsParquetFile
#' @name saveAsParquetFile
#' @rdname write.parquet
#' @name write.parquet
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' saveAsParquetFile(df, "/tmp/sparkr-tmp/")
#' write.parquet(df, "/tmp/sparkr-tmp1/")
#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
setMethod("write.parquet",
signature(x = "DataFrame", path = "character"),
function(x, path) {
write <- callJMethod(x@sdf, "write")
invisible(callJMethod(write, "parquet", path))
})

#' @rdname write.parquet
#' @name saveAsParquetFile
#' @export
setMethod("saveAsParquetFile",
signature(x = "DataFrame", path = "character"),
function(x, path) {
invisible(callJMethod(x@sdf, "saveAsParquetFile", path))
.Deprecated("write.parquet")
write.parquet(x, path)
})

#' Distinct
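Taken together, the DataFrame.R changes give SparkR writers that mirror its readers: write.json() pairs with read.json(), write.parquet() pairs with read.parquet(), and saveAsParquetFile() survives as a deprecated alias that warns and delegates. A minimal round-trip sketch (session setup follows the roxygen examples above; the output paths are illustrative):

```r
library(SparkR)

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)

df <- read.json(sqlContext, "path/to/file.json")

# New writer APIs from this patch
write.json(df, "/tmp/sparkr-json/")
write.parquet(df, "/tmp/sparkr-parquet/")

# Old name still works, but now emits a deprecation warning and delegates
suppressWarnings(saveAsParquetFile(df, "/tmp/sparkr-parquet2/"))

# Both outputs read back as DataFrames
count(read.json(sqlContext, "/tmp/sparkr-json/"))
count(read.parquet(sqlContext, "/tmp/sparkr-parquet/"))
```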
16 changes: 12 additions & 4 deletions R/pkg/R/generics.R
@@ -519,10 +519,6 @@ setGeneric("sample_frac",
#' @export
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })

#' @rdname saveAsParquetFile
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname saveAsTable
#' @export
setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
@@ -541,6 +537,18 @@ setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
#' @export
setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })

#' @rdname write.parquet
#' @export
setGeneric("write.parquet", function(x, path) { standardGeneric("write.parquet") })

#' @rdname write.parquet
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })

#' @rdname schema
#' @export
setGeneric("schema", function(x) { standardGeneric("schema") })
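A note on the pattern here: these setGeneric() calls only declare the S4 generics; the actual behavior comes from the matching setMethod() definitions in DataFrame.R, selected by argument signature. A toy, Spark-free sketch of the same dispatch mechanics (every name below is illustrative, not part of the patch):

```r
# Toy illustration of the setGeneric()/setMethod() pattern used above
setClass("Logger", representation(prefix = "character"))

setGeneric("write.msg", function(x, path) { standardGeneric("write.msg") })

setMethod("write.msg",
          signature(x = "Logger", path = "character"),
          function(x, path) {
            # invisible(), as in the writers above: called for the side effect
            invisible(writeLines(paste(x@prefix, "hello"), path))
          })

lg <- new("Logger", prefix = "[demo]")
write.msg(lg, "msg.txt")  # dispatches on signature ("Logger", "character")
```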
104 changes: 59 additions & 45 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -371,22 +371,49 @@ test_that("Collect DataFrame with complex types", {
expect_equal(bob$height, 176.5)
})

test_that("read.json()/jsonFile() on a local file returns a DataFrame", {
test_that("read/write json files", {
[Contributor comment] How about renaming this test case to "read/write json files", where read.json(), jsonFile(), read.df(), write.json(), and write.df() are all tested for reading/writing JSON files? It could then be merged with the existing "read.df() from json file" test case.

# Test read.df
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

# Test read.df with a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "DataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Test loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "DataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))

# Test read.json
df <- read.json(sqlContext, jsonPath)
expect_is(df, "DataFrame")
expect_equal(count(df), 3)
# read.json()/jsonFile() works with multiple input paths

# Test write.df
jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".json")
write.df(df, jsonPath2, "json", mode="overwrite")
jsonDF1 <- read.json(sqlContext, c(jsonPath, jsonPath2))

# Test write.json
jsonPath3 <- tempfile(pattern="jsonPath3", fileext=".json")
write.json(df, jsonPath3)

# Test read.json()/jsonFile() works with multiple input paths
jsonDF1 <- read.json(sqlContext, c(jsonPath2, jsonPath3))
expect_is(jsonDF1, "DataFrame")
expect_equal(count(jsonDF1), 6)
# Suppress warnings because jsonFile is deprecated
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath, jsonPath2)))
jsonDF2 <- suppressWarnings(jsonFile(sqlContext, c(jsonPath2, jsonPath3)))
expect_is(jsonDF2, "DataFrame")
expect_equal(count(jsonDF2), 6)

unlink(jsonPath2)
unlink(jsonPath3)
})

test_that("jsonRDD() on a RDD with json string", {
@@ -454,6 +481,9 @@ test_that("insertInto() on a registered table", {
expect_equal(count(sql(sqlContext, "select * from table1")), 2)
expect_equal(first(sql(sqlContext, "select * from table1 order by age"))$name, "Bob")
dropTempTable(sqlContext, "table1")

unlink(jsonPath2)
unlink(parquetPath2)
[Member comment] Should there be unlink(jsonPath2) as well? If so, could you please add that.

})

test_that("table() returns a new DataFrame", {
@@ -848,33 +878,6 @@ test_that("column calculation", {
expect_equal(count(df2), 3)
})

test_that("read.df() from json file", {
df <- read.df(sqlContext, jsonPath, "json")
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

# Check if we can apply a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_is(df1, "DataFrame")
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Run the same with loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_is(df2, "DataFrame")
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
})

test_that("write.df() as parquet file", {
df <- read.df(sqlContext, jsonPath, "json")
write.df(df, parquetPath, "parquet", mode="overwrite")
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "DataFrame")
expect_equal(count(df2), 3)
})

test_that("test HiveContext", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
@@ -895,6 +898,8 @@ test_that("test HiveContext", {
df3 <- sql(hiveCtx, "select * from json2")
expect_is(df3, "DataFrame")
expect_equal(count(df3), 3)

unlink(jsonPath2)
})

test_that("column operators", {
@@ -1333,6 +1338,9 @@ test_that("join() and merge() on a DataFrame", {
expect_error(merge(df, df3),
paste("The following column name: name_y occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.", sep = ""))

unlink(jsonPath2)
unlink(jsonPath3)
})

test_that("toJSON() returns an RDD of the correct values", {
@@ -1396,6 +1404,8 @@ test_that("unionAll(), rbind(), except(), and intersect() on a DataFrame", {

# Test base::intersect is working
expect_equal(length(intersect(1:20, 3:23)), 18)

unlink(jsonPath2)
})

test_that("withColumn() and withColumnRenamed()", {
@@ -1440,31 +1450,35 @@ test_that("mutate(), transform(), rename() and names()", {
detach(airquality)
})

test_that("write.df() on DataFrame and works with read.parquet", {
df <- read.json(sqlContext, jsonPath)
test_that("read/write Parquet files", {
df <- read.df(sqlContext, jsonPath, "json")
# Test write.df and read.df
write.df(df, parquetPath, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, parquetPath)
expect_is(parquetDF, "DataFrame")
expect_equal(count(df), count(parquetDF))
})
df2 <- read.df(sqlContext, parquetPath, "parquet")
expect_is(df2, "DataFrame")
expect_equal(count(df2), 3)

test_that("read.parquet()/parquetFile() works with multiple input paths", {
df <- read.json(sqlContext, jsonPath)
write.df(df, parquetPath, "parquet", mode="overwrite")
# Test write.parquet/saveAsParquetFile and read.parquet/parquetFile
parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode="overwrite")
parquetDF <- read.parquet(sqlContext, c(parquetPath, parquetPath2))
write.parquet(df, parquetPath2)
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
suppressWarnings(saveAsParquetFile(df, parquetPath3))
parquetDF <- read.parquet(sqlContext, c(parquetPath2, parquetPath3))
expect_is(parquetDF, "DataFrame")
expect_equal(count(parquetDF), count(df) * 2)
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath, parquetPath2))
parquetDF2 <- suppressWarnings(parquetFile(sqlContext, parquetPath2, parquetPath3))
expect_is(parquetDF2, "DataFrame")
expect_equal(count(parquetDF2), count(df) * 2)

# Test if varargs works with variables
saveMode <- "overwrite"
mergeSchema <- "true"
parquetPath3 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
write.df(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
parquetPath4 <- tempfile(pattern = "parquetPath3", fileext = ".parquet")
write.df(df, parquetPath3, "parquet", mode = saveMode, mergeSchema = mergeSchema)

unlink(parquetPath2)
unlink(parquetPath3)
unlink(parquetPath4)
})
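The varargs check at the end of this test matters because write.df() forwards extra named arguments as data source options; the test confirms those options can be supplied via variables, not just literals. A small sketch of the equivalence (assumes a df as in the test above; mergeSchema is a Parquet data source option):

```r
# The two write.df() calls below are equivalent: extra named arguments pass
# through `...` to the data source as options (illustrative sketch).
outPath <- tempfile(pattern = "parquetDemo", fileext = ".parquet")

write.df(df, outPath, "parquet", mode = "overwrite", mergeSchema = "true")

saveMode <- "overwrite"
mergeSchema <- "true"
write.df(df, outPath, "parquet", mode = saveMode, mergeSchema = mergeSchema)
```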

test_that("describe() and summarize() on a DataFrame", {