
Commit ecd877e

felixcheung authored and shivaram committed
[SPARK-12224][SPARKR] R support for JDBC source
Add R API for `read.jdbc` and `write.jdbc`.

Tested this quite a bit manually with different combinations of parameters. It's not clear if we could have automated tests in R for this - the Scala `JDBCSuite` depends on the Java H2 in-memory database. Refactored some code into util so it could be tested.

Core's R SerDe code needs to be updated to allow access to java.util.Properties as a `jobj` handle, which is required by DataFrameReader/Writer's `jdbc` method. It would be possible, though more code, to add a `sql/r/SQLUtils` helper function instead.

Tested:
```
# with postgresql
../bin/sparkR --driver-class-path /usr/share/java/postgresql-9.4.1207.jre7.jar

# read.jdbc
df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", user = "user", password = "12345")
df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", user = "user", password = 12345)

# partitionColumn and numPartitions test
df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", partitionColumn = "did", lowerBound = 0, upperBound = 200, numPartitions = 4, user = "user", password = 12345)
a <- SparkR:::toRDD(df)
SparkR:::getNumPartitions(a)
[1] 4
SparkR:::collectPartition(a, 2L)

# defaultParallelism test
df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", partitionColumn = "did", lowerBound = 0, upperBound = 200, user = "user", password = 12345)
SparkR:::getNumPartitions(a)
[1] 2

# predicates test
df <- read.jdbc(sqlContext, "jdbc:postgresql://localhost/db", "films2", predicates = list("did<=105"), user = "user", password = 12345)
count(df) == 1

# write.jdbc, default save mode "error"
irisDf <- as.DataFrame(sqlContext, iris)
write.jdbc(irisDf, "jdbc:postgresql://localhost/db", "films2", user = "user", password = "12345")
"error, already exists"
write.jdbc(irisDf, "jdbc:postgresql://localhost/db", "iris", user = "user", password = "12345")
```

Author: felixcheung <felixcheung_m@hotmail.com>

Closes #10480 from felixcheung/rreadjdbc.
1 parent 008a8bb commit ecd877e

7 files changed: 146 additions, 1 deletion

R/pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
@@ -101,6 +101,7 @@ exportMethods("arrange",
               "withColumn",
               "withColumnRenamed",
               "write.df",
+              "write.jdbc",
               "write.json",
               "write.parquet",
               "write.text")
@@ -284,6 +285,7 @@ export("as.DataFrame",
        "loadDF",
        "parquetFile",
        "read.df",
+       "read.jdbc",
        "read.json",
        "read.parquet",
        "read.text",

R/pkg/R/DataFrame.R

Lines changed: 38 additions & 1 deletion
@@ -2363,7 +2363,7 @@ setMethod("with",
 #' @examples \dontrun{
 #' # Create a DataFrame from the Iris dataset
 #' irisDF <- createDataFrame(sqlContext, iris)
-#'
+#'
 #' # Show the structure of the DataFrame
 #' str(irisDF)
 #' }
@@ -2468,3 +2468,40 @@ setMethod("drop",
           function(x) {
             base::drop(x)
           })
+
+#' Saves the content of the DataFrame to an external database table via JDBC
+#'
+#' Additional JDBC database connection properties can be set (...)
+#'
+#' Also, mode is used to specify the behavior of the save operation when
+#' data already exists in the data source. There are four modes: \cr
+#'  append: Contents of this DataFrame are expected to be appended to existing data. \cr
+#'  overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
+#'  error: An exception is expected to be thrown. \cr
+#'  ignore: The save operation is expected to not save the contents of the DataFrame
+#'     and to not change the existing data. \cr
+#'
+#' @param x A SparkSQL DataFrame
+#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
+#' @param tableName The name of the table in the external database
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
+#' @family DataFrame functions
+#' @rdname write.jdbc
+#' @name write.jdbc
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
+#' write.jdbc(df, jdbcUrl, "table", user = "username", password = "password")
+#' }
+setMethod("write.jdbc",
+          signature(x = "DataFrame", url = "character", tableName = "character"),
+          function(x, url, tableName, mode = "error", ...){
+            jmode <- convertToJSaveMode(mode)
+            jprops <- varargsToJProperties(...)
+            write <- callJMethod(x@sdf, "write")
+            write <- callJMethod(write, "mode", jmode)
+            invisible(callJMethod(write, "jdbc", url, tableName, jprops))
+          })
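For reference, a minimal usage sketch of the new writer path (not part of the diff). It assumes an initialized SparkR session with a `sqlContext`; the JDBC URL, table name, and credentials are placeholders modeled on the manual tests in the commit message.

```r
# Placeholders: adjust the JDBC URL, table name, and credentials for your database.
jdbcUrl <- "jdbc:postgresql://localhost/db"
irisDf <- as.DataFrame(sqlContext, iris)

# Default save mode is "error": writing to a table that already exists fails.
write.jdbc(irisDf, jdbcUrl, "iris", user = "user", password = "12345")

# "append" adds rows, "overwrite" replaces the table contents, and "ignore"
# leaves existing data untouched, per the modes documented above.
write.jdbc(irisDf, jdbcUrl, "iris", mode = "append", user = "user", password = "12345")
```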

R/pkg/R/SQLContext.R

Lines changed: 58 additions & 0 deletions
@@ -583,3 +583,61 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NUL
   sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
   dataFrame(sdf)
 }
+
+#' Create a DataFrame representing the database table accessible via JDBC URL
+#'
+#' Additional JDBC database connection properties can be set (...)
+#'
+#' Only one of partitionColumn or predicates should be set. Partitions of the table will be
+#' retrieved in parallel based on the `numPartitions` or by the predicates.
+#'
+#' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+#' your external database systems.
+#'
+#' @param sqlContext SQLContext to use
+#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
+#' @param tableName the name of the table in the external database
+#' @param partitionColumn the name of a column of integral type that will be used for partitioning
+#' @param lowerBound the minimum value of `partitionColumn` used to decide partition stride
+#' @param upperBound the maximum value of `partitionColumn` used to decide partition stride
+#' @param numPartitions the number of partitions, This, along with `lowerBound` (inclusive),
+#'                      `upperBound` (exclusive), form partition strides for generated WHERE
+#'                      clause expressions used to split the column `partitionColumn` evenly.
+#'                      This defaults to SparkContext.defaultParallelism when unset.
+#' @param predicates a list of conditions in the where clause; each one defines one partition
+#' @return DataFrame
+#' @rdname read.jdbc
+#' @name read.jdbc
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
+#' df <- read.jdbc(sqlContext, jdbcUrl, "table", predicates = list("field<=123"), user = "username")
+#' df2 <- read.jdbc(sqlContext, jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0,
+#'                  upperBound = 10000, user = "username", password = "password")
+#' }
+
+read.jdbc <- function(sqlContext, url, tableName,
+                      partitionColumn = NULL, lowerBound = NULL, upperBound = NULL,
+                      numPartitions = 0L, predicates = list(), ...) {
+  jprops <- varargsToJProperties(...)
+
+  read <- callJMethod(sqlContext, "read")
+  if (!is.null(partitionColumn)) {
+    if (is.null(numPartitions) || numPartitions == 0) {
+      sc <- callJMethod(sqlContext, "sparkContext")
+      numPartitions <- callJMethod(sc, "defaultParallelism")
+    } else {
+      numPartitions <- numToInt(numPartitions)
+    }
+    sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
+                       numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
+  } else if (length(predicates) > 0) {
+    sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
+  } else {
+    sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
+  }
+  dataFrame(sdf)
+}
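To illustrate the two mutually exclusive partitioning modes described in the roxygen docs above, here is a hedged sketch; the URL, table, and column names are hypothetical, mirroring the manual tests in the commit message.

```r
# Hypothetical connection details, for illustration only.
jdbcUrl <- "jdbc:postgresql://localhost/db"

# Range partitioning: WHERE clauses are generated over `did` in strides covering
# [0, 200), so the resulting DataFrame has numPartitions = 4 partitions.
df1 <- read.jdbc(sqlContext, jdbcUrl, "films2", partitionColumn = "did",
                 lowerBound = 0, upperBound = 200, numPartitions = 4,
                 user = "user", password = "12345")

# Predicate partitioning: each element of `predicates` becomes the WHERE clause
# of one partition, so this DataFrame has exactly two partitions.
df2 <- read.jdbc(sqlContext, jdbcUrl, "films2",
                 predicates = list("did <= 105", "did > 105"),
                 user = "user", password = "12345")

# Omitting numPartitions while using partitionColumn falls back to
# SparkContext.defaultParallelism, as handled in the function above.
```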

R/pkg/R/generics.R

Lines changed: 6 additions & 0 deletions
@@ -577,6 +577,12 @@ setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) {
   standardGeneric("saveDF")
 })

+#' @rdname write.jdbc
+#' @export
+setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
+  standardGeneric("write.jdbc")
+})
+
 #' @rdname write.json
 #' @export
 setGeneric("write.json", function(x, path) { standardGeneric("write.json") })

R/pkg/R/utils.R

Lines changed: 11 additions & 0 deletions
@@ -650,3 +650,14 @@ convertToJSaveMode <- function(mode) {
   jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
   jmode
 }
+
+varargsToJProperties <- function(...) {
+  pairs <- list(...)
+  props <- newJObject("java.util.Properties")
+  if (length(pairs) > 0) {
+    lapply(ls(pairs), function(k) {
+      callJMethod(props, "setProperty", as.character(k), as.character(pairs[[k]]))
+    })
+  }
+  props
+}
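As a rough illustration of what this helper produces, using only the internal SparkR calls that the tests below also exercise: named varargs become string key/value pairs on a `java.util.Properties` handle returned as a `jobj`.

```r
# Internal helpers (SparkR:::), sketch only; values are coerced with as.character().
jprops <- SparkR:::varargsToJProperties(user = "username", port = 5432)
class(jprops)                                         # "jobj"
SparkR:::callJMethod(jprops, "getProperty", "user")   # "username"
SparkR:::callJMethod(jprops, "getProperty", "port")   # "5432"
```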

R/pkg/inst/tests/testthat/test_utils.R

Lines changed: 24 additions & 0 deletions
@@ -140,3 +140,27 @@ test_that("cleanClosure on R functions", {
   expect_equal(ls(env), "aBroadcast")
   expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)
 })
+
+test_that("varargsToJProperties", {
+  jprops <- newJObject("java.util.Properties")
+  expect_true(class(jprops) == "jobj")
+
+  jprops <- varargsToJProperties(abc = "123")
+  expect_true(class(jprops) == "jobj")
+  expect_equal(callJMethod(jprops, "getProperty", "abc"), "123")
+
+  jprops <- varargsToJProperties(abc = "abc", b = 1)
+  expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc")
+  expect_equal(callJMethod(jprops, "getProperty", "b"), "1")
+
+  jprops <- varargsToJProperties()
+  expect_equal(callJMethod(jprops, "size"), 0L)
+})
+
+test_that("convertToJSaveMode", {
+  s <- convertToJSaveMode("error")
+  expect_true(class(s) == "jobj")
+  expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
+  expect_error(convertToJSaveMode("foo"),
+               'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
+})

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

Lines changed: 7 additions & 0 deletions
@@ -356,6 +356,13 @@ private[spark] object SerDe {
         writeInt(dos, v.length)
         v.foreach(elem => writeObject(dos, elem))

+      // Handle Properties
+      // This must be above the case java.util.Map below.
+      // (Properties implements Map<Object,Object> and will be serialized as map otherwise)
+      case v: java.util.Properties =>
+        writeType(dos, "jobj")
+        writeJObj(dos, value)
+
       // Handle map
       case v: java.util.Map[_, _] =>
         writeType(dos, "map")
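For context on why the case ordering matters, a small R-side sketch using internal SparkR helpers (not public API): with the new case, a `java.util.Properties` created in the JVM comes back to R as a `jobj` reference that can be handed straight to `DataFrameReader`/`DataFrameWriter.jdbc`; without it, Properties would fall through to the `java.util.Map` case and be serialized as a map instead.

```r
# Sketch only, using internal SparkR helpers. The "jobj" class check mirrors
# the assertion in test_utils.R above.
jprops <- SparkR:::newJObject("java.util.Properties")
class(jprops)  # expected: "jobj", because the Properties case precedes the Map case
```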
