From 40d4d44223c354b2bd8a046fc017112e8b261e6a Mon Sep 17 00:00:00 2001
From: zero323
Date: Tue, 13 Oct 2015 19:35:24 +0200
Subject: [PATCH 1/3] [SPARK-11086] Use dropFactors column-wise instead of
 nested loop when createDataFrame from a data.frame

---
 R/pkg/R/SQLContext.R             | 7 ++-----
 R/pkg/inst/tests/test_sparkSQL.R | 8 ++++++++
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 399f53657a68..a11841e028e3 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -90,8 +90,6 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
     if (is.null(schema)) {
       schema <- names(data)
     }
-    n <- nrow(data)
-    m <- ncol(data)
     # get rid of factor type
     dropFactor <- function(x) {
       if (is.factor(x)) {
@@ -100,9 +98,8 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
         x
       }
     }
-    data <- lapply(1:n, function(i) {
-      lapply(1:m, function(j) { dropFactor(data[i,j]) })
-    })
+    args <- list(FUN=list, SIMPLIFY=FALSE, USE.NAMES=FALSE)
+    data <- do.call(mapply, append(args, setNames(lapply(data, dropFactor), NULL)))
   }
   if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index d5509e475de0..beaa904cbcd1 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -242,6 +242,14 @@ test_that("create DataFrame from list or data.frame", {
   expect_equal(count(df), 3)
   ldf2 <- collect(df)
   expect_equal(ldf$a, ldf2$a)
+
+  irisdf <- createDataFrame(sqlContext, iris)
+  iris_collected <- collect(irisdf)
+  expect_equivalent(iris_collected[,-5], iris[,-5])
+  expect_equal(iris_collected$Species, as.character(iris$Species))
+
+  mtcarsdf <- createDataFrame(sqlContext, mtcars)
+  expect_equivalent(collect(mtcarsdf), mtcars)
 })
 
 test_that("create DataFrame with different data types", {

From aa80d54470415942278321dc1393aa41be207d58 Mon Sep 17 00:00:00 2001
From: zero323
Date: Sat, 17 Oct 2015 01:36:58 +0200
Subject: [PATCH 2/3] Make sure list columns won't be flattened by mapply

---
 R/pkg/R/SQLContext.R | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index a11841e028e3..d3f7ab277089 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -90,16 +90,18 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
     if (is.null(schema)) {
       schema <- names(data)
     }
-    # get rid of factor type
-    dropFactor <- function(x) {
+    # get rid of factor type and adjust for lists
+    cleanCols <- function(x) {
       if (is.factor(x)) {
         as.character(x)
+      } else if(is.list(x) && !is.environment(x)) {
+        lapply(x, list)
       } else {
         x
       }
     }
-    args <- list(FUN=list, SIMPLIFY=FALSE, USE.NAMES=FALSE)
-    data <- do.call(mapply, append(args, setNames(lapply(data, dropFactor), NULL)))
+    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+    data <- do.call(mapply, append(args, setNames(lapply(data, cleanCols), NULL)))
   }
   if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)
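
[Not part of the patch series: a standalone sketch of the column-wise
conversion the two patches above introduce. The data frame and its values
are made up for illustration. mapply() walks all of its vector arguments
in parallel, so FUN = list packs the i-th element of every column into
row i, and SIMPLIFY = FALSE keeps the result as a plain list of rows; one
call over the columns replaces the old nested loop over every (row,
column) cell.]

    # toy data with a factor column; mirrors dropFactor from PATCH 1/3
    df <- data.frame(a = 1:2, b = factor(c("x", "y")))
    dropFactor <- function(x) if (is.factor(x)) as.character(x) else x
    cols <- setNames(lapply(df, dropFactor), NULL)

    # convert columns to rows in a single mapply call
    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
    rows <- do.call(mapply, append(args, cols))

    str(rows[[1]])
    # List of 2
    #  $ : int 1
    #  $ : chr "x"
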
From 5e9580cb03b1e28b790deb099a443c64fbcae9a6 Mon Sep 17 00:00:00 2001
From: zero323
Date: Wed, 21 Oct 2015 05:10:06 +0200
Subject: [PATCH 3/3] Add type checking inside createDataFrame

---
 R/pkg/R/SQLContext.R             | 51 ++++++++++++++++++++------------
 R/pkg/inst/tests/test_sparkSQL.R |  8 +++++
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index d3f7ab277089..bed2650e5074 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -17,27 +17,33 @@
 
 # SQLcontext.R: SQLContext-driven functions
 
+
+# Map top level R type to SQL type
+getInternalType <- function(x) {
+  # class of POSIXlt is c("POSIXlt" "POSIXt")
+  switch(class(x)[[1]],
+         integer = "integer",
+         character = "string",
+         logical = "boolean",
+         double = "double",
+         numeric = "double",
+         raw = "binary",
+         list = "array",
+         struct = "struct",
+         environment = "map",
+         Date = "date",
+         POSIXlt = "timestamp",
+         POSIXct = "timestamp",
+         stop(paste("Unsupported type for DataFrame:", class(x))))
+}
+
 #' infer the SQL type
 infer_type <- function(x) {
   if (is.null(x)) {
     stop("can not infer type from NULL")
   }
 
-  # class of POSIXlt is c("POSIXlt" "POSIXt")
-  type <- switch(class(x)[[1]],
-                 integer = "integer",
-                 character = "string",
-                 logical = "boolean",
-                 double = "double",
-                 numeric = "double",
-                 raw = "binary",
-                 list = "array",
-                 struct = "struct",
-                 environment = "map",
-                 Date = "date",
-                 POSIXlt = "timestamp",
-                 POSIXct = "timestamp",
-                 stop(paste("Unsupported type for DataFrame:", class(x))))
+  type <- getInternalType(x)
 
   if (type == "map") {
     stopifnot(length(x) > 0)
@@ -90,18 +96,25 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
     if (is.null(schema)) {
       schema <- names(data)
     }
-    # get rid of factor type and adjust for lists
+
+    # get rid of factor type
     cleanCols <- function(x) {
       if (is.factor(x)) {
         as.character(x)
-      } else if(is.list(x) && !is.environment(x)) {
-        lapply(x, list)
       } else {
         x
       }
     }
+
+    # drop factors and wrap lists
+    data <- setNames(lapply(data, cleanCols), NULL)
+
+    # check if all columns have supported type
+    lapply(data, getInternalType)
+
+    # convert to rows
     args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
-    data <- do.call(mapply, append(args, setNames(lapply(data, cleanCols), NULL)))
+    data <- do.call(mapply, append(args, data))
   }
   if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index beaa904cbcd1..71836b05349b 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -291,6 +291,14 @@ test_that("create DataFrame with complex types", {
   expect_equal(s$b, 3L)
 })
 
+test_that("create DataFrame from a data.frame with complex types", {
+  ldf <- data.frame(row.names=1:2)
+  ldf$a_list <- list(list(1, 2), list(3, 4))
+  sdf <- createDataFrame(sqlContext, ldf)
+
+  expect_equivalent(ldf, collect(sdf))
+})
+
 # For test map type and struct type in DataFrame
 mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
                       "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
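
[Not part of the patch series: a sketch of what the up-front type check in
PATCH 3/3 buys. The data frame below is made up and getInternalType is
abridged from the patch; a column whose class has no SQL mapping now fails
fast, before any row-conversion work is done.]

    # abridged from the getInternalType added in PATCH 3/3
    getInternalType <- function(x) {
      switch(class(x)[[1]],
             integer = "integer",
             character = "string",
             logical = "boolean",
             double = "double",
             numeric = "double",
             list = "array",
             stop(paste("Unsupported type for DataFrame:", class(x))))
    }

    df <- data.frame(a = 1:2, b = complex(real = 1:2, imaginary = 0))

    # the same per-column check createDataFrame now runs before building rows
    lapply(df, getInternalType)
    # Error: Unsupported type for DataFrame: complex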