33 changes: 16 additions & 17 deletions R/pkg/R/DataFrame.R
@@ -458,7 +458,10 @@ setMethod("registerTempTable",
setMethod("insertInto",
signature(x = "DataFrame", tableName = "character"),
function(x, tableName, overwrite = FALSE) {
callJMethod(x@sdf, "insertInto", tableName, overwrite)
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
callJMethod(write, "insertInto", tableName)
})

#' Cache
@@ -1948,18 +1951,15 @@ setMethod("write.df",
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
- allModes <- c("append", "overwrite", "error", "ignore")
- # nolint start
- if (!(mode %in% allModes)) {
- stop('mode should be one of "append", "overwrite", "error", "ignore"')
- }
- # nolint end
- jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "save", path)
})

#' @rdname write.df
@@ -2013,15 +2013,14 @@ setMethod("saveAsTable",
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
- allModes <- c("append", "overwrite", "error", "ignore")
- # nolint start
- if (!(mode %in% allModes)) {
- stop('mode should be one of "append", "overwrite", "error", "ignore"')
- }
- # nolint end
- jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode <- convertToJSaveMode(mode)
options <- varargsToEnv(...)
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
callJMethod(write, "saveAsTable", tableName)
})

#' summary
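For orientation: after this change both insertInto and write.df go through Spark's DataFrameWriter builder (write() -> format(...) -> mode(...) -> save(...)) rather than the deprecated DataFrame.save/insertInto calls, while the public SparkR signatures stay the same. A minimal usage sketch, assuming a local Spark 1.6-style setup and a writable /tmp path (both assumptions, not part of the patch):

library(SparkR)
sc <- sparkR.init(master = "local[2]")     # hypothetical local test context
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, faithful)
# Same public call as before; internally it now chains
# df@sdf -> write() -> format("parquet") -> mode(jmode) -> save(path)
write.df(df, path = "/tmp/faithful.parquet", source = "parquet", mode = "overwrite")
sparkR.stop()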
10 changes: 5 additions & 5 deletions R/pkg/R/SQLContext.R
@@ -256,9 +256,12 @@ jsonFile <- function(sqlContext, path) {

# TODO: support schema
jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
.Deprecated("read.json")
rdd <- serializeToString(rdd)
if (is.null(schema)) {
sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio)
read <- callJMethod(sqlContext, "read")
# samplingRatio is deprecated
sdf <- callJMethod(read, "json", callJMethod(getJRDD(rdd), "rdd"))
dataFrame(sdf)
} else {
stop("not implemented")
@@ -289,10 +292,7 @@ read.parquet <- function(sqlContext, path) {
# TODO: Implement saveasParquetFile and write examples for both
parquetFile <- function(sqlContext, ...) {
.Deprecated("read.parquet")
- # Allow the user to have a more flexible definiton of the text file path
- paths <- lapply(list(...), function(x) suppressWarnings(normalizePath(x)))
- sdf <- callJMethod(sqlContext, "parquetFile", paths)
- dataFrame(sdf)
+ read.parquet(sqlContext, unlist(list(...)))
}

#' SQL Query
2 changes: 1 addition & 1 deletion R/pkg/R/column.R
@@ -209,7 +209,7 @@ setMethod("cast",
setMethod("%in%",
signature(x = "Column"),
function(x, table) {
jc <- callJMethod(x@jc, "in", as.list(table))
jc <- callJMethod(x@jc, "isin", as.list(table))
return(column(jc))
})

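The change above only swaps the JVM-side Column method the operator dispatches to from "in" to "isin"; the user-facing %in% syntax is untouched. A small illustrative use, reusing the hypothetical sqlContext from the earlier sketch:

people <- createDataFrame(sqlContext,
                          data.frame(name = c("Ann", "Bob", "Cid"), age = c(19L, 25L, 31L)))
# Keep rows whose age appears in the given list; this condition is the code
# path that now calls Column.isin on the JVM side.
selected <- filter(people, people$age %in% c(19, 31))
head(selected)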
9 changes: 9 additions & 0 deletions R/pkg/R/utils.R
@@ -641,3 +641,12 @@ assignNewEnv <- function(data) {
splitString <- function(input) {
Filter(nzchar, unlist(strsplit(input, ",|\\s")))
}

+ convertToJSaveMode <- function(mode) {
+ allModes <- c("append", "overwrite", "error", "ignore")
+ if (!(mode %in% allModes)) {
+ stop('mode should be one of "append", "overwrite", "error", "ignore"') # nolint
+ }
+ jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
+ jmode
+ }
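convertToJSaveMode collects the mode-string validation that write.df and saveAsTable previously duplicated, and returns the JVM SaveMode object that DataFrameWriter.mode() expects. A rough internal-usage sketch (needs a running SparkR backend; the df below is hypothetical):

jmode <- convertToJSaveMode("overwrite")   # JVM SaveMode object for "overwrite"
# convertToJSaveMode("upsert") would stop() with the
# 'mode should be one of "append", "overwrite", "error", "ignore"' message.
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "mode", jmode)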
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -423,12 +423,12 @@ test_that("read/write json files", {
test_that("jsonRDD() on a RDD with json string", {
rdd <- parallelize(sc, mockLines)
expect_equal(count(rdd), 3)
- df <- jsonRDD(sqlContext, rdd)
+ df <- suppressWarnings(jsonRDD(sqlContext, rdd))
expect_is(df, "DataFrame")
expect_equal(count(df), 3)

rdd2 <- flatMap(rdd, function(x) c(x, x))
- df <- jsonRDD(sqlContext, rdd2)
+ df <- suppressWarnings(jsonRDD(sqlContext, rdd2))
expect_is(df, "DataFrame")
expect_equal(count(df), 6)
})
11 changes: 5 additions & 6 deletions dev/run-tests.py
@@ -425,13 +425,12 @@ def run_build_tests():


def run_sparkr_tests():
- # set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")
+ set_title_and_block("Running SparkR tests", "BLOCK_SPARKR_UNIT_TESTS")

# if which("R"):
# run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
# else:
# print("Ignoring SparkR tests as R was not found in PATH")
pass
if which("R"):
run_cmd([os.path.join(SPARK_HOME, "R", "run-tests.sh")])
else:
print("Ignoring SparkR tests as R was not found in PATH")


def parse_opts():