R/pkg/R/deserialize.R (4 additions, 2 deletions)

@@ -56,8 +56,10 @@ readTypedObject <- function(con, type) {

 readString <- function(con) {
   stringLen <- readInt(con)
-  string <- readBin(con, raw(), stringLen, endian = "big")
-  rawToChar(string)
+  raw <- readBin(con, raw(), stringLen, endian = "big")
+  string <- rawToChar(raw)
+  Encoding(string) <- "UTF-8"
+  string
 }

 readInt <- function(con) {
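For context, a minimal sketch of why the added Encoding() mark matters (my illustration, not code from the patch): rawToChar() returns a string whose encoding is "unknown", so under a non-UTF-8 locale R may misinterpret the raw UTF-8 bytes until they are explicitly declared as UTF-8.

raw <- as.raw(c(0xea, 0xb0, 0x80))   # the UTF-8 bytes of "가"
s <- rawToChar(raw)
Encoding(s)                          # "unknown": R has not decoded the bytes yet
Encoding(s) <- "UTF-8"               # declare the encoding; the bytes are unchanged
identical(s, "\uac00")               # TRUE: now compares equal to "가"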
R/pkg/R/serialize.R (1 addition, 1 deletion)

@@ -79,7 +79,7 @@ writeJobj <- function(con, value) {
 writeString <- function(con, value) {
   utfVal <- enc2utf8(value)
   writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
-  writeBin(utfVal, con, endian = "big")
+  writeBin(utfVal, con, endian = "big", useBytes=TRUE)
 }

 writeInt <- function(con, value) {
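The new useBytes=TRUE stops writeBin() from translating the string back into the native locale's encoding, so the UTF-8 bytes produced by enc2utf8() are written verbatim. A small sketch (mine, assuming a plain R session):

s <- enc2utf8("가")
nchar(s, type = "bytes")             # 3; writeString sends 3 + 1 for the trailing nul
con <- rawConnection(raw(0), "w")
writeBin(s, con, useBytes = TRUE)    # emits ea b0 80 00, with no re-encoding
rawConnectionValue(con)
close(con)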
R/pkg/inst/tests/test_sparkSQL.R (26 additions)

@@ -417,6 +417,32 @@ test_that("collect() and take() on a DataFrame return the same number of rows an
   expect_equal(ncol(collect(df)), ncol(take(df, 10)))
 })

test_that("collect() support Unicode characters", {
markUtf8 <- function(s) {
Encoding(s) <- "UTF-8"
s
}

lines <- c("{\"name\":\"안녕하세요\"}",
"{\"name\":\"您好\", \"age\":30}",
"{\"name\":\"こんにちは\", \"age\":19}",
"{\"name\":\"Xin chào\"}")

Review comment (Contributor):

I'm still a little confused about how Unicode strings are treated in a non-UTF-8 locale. Why is there no need to call markUtf8 on these Unicode strings?

Reply (Author):

I think this is related to how the R interpreter handles Unicode strings. I wrote a small example to see how R sets the UTF-8 indicator on strings under different locales. As the output below shows, with a UTF-8 locale R automatically adds 80 (presumably the UTF-8 indicator) before the 09; I think that explains why the test case passes under a UTF-8 locale without markUtf8.

test.R

a<-"가"
Encoding(a)
print(serialize(a, connection=NULL))

With the UTF-8 locale, the output is:

$ r -f test.R
> a<-"가"
> Encoding(a)
[1] "UTF-8"
> print(serialize(a, connection=NULL))
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 80
[26] 09 00 00 00 03 ea b0 80

With the C (ASCII) locale, the output is:

$ r -f test.R
> a<-"가"
> Encoding(a)
[1] "unknown"
> print(serialize(a, connection=NULL))
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 00
[26] 09 00 00 00 03 ea b0 80
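For reference, and this is my reading of R's serialization format rather than anything stated in this thread: 00 00 80 09 is the flags word of the serialized CHARSXP, where the low byte 09 is the CHARSXP type and the 0x8000 bit carries the string's UTF-8 encoding flag, which matches the guess above.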

+  jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
+  writeLines(lines, jsonPath)
+
+  df <- read.df(sqlContext, jsonPath, "json")
+  rdf <- collect(df)
+  expect_true(is.data.frame(rdf))
+  expect_equal(rdf$name[1], markUtf8("안녕하세요"))
+  expect_equal(rdf$name[2], markUtf8("您好"))
+  expect_equal(rdf$name[3], markUtf8("こんにちは"))
+  expect_equal(rdf$name[4], markUtf8("Xin chào"))
+
+  df1 <- createDataFrame(sqlContext, rdf)
+  expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))
+})

test_that("multiple pipeline transformations result in an RDD with the correct values", {
df <- jsonFile(sqlContext, jsonPath)
first <- lapply(df, function(row) {
Expand Down
core/src/main/scala/org/apache/spark/api/r/SerDe.scala (4 additions, 5 deletions)

@@ -303,12 +303,11 @@ private[spark] object SerDe {
     out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
   }

-  // NOTE: Only works for ASCII right now
   def writeString(out: DataOutputStream, value: String): Unit = {
-    val len = value.length
-    out.writeInt(len + 1) // For the \0
-    out.writeBytes(value)
-    out.writeByte(0)
+    val utf8 = value.getBytes("UTF-8")
+    val len = utf8.length
+    out.writeInt(len)
+    out.write(utf8, 0, len)
   }

   def writeBytes(out: DataOutputStream, value: Array[Byte]): Unit = {
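Tying the two sides together, a hedged round-trip sketch in R (my illustration, ignoring SerDe's surrounding type tags): it builds the byte stream the new Scala writeString would emit for "가" and decodes it the way the patched readString does.

payload <- c(writeBin(3L, raw(), endian = "big"),   # int length prefix: 3 UTF-8 bytes
             as.raw(c(0xea, 0xb0, 0x80)))           # the UTF-8 payload for "가"
con <- rawConnection(payload, "r")
len <- readBin(con, integer(), n = 1, endian = "big")
s <- rawToChar(readBin(con, raw(), len))
Encoding(s) <- "UTF-8"                              # the step this patch adds
close(con)
identical(s, "\uac00")                              # TRUE: round-trips as "가"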