5 changes: 1 addition & 4 deletions R/pkg/R/SQLContext.R
@@ -41,10 +41,7 @@ infer_type <- function(x) {
if (type == "map") {
stopifnot(length(x) > 0)
key <- ls(x)[[1]]
list(type = "map",
keyType = "string",
valueType = infer_type(get(key, x)),
valueContainsNull = TRUE)
paste0("map<string,", infer_type(get(key, x)), ">")
Contributor: One minor thing -- in the previous list we had an entry for valueContainsNull that we don't have any more. I can see that this was always TRUE, so this probably doesn't affect functionality right now, but I am just wondering if we had it for some other purpose.

cc @daveis

Contributor Author: There is no way to infer whether it is nullable or not, so it is always TRUE. Removing it does not affect functionality.

} else if (type == "array") {
stopifnot(length(x) > 0)
names <- names(x)
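Following up on the valueContainsNull discussion above, a minimal sketch of how infer_type's return value changes for an environment (assumes the internal helper is reachable in a SparkR session, e.g. via SparkR:::infer_type):

```r
e <- new.env()
assign("a", 1L, envir = e)

# Before this patch, infer_type(e) returned a list:
#   list(type = "map", keyType = "string", valueType = "integer",
#        valueContainsNull = TRUE)
# After this patch it returns a plain type string:
infer_type(e)   # "map<string,integer>"
```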
14 changes: 14 additions & 0 deletions R/pkg/R/deserialize.R
@@ -50,6 +50,7 @@ readTypedObject <- function(con, type) {
"t" = readTime(con),
"a" = readArray(con),
"l" = readList(con),
"e" = readEnv(con),
"n" = NULL,
"j" = getJobj(readString(con)),
stop(paste("Unsupported type for deserialization", type)))
@@ -121,6 +122,19 @@ readList <- function(con) {
}
}

readEnv <- function(con) {
env <- new.env()
len <- readInt(con)
if (len > 0) {
for (i in 1:len) {
key <- readString(con)
value <- readObject(con)
env[[key]] <- value
}
}
env
}

readRaw <- function(con) {
dataLen <- readInt(con)
readBin(con, raw(), as.integer(dataLen), endian = "big")
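The new readEnv branch materializes a serialized map as an R environment. A small illustration of how such a value behaves on the R side, using a hand-built environment in place of one read from the connection:

```r
# Stand-in for an environment produced by readEnv(): keys are the map keys,
# values are the deserialized map values.
env <- new.env()
env[["age"]] <- 16
env[["height"]] <- 176.5

ls(env)                      # "age" "height"
env$age                      # 16
get("height", envir = env)   # 176.5
```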
34 changes: 27 additions & 7 deletions R/pkg/R/schema.R
@@ -131,13 +131,33 @@ checkType <- function(type) {
if (type %in% primtiveTypes) {
return()
} else {
m <- regexec("^array<(.*)>$", type)
matchedStrings <- regmatches(type, m)
if (length(matchedStrings[[1]]) >= 2) {
elemType <- matchedStrings[[1]][2]
checkType(elemType)
return()
}
# Check complex types
firstChar <- substr(type, 1, 1)
switch (firstChar,
a = {
# Array type
m <- regexec("^array<(.*)>$", type)
matchedStrings <- regmatches(type, m)
if (length(matchedStrings[[1]]) >= 2) {
elemType <- matchedStrings[[1]][2]
checkType(elemType)
return()
}
},
m = {
# Map type
m <- regexec("^map<(.*),(.*)>$", type)
matchedStrings <- regmatches(type, m)
if (length(matchedStrings[[1]]) >= 3) {
keyType <- matchedStrings[[1]][2]
if (keyType != "string" && keyType != "character") {
stop("Key type in a map must be string or character")
}
valueType <- matchedStrings[[1]][3]
checkType(valueType)
return()
}
})
}

stop(paste("Unsupported type for Dataframe:", type))
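A minimal sketch of the type strings the extended checkType accepts or rejects (assumes access to the internal helper, e.g. SparkR:::checkType, in a SparkR session):

```r
checkType("array<string>")         # ok: element type validated recursively
checkType("map<string,integer>")   # ok: string key, value type validated recursively
checkType("map<integer,integer>")  # error: "Key type in a map must be string or character"
```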
56 changes: 44 additions & 12 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -57,7 +57,7 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

test_that("infer types", {
test_that("infer types and check types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
expect_equal(infer_type("abc"), "string")
@@ -72,9 +72,9 @@ test_that("infer types", {
checkStructField(testStruct$fields()[[2]], "b", "StringType", TRUE)
e <- new.env()
assign("a", 1L, envir = e)
expect_equal(infer_type(e),
list(type = "map", keyType = "string", valueType = "integer",
valueContainsNull = TRUE))
expect_equal(infer_type(e), "map<string,integer>")

expect_error(checkType("map<integer,integer>"), "Key type in a map must be string or character")
})

test_that("structType and structField", {
@@ -242,7 +242,7 @@ test_that("create DataFrame with different data types", {
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
})

test_that("create DataFrame with nested array and struct", {
test_that("create DataFrame with nested array and map", {
# e <- new.env()
# assign("n", 3L, envir = e)
# l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L))
@@ -253,21 +253,35 @@ test_that("create DataFrame with nested array and struct", {
# ldf <- collect(df)
# expect_equal(ldf[1,], l[[1]])

# ArrayType and MapType
e <- new.env()
assign("n", 3L, envir = e)

# ArrayType only for now
l <- list(as.list(1:10), list("a", "b"))
df <- createDataFrame(sqlContext, list(l), c("a", "b"))
expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>")))
l <- list(as.list(1:10), list("a", "b"), e)
df <- createDataFrame(sqlContext, list(l), c("a", "b", "c"))
expect_equal(dtypes(df), list(c("a", "array<int>"),
c("b", "array<string>"),
c("c", "map<string,int>")))
expect_equal(count(df), 1)
ldf <- collect(df)
expect_equal(names(ldf), c("a", "b"))
expect_equal(names(ldf), c("a", "b", "c"))
expect_equal(ldf[1, 1][[1]], l[[1]])
expect_equal(ldf[1, 2][[1]], l[[2]])
e <- ldf$c[[1]]
expect_equal(class(e), "environment")
expect_equal(ls(e), "n")
expect_equal(e$n, 3L)
})

# For testing map type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("Collect DataFrame with complex types", {
# only ArrayType now
# TODO: tests for StructType and MapType after they are supported
# ArrayType
df <- jsonFile(sqlContext, complexTypeJsonPath)

ldf <- collect(df)
@@ -277,6 +291,24 @@ test_that("Collect DataFrame with complex types", {
expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list (7, 8, 9)))
expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list ("g", "h", "i")))
expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list (7.0, 8.0, 9.0)))

# MapType
schema <- structType(structField("name", "string"),
structField("info", "map<string,double>"))
df <- read.df(sqlContext, mapTypeJsonPath, "json", schema)
expect_equal(dtypes(df), list(c("name", "string"),
c("info", "map<string,double>")))
ldf <- collect(df)
expect_equal(nrow(ldf), 3)
expect_equal(ncol(ldf), 2)
expect_equal(names(ldf), c("name", "info"))
expect_equal(ldf$name, c("Bob", "Alice", "David"))
bob <- ldf$info[[1]]
expect_equal(class(bob), "environment")
expect_equal(bob$age, 16)
expect_equal(bob$height, 176.5)

Contributor: Could we also add a test for infer_type or check_type where the key type is not string (i.e. test that we catch the error)?

Contributor Author: Added a test case.

# TODO: tests for StructType after it is supported
})

test_that("jsonFile() on a local file returns a DataFrame", {
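A condensed round trip drawn from the tests above (a sketch, not behavior beyond what the tests cover; assumes a running SparkR session with sqlContext available):

```r
e <- new.env()
assign("n", 3L, envir = e)

df <- createDataFrame(sqlContext, list(list(1L, e)), c("id", "props"))
dtypes(df)               # roughly list(c("id", "int"), c("props", "map<string,int>"))

ldf <- collect(df)
props <- ldf$props[[1]]  # a collected map column comes back as an R environment
props$n                  # 3
```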
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -209,11 +209,23 @@ private[spark] object SerDe {
case "array" => dos.writeByte('a')
// Array of objects
case "list" => dos.writeByte('l')
case "map" => dos.writeByte('e')
case "jobj" => dos.writeByte('j')
case _ => throw new IllegalArgumentException(s"Invalid type $typeStr")
}
}

private def writeKeyValue(dos: DataOutputStream, key: Object, value: Object): Unit = {
if (key == null) {
throw new IllegalArgumentException("Key in map can't be null.")
} else if (!key.isInstanceOf[String]) {
throw new IllegalArgumentException(s"Invalid map key type: ${key.getClass.getName}")
}

writeString(dos, key.asInstanceOf[String])
writeObject(dos, value)
}

def writeObject(dos: DataOutputStream, obj: Object): Unit = {
if (obj == null) {
writeType(dos, "void")
@@ -306,6 +318,25 @@ private[spark] object SerDe {
writeInt(dos, v.length)
v.foreach(elem => writeObject(dos, elem))

// Handle map
case v: java.util.Map[_, _] =>
writeType(dos, "map")
writeInt(dos, v.size)
val iter = v.entrySet.iterator
while(iter.hasNext) {
val entry = iter.next
val key = entry.getKey
val value = entry.getValue

writeKeyValue(dos, key.asInstanceOf[Object], value.asInstanceOf[Object])
}
case v: scala.collection.Map[_, _] =>
writeType(dos, "map")
writeInt(dos, v.size)
v.foreach { case (key, value) =>
writeKeyValue(dos, key.asInstanceOf[Object], value.asInstanceOf[Object])
}

case _ =>
writeType(dos, "jobj")
writeJObj(dos, value)
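One note on the string-key check in writeKeyValue: the R side stores a received map in an environment (see readEnv above), and environment bindings are keyed by character names only, which is presumably why non-string keys are rejected before serialization. A tiny R illustration:

```r
e <- new.env()
e[["age"]] <- 16L
class(ls(e))      # "character" -- environment keys are always strings
# e[[1]] <- 16L   # would fail: environments are keyed by name, not by index
```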
@@ -64,6 +64,12 @@ private[r] object SQLUtils {
case r"\Aarray<(.*)${elemType}>\Z" => {
org.apache.spark.sql.types.ArrayType(getSQLDataType(elemType))
}
case r"\Amap<(.*)${keyType},(.*)${valueType}>\Z" => {
if (keyType != "string" && keyType != "character") {
throw new IllegalArgumentException("Key type of a map must be string or character")
}
org.apache.spark.sql.types.MapType(getSQLDataType(keyType), getSQLDataType(valueType))
}
case _ => throw new IllegalArgumentException(s"Invaid type $dataType")
}
}