
Conversation

@jaehc commented Jul 18, 2015

Spark gives an error message and does not show the output when a field of the resulting DataFrame contains CJK characters.
I changed SerDe.scala so that Spark supports Unicode characters when it writes a string to R.

@shivaram (Contributor)

Jenkins, ok to test

@shivaram (Contributor)

@CHOIJAEHONG1 Thanks for the PR. Are we sure we don't need to change readString (readString <- function(con) in deserialize.R) as well?

cc @sun-rui

SparkQA commented Jul 18, 2015

Test build #37723 has finished for PR 7494 at commit 5325cef.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jaehc (Author) commented Jul 19, 2015

I am not sure about readString, but the test case, which verifies that Unicode characters in a native data frame survive a round trip to a Spark DataFrame, failed. There is something going on underneath.

1. Failure(@test_sparkSQL.R#438): collect() support Unicode characters ---------
collect(where(df2, df2$name == "\346\202\250\345\245\275"))[[2]] not equal to "\346\202\250\345\245\275"
1 string mismatches:
x[1]: "\346\202\250\345\245\275"
y[1]: "<e6><82><a8><e5><a5><bd>"
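
(Editorial note: the escapes above are octal forms of UTF-8 bytes; "\346\202\250\345\245\275" is e6 82 a8 e5 a5 bd, i.e. the string 您好. A quick way to check in R, assuming a UTF-8 locale:)

bytes <- strtoi(c("346", "202", "250", "345", "245", "275"), base = 8L)
rawToChar(as.raw(bytes))   # prints 您好 when the locale can display it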

@sun-rui (Contributor) commented Jul 20, 2015

I think readString() in deserialize.R should be updated accordingly. Could you try:
string <- readBin(...)
Encoding(string) <- "UTF-8"
string <- enc2native(string)

@jaehc (Author) commented Jul 20, 2015

@sun-rui
I was able to reproduce the same error as in the test case with

$) export LC_ALL=C
$) ./run-tests.sh

It needs to call rawToChar(); otherwise R gives the error message below.

functions on binary files : Error in `Encoding<-`(`*tmp*`, value = "UTF-8") :
  a character vector argument expected
Calls: test_package ... readTypedObject -> getJobj -> jobj -> readString -> Encoding<-
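
A minimal illustration of why rawToChar() is needed here (an editorial sketch, not part of the PR): readBin() returns a raw vector, and Encoding<- only accepts character vectors, which is exactly what the error above complains about.

raw <- as.raw(c(0xec, 0x95, 0x88))   # some UTF-8 bytes read off the connection
# Encoding(raw) <- "UTF-8"           # would fail: a character vector argument expected
s <- rawToChar(raw)                  # convert the raw bytes to a character string first
Encoding(s) <- "UTF-8"               # now the encoding mark can be set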

@sun-rui (Contributor) commented Jul 20, 2015

yeah, rawToChar() is needed. Then does it work now?

@jaehc (Author) commented Jul 20, 2015

Unfortunately, not.
I guess the strings in the test case would also have to be converted to native form.

readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  enc2native(string)
}
1. Failure (at test_sparkSQL.R#432): collect() support Unicode characters ------
rdf$name[1] not equal to "\354\225\210\353\205\225\355\225\230\354\204\270\354\232\224"
1 string mismatches:
x[1]: "\354\225\210\353\205\225\355\225\230\354\204\270\354\232\224"
y[1]: "<U+C548><U+B155><U+D558><U+C138><U+C694>"

2. Failure (at test_sparkSQL.R#433): collect() support Unicode characters ------
rdf$name[2] not equal to "\346\202\250\345\245\275"
1 string mismatches:
x[1]: "\346\202\250\345\245\275"
y[1]: "<U+60A8><U+597D>"

3. Failure (at test_sparkSQL.R#434): collect() support Unicode characters ------
rdf$name[3] not equal to "\343\201\223\343\202\223\343\201\253\343\201\241\343\201\257"
1 string mismatches:
x[1]: "\343\201\223\343\202\223\343\201\253\343\201\241\343\201\257"
y[1]: "<U+3053><U+3093><U+306B><U+3061><U+306F>"

4. Failure (at test_sparkSQL.R#435): collect() support Unicode characters ------
rdf$name[4] not equal to "Xin ch\303\240o"
1 string mismatches:
x[1]: "Xin ch\303\240o"
y[1]: "Xin ch<U+00E0>o"

5. Error: collect() support Unicode characters ---------------------------------
Unsupported type for deserialization
1: withCallingHandlers(eval(code, new_test_environment), error = capture_calls, message = function(c) invokeRestart("muffleMessage"),
       warning = function(c) invokeRestart("muffleWarning"))
2: eval(code, new_test_environment)
3: eval(expr, envir, enclos)
4: expect_equal(collect(where(df2, df2$name == "\346\202\250\345\245\275"))$name, "\346\202\250\345\245\275") at test_sparkSQL.R:438
5: expect_that(object, equals(expected, label = expected.label, ...), info = info, label = label)
6: condition(object)
7: compare(expected, actual, ...)
8: compare.character(expected, actual, ...)
9: identical(x, y)
10: collect(where(df2, df2$name == "\346\202\250\345\245\275"))
11: collect(where(df2, df2$name == "\346\202\250\345\245\275"))
12: .local(x, ...)
13: lapply(listCols, function(col) {
       objRaw <- rawConnection(col)
       numRows <- readInt(objRaw)
       col <- readCol(objRaw, numRows)
       close(objRaw)
       col
   })
14: lapply(listCols, function(col) {
       objRaw <- rawConnection(col)
       numRows <- readInt(objRaw)
       col <- readCol(objRaw, numRows)
       close(objRaw)
       col
   })
15: FUN(X[[i]], ...)
16: readCol(objRaw, numRows)
17: do.call(c, lapply(1:numRows, function(x) {
       value <- readObject(inputCon)
       if (is.null(value))
           NA
       else value
   }))
18: lapply(1:numRows, function(x) {
       value <- readObject(inputCon)
       if (is.null(value))
           NA
       else value
   })
19: lapply(1:numRows, function(x) {
       value <- readObject(inputCon)
       if (is.null(value))
           NA
       else value
   })
20: FUN(X[[i]], ...)
21: readObject(inputCon)
22: readTypedObject(con, type)
23: stop(paste("Unsupported type for deserialization", type))
24: .handleSimpleError(function (e)
   {
       e$calls <- head(sys.calls()[-seq_len(frame + 7)], -2)
       signalCondition(e)
   }, "Unsupported type for deserialization ", quote(readTypedObject(con, type)))
Error: Test failures

@sun-rui (Contributor) commented Jul 21, 2015

Could you try adding a zero as done previously in writeString():

val utf8 = value.getBytes("UTF-8")
val len = utf8.length
out.writeInt(len + 1)
out.write(utf8, 0, len)
out.writeByte(0)

For the Unicode strings in the test case, I am not sure whether we need to force their encoding to "UTF-8" before writing them to a JSON file. It seems unnecessary.

@jaehc (Author) commented Jul 23, 2015

Adding a zero doesn't solve the problem either; it seems to have the same effect as the version without it.
But the code below works when the locale is "C" (which, I guess, is what caused the Jenkins build failure), not UTF-8.
I set the test strings' encoding to UTF-8 and convert them to native form with enc2native(), and the test case passed.
Under a UTF-8 locale, convertToNative() is not necessary; the test case passed without it.

readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  enc2native(string)
}
convertToNative <- function(s) {
  Encoding(s) <- "UTF-8"
  enc2native(s)
}

test_that("collect() support Unicode characters", {
  lines <- c("{\"name\":\"안녕하세요\"}",
                 "{\"name\":\"您好\", \"age\":30}",
                 "{\"name\":\"こんにちは\", \"age\":19}",
                 "{\"name\":\"Xin chào\"}")
  jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
  writeLines(lines, jsonPath)

  df <- read.df(sqlContext, jsonPath, "json")
  rdf <- collect(df)
  print(head(rdf))
  print(convertToNative("안녕하세요"))
  expect_true(is.data.frame(rdf))
  expect_equal(rdf$name[1], convertToNative("안녕하세요"))
  expect_equal(rdf$name[2], convertToNative("您好"))
  expect_equal(rdf$name[3], convertToNative("こんにちは"))
  expect_equal(rdf$name[4], convertToNative("Xin chào"))

  df1 <- createDataFrame(sqlContext, rdf)
  print(head(df1))
  expect_equal(collect(where(df1, df1$name == convertToNative("您好")))$name, convertToNative("您好"))
})

Testing the code under LC_ALL=C:

$) export LC_ALL=C
$) ./run-tests.sh
SparkSQL functions :   age                                     name
1  NA <U+C548><U+B155><U+D558><U+C138><U+C694>
2  30                         <U+60A8><U+597D>
3  19 <U+3053><U+3093><U+306B><U+3061><U+306F>
4  NA                          Xin ch<U+00E0>o
[1] "<U+C548><U+B155><U+D558><U+C138><U+C694>"
.....  age                                     name
1  NA <U+C548><U+B155><U+D558><U+C138><U+C694>
2  30                         <U+60A8><U+597D>
3  19 <U+3053><U+3093><U+306B><U+3061><U+306F>
4  NA                          Xin ch<U+00E0>o
.

DONE
Tests passed.

SparkQA commented Jul 24, 2015

Test build #38304 has finished for PR 7494 at commit bc469d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jaehc (Author) commented Jul 24, 2015

good job, jenkins.

Contributor (review comment)

I suspect I made a mistake here. It seems that enc2native just changes the encoding mark but does not convert the characters from the current encoding to the native encoding (iconv() can do this). As described in https://cran.r-project.org/doc/manuals/r-release/R-ints.html#Encodings-for-CHARSXPs, R can handle encodings other than that of the current locale, and most character manipulation functions now preserve UTF-8, so we can keep the string encoding as "UTF-8". Could you try removing this?
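
A minimal illustration of the distinction (an editorial sketch, not part of the PR): Encoding<- only changes the declared encoding mark and leaves the bytes untouched, whereas iconv() converts the bytes themselves.

x <- rawToChar(as.raw(c(0xc3, 0xa0)))          # the UTF-8 bytes of "à"
Encoding(x) <- "UTF-8"                          # only re-marks the string; charToRaw(x) is still c3 a0
y <- iconv(x, from = "UTF-8", to = "latin1")    # actually converts the bytes
charToRaw(y)                                    # e0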

Author (review comment)

Yes, preserving UTF-8 encodings sounds much better.
Do you mean that enc2native should be removed, as below?

readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  string
}

But this causes an error when calling collect() after createDataFrame(), which converts a local R data frame to a Spark DataFrame.

5. Error: collect() support Unicode characters ---------------------------------
unused argument (connection = NULL)
1: withCallingHandlers(eval(code, new_test_environment), error = capture_calls, message = function(c) invokeRestart("muffleMessage"),
       warning = function(c) invokeRestart("muffleWarning"))
2: eval(code, new_test_environment)
3: eval(expr, envir, enclos)
4: createDataFrame(sqlContext, rdf) at test_sparkSQL.R:65
5: parallelize(sc, data)
6: lapply(slices, writeObject, connection = NULL)
7: lapply(slices, writeObject, connection = NULL)
8: FUN(X[[i]], ...)
9: .handleSimpleError(function (e)
   {
       e$calls <- head(sys.calls()[-seq_len(frame + 7)], -2)
       signalCondition(e)
   }, "unused argument (connection = NULL)", quote(FUN(X[[i]], ...)))
Error: Test failures

I tried to find the reason, and I guess a flag bit is set in the serialized string when I use Encoding(string) <- "UTF-8", and not otherwise.

I printed out serializedSlices in parallelize() (at line 132 below). The result is shown below.
context.R

102 parallelize <- function(sc, coll, numSlices = 1) {
103   # TODO: bound/safeguard numSlices
104   # TODO: unit tests for if the split works for all primitives
105   # TODO: support matrix, data frame, etc
106   if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
107     if (is.data.frame(coll)) {
108       message(paste("context.R: A data frame is parallelized by columns."))
109     } else {
110       if (is.matrix(coll)) {
111         message(paste("context.R: A matrix is parallelized by elements."))
112       } else {
113         message(paste("context.R: parallelize() currently only supports lists and vectors.",
114                       "Calling as.list() to coerce coll into a list."))
115       }
116     }
117     coll <- as.list(coll)
118   }
119
120   print(coll)
121
122   if (numSlices > length(coll))
123     numSlices <- length(coll)
124
125   sliceLen <- ceiling(length(coll) / numSlices)
126   slices <- split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
127
128   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
129   # 2-tuples of raws
130   serializedSlices <- lapply(slices, serialize, connection = NULL)
131
132   print(serializedSlices)
133   jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
134                       "createRDDFromArray", sc, serializedSlices)
135
136   RDD(jrdd, "byte")
137 }

Case 1. with Encoding(string) <- "UTF-8"
[1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 13 00 00 00 04 00 00 00
[26] 13 00 00 00 02 00 00 00 0e 00 00 00 01 7f f0 00 00 00 00 07 a2 00 00 00 10
[51] 00 00 00 01 00 00 80 09 00 00 00 0f ec 95 88 eb 85 95 ed 95 98 ec 84 b8 ec
[76] 9a 94 00 00 00 13 00 00 00 02 00 00 00 0e 00 00 00 01 40 3e 00 00 00 00 00
[101] 00 00 00 00 10 00 00 00 01 00 00 80 09 00 00 00 06 e6 82 a8 e5 a5 bd 00 00
[126] 00 13 00 00 00 02 00 00 00 0e 00 00 00 01 40 33 00 00 00 00 00 00 00 00 00
[151] 10 00 00 00 01 00 00 80 09 00 00 00 0f e3 81 93 e3 82 93 e3 81 ab e3 81 a1
[176] e3 81 af 00 00 00 13 00 00 00 02 00 00 00 0e 00 00 00 01 7f f0 00 00 00 00
[201] 07 a2 00 00 00 10 00 00 00 01 00 00 80 09 00 00 00 09 58 69 6e 20 63 68 c3
[226] a0 6f

Case 2. without Encoding(string) <- "UTF-8"
[1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 13 00 00 00 04 00 00 00
[26] 13 00 00 00 02 00 00 00 0e 00 00 00 01 7f f0 00 00 00 00 07 a2 00 00 00 10
[51] 00 00 00 01 00 00 00 09 00 00 00 0f ec 95 88 eb 85 95 ed 95 98 ec 84 b8 ec
[76] 9a 94 00 00 00 13 00 00 00 02 00 00 00 0e 00 00 00 01 40 3e 00 00 00 00 00
[101] 00 00 00 00 10 00 00 00 01 00 00 00 09 00 00 00 06 e6 82 a8 e5 a5 bd 00 00
[126] 00 13 00 00 00 02 00 00 00 0e 00 00 00 01 40 33 00 00 00 00 00 00 00 00 00
[151] 10 00 00 00 01 00 00 00 09 00 00 00 0f e3 81 93 e3 82 93 e3 81 ab e3 81 a1
[176] e3 81 af 00 00 00 13 00 00 00 02 00 00 00 0e 00 00 00 01 7f f0 00 00 00 00
[201] 07 a2 00 00 00 10 00 00 00 01 00 00 00 09 00 00 00 09 58 69 6e 20 63 68 c3
[226] a0 6f

You can see that positions [51], [101], [151], and [201] differ: with Encoding() there is a leading 80 before the 09, which, I guess, causes the error.
I think this is R's encoding indication bit, according to the link you gave.

@shivaram (Contributor)

@sun-rui @CHOIJAEHONG1 I took a look at this, and it looks like the right way to do this would be to use iconv rather than rawToChar, which doesn't parse UTF-8 correctly. I tried the following diff and it seems to pass the unit tests.

 raw <- readBin(con, raw(), stringLen, endian = "big")
 iconv(list(raw), to="UTF-8")

@jaehc (Author) commented Aug 2, 2015

@shivaram
The test case passed under a UTF-8 locale (e.g. LC_ALL=ko_KR.UTF-8). But, unfortunately, iconv() returns NA under LC_ALL=C, which makes the test case fail. It seems that iconv() behaves differently under different locales.
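
A possible explanation (editorial note, assuming standard iconv() semantics): when from= is omitted it defaults to "", i.e. the current locale's encoding, so under LC_ALL=C the UTF-8 bytes cannot be interpreted and NA comes back. For instance:

raw <- as.raw(c(0xea, 0xb0, 0x80))              # UTF-8 bytes received from the JVM
iconv(list(raw), to = "UTF-8")                   # from = "" (the C locale) -> likely NA, as observed above
iconv(list(raw), from = "UTF-8", to = "UTF-8")   # hypothetical variant with the source encoding declared explicitly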

@shivaram (Contributor) commented Aug 2, 2015

I'm not sure why we expect the tests to pass with LC_ALL=C. In that case I am not even able to enter the characters into the console (i.e. parse or scan in R won't handle the UTF-8 characters?) -- are we sure the input files are being created correctly in this case?

@jaehc (Author) commented Aug 4, 2015

Firstly, I am not able to enter multibyte character sequences in the R shell under LC_ALL=C either; it seems that R does not support it. What I am trying to do is enable UTF-8 characters sent from Java to be used in R, no matter what locale the user has.
The test case passed on my local UTF-8 machine, too, but unfortunately it failed on Jenkins (I guess its locale is not UTF-8). So I started looking for a way for R to support and preserve UTF-8 characters under various locales, e.g. LC_ALL=C, as sun-rui suggested.

@shivaram (Contributor) commented Aug 4, 2015

I see -- so the problem here is how to write a unit test that uses UTF-8 and works with LC_ALL=C? One simple thing we might be able to do is set the locale inside the unit test with Sys.setlocale("LC_ALL", "en_US.UTF-8") -- can you see if this works? I think the Jenkins machines have a recent enough OS that it must support UTF-8.

@jaehc (Author) commented Aug 6, 2015

@shivaram
Yes, you're right. I want the test case to pass with LC_ALL=C while R supports and preserves UTF-8 strings.

The test case passed with Sys.setlocale("LC_ALL", "en_US.UTF-8"), but I had to modify context.R to clear the UTF-8 indicating bit. It turns out that the UTF-8 indicating bit set by Encoding(x) <- "UTF-8" causes an error when passing a string to Spark.
I hope the code below supports and preserves UTF-8 encoding in R under LC_ALL=C.

context.R

127   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
128   # 2-tuples of raws
129   removeUtf8EncodingBit <- function(s) {
130     Encoding(s) <- "bytes"
131     s
132   }
133   slices_ <- rapply(slices, function(x) ifelse(is.character(x), removeUtf8EncodingBit(x), x), how="list")
134   serializedSlices <- lapply(slices_, serialize, connection = NULL)
135
136   jrdd <- callJStatic("org.apache.spark.api.r.RRDD",
137                       "createRDDFromArray", sc, serializedSlices)
138
139   RDD(jrdd, "byte")
140 }

deserialize.R

readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  string
}

testcase

test_that("collect() support Unicode characters", {
  convertToUtf8 <- function(s) {
    Encoding(s) <- "UTF-8"
    s
  }
  Sys.setlocale("LC_ALL", "en_US.UTF-8")

  lines <- c("{\"name\":\"안녕하세요\"}",
             "{\"name\":\"您好\", \"age\":30}",
             "{\"name\":\"こんにちは\", \"age\":19}",
             "{\"name\":\"Xin chào\"}")
  jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
  writeLines(lines, jsonPath)

  df <- read.df(sqlContext, jsonPath, "json")
  rdf <- collect(df)
  expect_true(is.data.frame(rdf))
  expect_equal(rdf$name[1], convertToUtf8("안녕하세요"))
  expect_equal(rdf$name[2], convertToUtf8("您好"))
  expect_equal(rdf$name[3], convertToUtf8("こんにちは"))
  expect_equal(rdf$name[4], convertToUtf8("Xin chào"))

  df1 <- createDataFrame(sqlContext, rdf)
  expect_equal(collect(where(df1, df1$name == "您好"))$name, convertToUtf8("您好"))
})

@shivaram (Contributor) commented Aug 6, 2015

Can we still use the iconv solution with the Sys.setlocale in the test case? It doesn't seem right to use rawToChar when we are decoding UTF-8 strings.

@jaehc (Author) commented Aug 9, 2015

Okay, using iconv works nicely. But we still need to modify context.R to strip the leading UTF-8 indicating bit from a string before sending it to Spark.
However, do we need to restore the locale configuration with Sys.setlocale() after the test case, because of the side effect? Or is there another way to handle it?

Also, I think introducing Sys.setlocale("LC_ALL", "en_US.UTF-8") in the test case implies that SparkR will support multibyte character sequences only under a UTF-8 locale; users would have to call Sys.setlocale() themselves if their locale is not UTF-8.

deserialize.R

readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  iconv(list(raw), to="UTF-8")
}

context.R

124   sliceLen <- ceiling(length(coll) / numSlices)
125   slices <- split(coll, rep(1:(numSlices + 1), each = sliceLen)[1:length(coll)])
126
127   # Remove the leading UTF-8 indicating bit
128   removeUtf8EncodingBit <- function(s) {
129     Encoding(s) <- "bytes"
130     s
131   }
132   slices_ <- rapply(slices, function(x) ifelse(is.character(x), removeUtf8EncodingBit(x), x), how="list")
133
134   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
135   # 2-tuples of raws
136   serializedSlices <- lapply(slices_, serialize, connection = NULL)
test_that("collect() support Unicode characters", {
  locale <- Sys.getlocale()
  Sys.setlocale("LC_ALL", "en_US.UTF-8")

  lines <- c("{\"name\":\"안녕하세요\"}",
             "{\"name\":\"您好\", \"age\":30}",
             "{\"name\":\"こんにちは\", \"age\":19}",
             "{\"name\":\"Xin chào\"}")
  jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
  writeLines(lines, jsonPath)

  df <- read.df(sqlContext, jsonPath, "json")
  rdf <- collect(df)
  expect_true(is.data.frame(rdf))
  expect_equal(rdf$name[1], "안녕하세요")
  expect_equal(rdf$name[2], "您好")
  expect_equal(rdf$name[3], "こんにちは")
  expect_equal(rdf$name[4], "Xin chào")

  df1 <- createDataFrame(sqlContext, rdf)
  expect_equal(collect(where(df1, df1$name == "您好"))$name, "您好")
  Sys.setlocale("LC_ALL", locale)
})

@shivaram (Contributor) commented Aug 9, 2015

@CHOIJAEHONG1 This diff is looking good. Can you update the PR with this? BTW, why do we need to clear the encoding bit in context.R? Does the serialization not work correctly if it is not cleared?

@jaehc (Author) commented Aug 11, 2015

@shivaram

The test case fails in collect() following createDataFrame() if we don't remove the encoding bit.
This is because worker.R responds with a wrong string result when LC_ALL=C and the encoding bit is set.
Suppose a string's encoding bit is set and its data frame is loaded into a Spark RDD with parallelize(), with the encoding bit intact. When I call collect(), worker.R responds with a strange byte representation and a wrong length.
If we want to get rid of the code that removes the encoding bit, I guess we should make worker.R always send UTF-8 strings and never escape them, no matter what the locale is.

For instance, if I put a string (ea b0 80 in UTF-8) into an RDD with the encoding bit set and then call collect(), worker.R responds with the bytes 3c 55 2b 41 43 30 30 3e, which is "<U+AC00>" in ASCII, but reports a length of 4, not 8. (<U+AC00> is indeed the code point of the original character.)
When worker.R receives a string with the UTF-8 encoding bit set and its locale is LC_ALL=C, it turns the string into the <U+XXXX> form, and the reported length does not match the actual byte representation. I think worker.R escapes multibyte character sequences with the UTF-8 encoding bit set into <U+XXXX> form when its locale is LC_ALL=C. Sys.setlocale() in the test case might not affect worker.R because of fork().

But after removing the encoding bit with Encoding(s) <- "bytes", worker.R responds with the UTF-8 strings intact. Also, with LC_ALL=en_US.UTF-8, worker.R does not try to escape the string, so it works and the test case passes.
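
A reproduction sketch of the behavior described above (editorial, based on the bytes reported in this thread; exact output may vary by platform):

Sys.setlocale("LC_ALL", "C")
x <- rawToChar(as.raw(c(0xea, 0xb0, 0x80)))   # the UTF-8 bytes of U+AC00
Encoding(x) <- "UTF-8"                         # mark the encoding bit
con <- rawConnection(raw(0), "wb")
writeBin(x, con)                               # re-encodes to the current (C) locale on output
rawConnectionValue(con)                        # per this thread: 3c 55 2b 41 43 30 30 3e ..., i.e. "<U+AC00>"
close(con)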

The following log message is from target/R/unit-test.log when the test case failed. The failure is caused by a size mismatch in the bytes sent from worker.R: Spark read only 3c 55 2b 41 out of 3c 55 2b 41 43 30 30 3e because worker.R returned a length of 4 for the string. 3c 55 2b 41 is not null-terminated, so the assertion checking for that fails in SerDe.scala.
unit-test.log

Executor task launch worker-0 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:165)
    at org.apache.spark.api.r.SerDe$.readStringBytes(SerDe.scala:100)
    at org.apache.spark.api.r.SerDe$.readString(SerDe.scala:110)
    at org.apache.spark.api.r.SerDe$.readTypedObject(SerDe.scala:64)
    at org.apache.spark.api.r.SerDe$.readObject(SerDe.scala:52)
    at org.apache.spark.sql.api.r.SQLUtils$$anonfun$org$apache$spark$sql$api$r$SQLUtils$$bytesToRow$2.apply(SQLUtils.scala:104)
    at org.apache.spark.sql.api.r.SQLUtils$$anonfun$org$apache$spark$sql$api$r$SQLUtils$$bytesToRow$2.apply(SQLUtils.scala:103)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Range.foreach(Range.scala:141)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.api.r.SQLUtils$.org$apache$spark$sql$api$r$SQLUtils$$bytesToRow(SQLUtils.scala:103)
    at org.apache.spark.sql.api.r.SQLUtils$$anonfun$1.apply(SQLUtils.scala:77)
    at org.apache.spark.sql.api.r.SQLUtils$$anonfun$1.apply(SQLUtils.scala:77)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

I tested how R strings behave with different Encoding() settings and locales.
For example, declare a variable with the string "가":

> a<-"가"

With LC_ALL=C and Encoding(a) <- "bytes":

> Sys.setlocale("LC_ALL", "C")
[1] "C/C/C/C/C/ko_KR.UTF-8"
> Encoding(a) <- "bytes"
> serialize(a, connection=NULL)
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 20
[26] 09 00 00 00 03 ea b0 80
> a
[1] "\\xea\\xb0\\x80"

With LC_ALL=C and Encoding(a) <- "UTF-8":

> Encoding(a) <- "UTF-8"
> serialize(a, connection=NULL)
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 80
[26] 09 00 00 00 03 ea b0 80
> a
[1] "<U+AC00>"

With LC_ALL=UTF-8 and Encoding(a) <- "bytes":

> Sys.setlocale("LC_ALL", "ko_KR.UTF-8")
[1] "ko_KR.UTF-8/ko_KR.UTF-8/ko_KR.UTF-8/C/ko_KR.UTF-8/ko_KR.UTF-8"
> Encoding(a) <- "bytes"
> serialize(a, connection=NULL)
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 20
[26] 09 00 00 00 03 ea b0 80
> a
[1] "\\xea\\xb0\\x80"

With LC_ALL=UTF-8 and Encoding(a) <- "UTF-8":

> Encoding(a) <- "UTF-8"
> serialize(a, connection=NULL)
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 80
[26] 09 00 00 00 03 ea b0 80
> a
[1] "가"

@shivaram (Contributor)

Yeah, I think the right thing to do is to make sure worker.R always responds with UTF-8 strings. Given the discussion in this thread, I think the problem is that we use enc2utf8 in

utfVal <- enc2utf8(value)

and instead we should use iconv there as well.
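
A sketch of what this might look like (an editorial assumption based on the writeString() shape shown later in this thread; not the merged patch):

writeString <- function(con, value) {
  utfVal <- iconv(value, to = "UTF-8")                     # convert to UTF-8 rather than only re-marking the encoding
  writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1L))
  # useBytes = TRUE (discussed later in this thread) keeps the UTF-8 bytes from being
  # re-encoded to the current locale on output
  writeBin(utfVal, con, endian = "big", useBytes = TRUE)
}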

@sun-rui (Contributor) commented Aug 13, 2015

@CHOIJAEHONG1, @shivaram:

  1. The R worker can be in any locale, because R can recognize UTF-8 and preserve the UTF-8 encoding when manipulating strings. The root cause of the issue is the different behavior of writeBin when writing a string under different locales.

For example, with x <- "\uac00":

when in a UTF-8 locale, the result of writeBin(x, con) is
    ea b0 80 00
while in the C locale, the result of writeBin(x, con) is
    <U+AC00>

I suspect this is a bug in writeBin, because one would expect the internal representation of the string to be written rather than its display representation.

We can fix this issue by changing writeString():

writeString <- function(con, value) {
  utfVal <- enc2utf8(value)
  rawString <- charToRaw(utfVal)
  writeInt(con, as.integer(length(rawString)))
  writeBin(rawString, con, endian = "big")
}

readString() should be changed accordingly, as there is no trailing zero.

  2. I prefer not to change the current locale in the test case, as Windows does not support a UTF-8 locale. We can use Unicode escapes in the Unicode strings instead (see the sketch after this list), so I guess we don't need to change the locale. R supports escaping Unicode characters as described in https://stat.ethz.ch/R-manual/R-devel/library/base/html/Quotes.html
  3. No need to use iconv. Encoding(x) <- "UTF-8" in the deserializer and enc2utf8 in the serializer are OK.
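
For example, an editorial sketch of the Unicode-escape approach mentioned in point 2 (not the merged test): the JSON lines stay pure ASCII while still containing the same characters.

lines <- c("{\"name\":\"\uc548\ub155\ud558\uc138\uc694\"}",   # 안녕하세요
           "{\"name\":\"\u60a8\u597d\", \"age\":30}")          # 您好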

@sun-rui (Contributor) commented Aug 26, 2015

@CHOIJAEHONG1 , any update on this?

@jaehc (Author) commented Aug 26, 2015

Sorry for being late.

I got this error message in SerDe.scala. The byte sequence sent from worker.R is not null-terminated.

java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:165)
    at org.apache.spark.api.r.SerDe$.readStringBytes(SerDe.scala:100)
    at org.apache.spark.api.r.SerDe$.readString(SerDe.scala:110)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:49)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:37)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
[1] "returnStatus"
integer(0)
Error in readTypedObject(con, type) :
  Unsupported type for deserialization
Calls: test_package ... callJStatic -> invokeJava -> readObject -> readTypedObject
SerDe.scala
  def readStringBytes(in: DataInputStream, len: Int): String = {
    val bytes = new Array[Byte](len)
    in.readFully(bytes)
    println("bytes is " + bytes.map("%02x".format(_)).mkString(" "))
    assert(bytes(len - 1) == 0)
    val str = new String(bytes.dropRight(1), "UTF-8")
    str
  }

The output.

bytes is 6f 72 67 2e 61 70 61 63 68 65 2e 73 70 61 72 6b 2e 61 70 69 2e 72 2e 52 52 44 44

@jaehc (Author) commented Aug 26, 2015

@sun-rui

I call writeBin with useBytes=TRUE (which defaults to FALSE), referring to the documentation below.

https://stat.ethz.ch/R-manual/R-devel/library/base/html/writeLines.html

useBytes is for expert use. Normally (when false) character strings with marked encodings are converted to the current encoding before being passed to the connection (which might do further re-encoding). useBytes = TRUE suppresses the re-encoding of marked strings so they are passed byte-by-byte to the connection: this can be useful when strings have already been re-encoded by e.g. iconv. (It is invoked automatically for strings with marked encoding "bytes".)

It seems to work.
Also, calling markUtf8 is not necessary in the test case when I set the locale to UTF-8.

@shivaram
Do you agree with using rawToChar? It appears to be okay to use here.

writeString <- function(con, value) {
  utfVal <- enc2utf8(value)
  writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
  writeBin(utfVal, con, endian = "big", useBytes=TRUE)
}

readString <- function(con) {
  stringLen <- readInt(con)
  raw <- readBin(con, raw(), stringLen, endian = "big")
  string <- rawToChar(raw)
  Encoding(string) <- "UTF-8"
  string
}
test_that("collect() support Unicode characters", {
  markUtf8 <- function(s) {
    Encoding(s) <- "UTF-8"
    s
  }

  lines <- c("{\"name\":\"안녕하세요\"}",
             "{\"name\":\"您好\", \"age\":30}",
             "{\"name\":\"こんにちは\", \"age\":19}",
             "{\"name\":\"Xin chào\"}")

  jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
  writeLines(lines, jsonPath)

  df <- read.df(sqlContext, jsonPath, "json")
  rdf <- collect(df)
  expect_true(is.data.frame(rdf))
  expect_equal(rdf$name[1], markUtf8("안녕하세요"))
  expect_equal(rdf$name[2], markUtf8("您好"))
  expect_equal(rdf$name[3], markUtf8("こんにちは"))
  expect_equal(rdf$name[4], markUtf8("Xin chào"))

  df1 <- createDataFrame(sqlContext, rdf)
  expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))
})

@jaehc (Author) commented Aug 26, 2015

Timeout occurred while fetching from the origin.

> git config remote.origin.url https://github.com/apache/spark.git # timeout=10
Fetching upstream changes from https://github.com/apache/spark.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/spark.git +refs/pull/7494/*:refs/remotes/origin/pr/7494/* # timeout=15
ERROR: Timeout after 15 minutes
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from https://github.com/apache/spark.git
    at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:735)
    at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:983)
    at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1016)
    at hudson.scm.SCM.checkout(SCM.java:485)
    at hudson.model.AbstractProject.checkout(AbstractProject.java:1282)
    at 

@shivaram (Contributor)

Jenkins retest this please

SparkQA commented Aug 26, 2015

Test build #41629 has finished for PR 7494 at commit 3686f15.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor (review comment)

Still a little confused about how Unicode strings are treated in a non-UTF-8 locale. Why is there no need to call markUtf8 for these Unicode strings?

Author (review comment)

I guess this is related to the R interpreter. I made an example to see how the R interpreter handles Unicode strings, with respect to the UTF-8 indicator, under different locales.
You can see that R automatically adds 80 (maybe the UTF-8 indicator) before the 09 under the UTF-8 locale. I think this example explains why the test case passes under the UTF-8 locale without using markUtf8.

test.R

a<-"가"
Encoding(a)
print(serialize(a, connection=NULL))

With a UTF-8 locale the output is:

$ r -f test.R
> a<-"가"
> Encoding(a)
[1] "UTF-8"
> print(serialize(a, connection=NULL))
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 80
[26] 09 00 00 00 03 ea b0 80

With the C (ASCII) locale the output is:

$ r -f test.R
> a<-"가"
> Encoding(a)
[1] "unknown"
> print(serialize(a, connection=NULL))
 [1] 58 0a 00 00 00 02 00 03 02 00 00 02 03 00 00 00 00 10 00 00 00 01 00 00 00
[26] 09 00 00 00 03 ea b0 80

@sun-rui (Contributor) commented Aug 27, 2015

@CHOIJAEHONG1, basically LGTM. Some minor comments.

@shivaram (Contributor)

Thanks @CHOIJAEHONG1 and @sun-rui -- I just want to test this and go through it a bit more carefully once more, as it's a pretty fundamental change in how we handle strings. Will take another look at this soon.

@jaehc (Author) commented Aug 29, 2015

@shivaram, @sun-rui
Thank you guys for the work. It would be great to go through it more carefully, like you said.

@shivaram (Contributor) commented Sep 3, 2015

@CHOIJAEHONG1 Sorry for the delay. I finally got a chance to try this out on my machine, and it seems to work fine with UTF-8 strings with the locale set to both C and UTF-8.

I'm going to merge this to the master branch, and as this is early in the 1.6 window we also get a chance for it to be tested thoroughly.

asfgit closed this in af0e312 on Sep 3, 2015