Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 33 additions & 21 deletions R/pkg/R/SQLContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,33 @@

# SQLContext.R: SQLContext-driven functions


# Map the top-level R type of `x` to the name of the corresponding SQL type.
#
# Dispatch uses only the first element of class(x); for example, the class of
# a POSIXlt value is c("POSIXlt", "POSIXt"), so it is matched as "POSIXlt".
#
# @param x An R object.
# @return A single string naming the SQL type.
# Stops with an error when the first class of `x` has no mapping below.
getInternalType <- function(x) {
  cls <- class(x)
  switch(cls[[1]],
         integer = "integer",
         character = "string",
         logical = "boolean",
         double = "double",
         numeric = "double",
         raw = "binary",
         list = "array",
         struct = "struct",
         environment = "map",
         Date = "date",
         POSIXlt = "timestamp",
         POSIXct = "timestamp",
         # class(x) may hold several entries; collapse them so that stop()
         # emits one readable message instead of the prefix repeated once per
         # class and glued together without a separator.
         stop(paste("Unsupported type for DataFrame:",
                    paste(cls, collapse = ", "))))
}

#' infer the SQL type
infer_type <- function(x) {
if (is.null(x)) {
stop("can not infer type from NULL")
}

# class of POSIXlt is c("POSIXlt" "POSIXt")
type <- switch(class(x)[[1]],
integer = "integer",
character = "string",
logical = "boolean",
double = "double",
numeric = "double",
raw = "binary",
list = "array",
struct = "struct",
environment = "map",
Date = "date",
POSIXlt = "timestamp",
POSIXct = "timestamp",
stop(paste("Unsupported type for DataFrame:", class(x))))
type <- getInternalType(x)

if (type == "map") {
stopifnot(length(x) > 0)
Expand Down Expand Up @@ -90,19 +96,25 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
if (is.null(schema)) {
schema <- names(data)
}
n <- nrow(data)
m <- ncol(data)

# get rid of factor type
dropFactor <- function(x) {
cleanCols <- function(x) {
if (is.factor(x)) {
as.character(x)
} else {
x
}
}
data <- lapply(1:n, function(i) {
lapply(1:m, function(j) { dropFactor(data[i,j]) })
})

# drop factors and wrap lists
data <- setNames(lapply(data, cleanCols), NULL)

# check if all columns have supported type
lapply(data, getInternalType)

# convert to rows
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
data <- do.call(mapply, append(args, data))
}
if (is.list(data)) {
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)
Expand Down
16 changes: 16 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,14 @@ test_that("create DataFrame from list or data.frame", {
expect_equal(count(df), 3)
ldf2 <- collect(df)
expect_equal(ldf$a, ldf2$a)

irisdf <- createDataFrame(sqlContext, iris)
iris_collected <- collect(irisdf)
expect_equivalent(iris_collected[,-5], iris[,-5])
expect_equal(iris_collected$Species, as.character(iris$Species))

mtcarsdf <- createDataFrame(sqlContext, mtcars)
expect_equivalent(collect(mtcarsdf), mtcars)
})

test_that("create DataFrame with different data types", {
Expand Down Expand Up @@ -283,6 +291,14 @@ test_that("create DataFrame with complex types", {
expect_equal(s$b, 3L)
})

test_that("create DataFrame from a data.frame with complex types", {
  # Start from a two-row, zero-column data.frame so that a list column can be
  # attached afterwards with `$<-`.
  local_df <- data.frame(row.names = 1:2)
  local_df$a_list <- list(list(1, 2), list(3, 4))

  # Convert to a Spark DataFrame, collect it back, and compare with the input.
  spark_df <- createDataFrame(sqlContext, local_df)
  expect_equivalent(local_df, collect(spark_df))
})

# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
Expand Down