Conversation

@zero323
Member

@zero323 zero323 commented Oct 13, 2015

Use dropFactors column-wise instead of nested loop when createDataFrame from a data.frame

At this moment SparkR createDataFrame uses a nested loop to convert factors to character when called on a local data.frame. It works but is incredibly slow, especially with data.table (~2 orders of magnitude slower than the PySpark / Pandas version on a DataFrame of size 1M rows x 2 columns).

A simple improvement is to apply dropFactor column-wise and then reshape the output list.

It should at least partially address SPARK-8277.
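The column-wise idea can be sketched as follows. This is an illustrative sketch, not the actual SparkR code: dropFactor and to_row_list are assumed helper names.

```r
# Illustrative sketch of the column-wise approach (helper names are
# assumptions, not the actual SparkR internals).
dropFactor <- function(x) if (is.factor(x)) as.character(x) else x

to_row_list <- function(df) {
  cols <- lapply(df, dropFactor)  # one dropFactor call per column
  # reshape the list of columns into a list of rows
  do.call(mapply, c(list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE), cols))
}

df <- data.frame(x = factor(c("a", "b")), y = c(1, 2))
rows <- to_row_list(df)
str(rows[[1]])  # x is now character "a", y stays numeric 1
```

The key point is that dropFactor runs once per column instead of once per cell, which is where the nested loop spent its time.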

@shivaram
Contributor

Jenkins, ok to test

@shivaram
Contributor

@zero323 Do you have any benchmark results before this PR vs. after this PR ?

@zero323
Member Author

zero323 commented Oct 13, 2015

@shivaram

Spark 1.5.1

df <- read.csv("flights.csv")
microbenchmark::microbenchmark(createDataFrame(sqlContext, df), times=3)

## Unit: seconds
##                             expr      min       lq     mean   median       uq
##  createDataFrame(sqlContext, df) 96.41565 97.19515 99.08441 97.97465 100.4188
##       max neval
##  102.8629     3

After patch on master:

df <- read.csv("flights.csv")
microbenchmark::microbenchmark(createDataFrame(sqlContext, df), times=5)

## Unit: seconds
##                           expr      min       lq     mean   median       uq
## createDataFrame(sqlContext, df) 5.366246 5.862174 6.353852 6.033105 6.156628
##      max neval
##  8.351105     5

@SparkQA

SparkQA commented Oct 13, 2015

Test build #43654 has finished for PR 9099 at commit 91f1bb3.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 13, 2015

Test build #43656 has finished for PR 9099 at commit 8b63ee1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@felixcheung
Member

Very cool. Could you run a benchmark on a data.table dataset too (read.csv returns a data.frame)?

@shivaram
Contributor

cc @sun-rui

@zero323
Member Author

zero323 commented Oct 13, 2015

@felixcheung Here you are:

Spark 1.5.1

dt <- data.table::fread("flights.csv")
microbenchmark::microbenchmark(createDataFrame(sqlContext, dt), times=3)

## Unit: seconds
##                             expr      min       lq     mean  median       uq
##  createDataFrame(sqlContext, dt) 378.8534 379.4482 381.2061 380.043 382.3825
##      max neval
##  384.722     3

Patched master:

dt <- data.table::fread("flights.csv")
microbenchmark::microbenchmark(createDataFrame(sqlContext, dt), times=5)

## Unit: seconds
##                             expr      min       lq     mean   median       uq
##  createDataFrame(sqlContext, dt) 5.196731 5.512982 5.674607 5.544245 5.959565
##       max neval
##  6.159512     5

It looks a little bit too good to be true, but as far as I can tell everything works as expected.

@felixcheung
Member

that's good. data.table is very fast to begin with...

@felixcheung
Member

could you add some tests specifically to make sure iris or some other known/available dataset is "serialized" properly?

@zero323
Member Author

zero323 commented Oct 13, 2015

Sure. Should I make a separate test for that or simply add to "create DataFrame from list or data.frame"?

@felixcheung
Member

you could probably add to that. This is just an extra test to be safe (and it should check values too; the current tests don't seem to do that, only column names, data types and counts).

@SparkQA

SparkQA commented Oct 13, 2015

Test build #43684 has finished for PR 9099 at commit 36e191d.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shivaram
Contributor

Jenkins, retest this please

@SparkQA

SparkQA commented Oct 13, 2015

Test build #43686 has finished for PR 9099 at commit 36e191d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ChildProcAppHandle implements SparkAppHandle
    • abstract class LauncherConnection implements Closeable, Runnable
    • final class LauncherProtocol
    • static class Message implements Serializable
    • static class Hello extends Message
    • static class SetAppId extends Message
    • static class SetState extends Message
    • static class Stop extends Message
    • class LauncherServer implements Closeable
    • class NamedThreadFactory implements ThreadFactory
    • class OutputRedirector

@zero323
Member Author

zero323 commented Oct 14, 2015

@shivaram, @felixcheung

OK, I am puzzled here. I've played with different test scenarios and this PR is either buggy, fixes an unreported bug, or exposes a more serious problem. Let's say we have the following local data frame:

ldf <- structure(list(foo = list(structure(list(a = 1, b = 3), .Names = c("a", 
    "b")), structure(list(a = 2, c = 6), .Names = c("a", "c"))), 
    bar = c(1, 2), baz = c("a", "b")), .Names = c("foo", "bar", 
    "baz"), row.names = c("1", "2"), class = "data.frame")

ldf

##    foo bar baz
## 1 1, 3   1   a
## 2 2, 6   2   b

str(ldf)

## 'data.frame':    2 obs. of  3 variables:
##  $ foo:List of 2
##   ..$ :List of 2
##   .. ..$ a: num 1
##   .. ..$ b: num 3
##   ..$ :List of 2
##   .. ..$ a: num 2
##   .. ..$ c: num 6
##  $ bar: num  1 2
##  $ baz: chr  "a" "b"

On 1.5.1 an attempt to convert this to a Spark DataFrame fails with the following error:

sdf <- createDataFrame(sqlContext, ldf)

## Error in structField.character(names[[i]], types[[i]], TRUE) : 
##   Field type must be a string.

while the patched version creates a relatively reasonable schema:

sdf <- createDataFrame(sqlContext, ldf)
printSchema(sdf)

## root
##  |-- foo: array (nullable = true)
##  |    |-- element: double (containsNull = true)
##  |-- bar: double (nullable = true)
##  |-- baz: string (nullable = true)

I believe that the patched behavior is what we want here, but as far as I can tell it is covered by neither tests nor docs. Still, after transformation on 1.5.1 data[[1]][[1]] is a list of lists (list(structure(list(a = 1, b = 3), .Names = c("a", "b"))) while the patched version is simply a list (structure(list(a = 1, b = 3), .Names = c("a", "b"))).

I admit the input df is not the most typical R data.frame; nevertheless, I believe it should either be accepted as valid input or intentionally rejected as invalid before it gets to structField.

Finally, it looks like it should actually give a StructType or MapType ([String, Any]?), not an array. Moreover, there is a type issue here when the lists are heterogeneous.

Please ignore that. I've checked the source once again and found the type mapping. The question remains whether it should fail or not. As far as I can tell, this problem affects at least 1.4.1, 1.5.0, and 1.5.1.

@shivaram
Contributor

@sun-rui -- Could this be related to the StructType change ?

@felixcheung
Member

It does look like, except for 2-3 lines, the createDataFrame function has not changed since 1.4.0:
https://github.com/apache/spark/blob/branch-1.4/R/pkg/R/SQLContext.R#L86
so it's possible this bug has existed since then, though most people pass a data.frame to this, so the path might not have been exercised as much.

@felixcheung
Member

Could you add a test for this structure?

@zero323
Member Author

zero323 commented Oct 15, 2015

It has been there since 1.4.0.

Regarding tests, I would prefer to wait until I get some clarification, because right now I am not sure how to handle this. If the expected mapping is list -> array, then it simply doesn't cover situations like this:

ldf1 <- structure(list(foo = list(
    structure(list(foo = "a_foo", bar = 3), .Names = c("foo", "bar")),
    structure(list(foo = 2, bar = 3), .Names = c("foo", "bar"))
)), .Names = "foo", class = "data.frame", row.names = c("1", "2"))

sdf1 <- createDataFrame(sqlContext, ldf1)

printSchema(sdf1)

## root
##  |-- foo: array (nullable = true)
##  |    |-- element: string (containsNull = true)

dtypes(sdf1)

## [[1]]
## [1] "foo"           "array<string>"

in contrast to:

ldf2 <- structure(list(foo = list(
    structure(list(foo = 1, bar = 3), .Names = c("foo", "bar")),
    structure(list(foo = 2, bar = "a_bar"), .Names = c("foo", "bar"))
)), .Names = "foo", class = "data.frame", row.names = c("1", "2"))

sdf2 <- createDataFrame(sqlContext, ldf2)

printSchema(sdf2)

## root
##  |-- foo: array (nullable = true)
##  |    |-- element: double (containsNull = true)

dtypes(sdf2)

## [[1]]
## [1] "foo"           "array<double>"

What is even more confusing, the first one fails on head / collect with scala.MatchError: 3.0 (of class java.lang.Double), but the second one works:

head(sdf2)

##        foo
## 1     1, 3
## 2 2, a_bar

but understandably fails on explode:

head(select(sdf2, explode(sdf2$foo)))

## Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
##   java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double

@SparkQA

SparkQA commented Oct 15, 2015

Test build #43787 has finished for PR 9099 at commit 08d6dd1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 15, 2015

Test build #43788 has finished for PR 9099 at commit 40d4d44.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sun-rui
Contributor

sun-rui commented Oct 16, 2015

@zero323, I have an alternative proposal that is as simple as:
data.frame(t(data))
It seems t() can handle factor types, so we don't need dropFactor. Could you try this proposal, benchmark it, and compare the result with that of your version?

For the problem you observed regarding sdf, that is because schema inference for complex types was previously buggy. That was fixed by my PR for SPARK-10049. The fix is currently on master, not in the 1.5.x releases.

For the bug regarding sdf1, I will investigate it (maybe some problem in handling nested arrays) and fix it in another PR. You can go on with this PR.

@zero323
Member Author

zero323 commented Oct 16, 2015

@sun-rui It is simple but not correct. Since t() returns a matrix, there is automatic type coercion. So for example, if you take iris, you can expect something like this:

iris_t <- t(iris)
stopifnot(is.character(iris_t), is.matrix(iris_t)) # Type coercion, everything is character
iris_t_df <- data.frame(iris_t)
stopifnot(all(lapply(iris_t_df, class) == "factor")) # Now everything is factor

The second coercion can be handled with stringsAsFactors = FALSE, but the first one is unavoidable. The situation gets even worse with non-atomic types.

It is also significantly slower:
data <- read.csv("flights.csv")
microbenchmark::microbenchmark(data.frame(t(data)), times=10)

## Unit: seconds
##                 expr      min       lq     mean   median       uq      max
##  data.frame(t(data)) 22.16711 22.99645 23.25316 23.12763 23.62046 24.14375
##  neval
##     10

so I don't think it can be useful here.

Without the string-to-factor conversion it is actually faster, but I still don't see how it can be used here.

> microbenchmark::microbenchmark(data.frame(t(data), stringsAsFactors=F), times=10)
## Unit: seconds
##                                       expr     min       lq     mean   median
##  data.frame(t(data), stringsAsFactors = F) 2.58896 2.675404 2.731266 2.727547
##        uq     max neval
##  2.798114 2.86812    10

@SparkQA

SparkQA commented Oct 17, 2015

Test build #43870 has finished for PR 9099 at commit d677380.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zero323
Member Author

zero323 commented Oct 17, 2015

I've prepared some tests and it looks like neither heterogeneous lists nor environments are handled properly.

@shivaram
Contributor

@sun-rui any more comments on this ?

@zero323
Member Author

zero323 commented Oct 23, 2015

@sun-rui As far as I can tell this is ready. There is still SPARK-11283, which could be fixed here as well.

@SparkQA

SparkQA commented Oct 23, 2015

Test build #44224 has finished for PR 9099 at commit 36ccdc7.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • # class of POSIXlt is c("POSIXlt" "POSIXt")

@zero323
Member Author

zero323 commented Oct 26, 2015

@shivaram Not yet. I hope I'll have some time later this week to take a deeper look at this. And I still would like to hear some official response regarding SPARK-11283, because right now there is really nothing we can test here.

@shivaram
Contributor

@zero323 @sun-rui Can we remove the unit tests for the list / environment stuff from this PR ? From what I understand these problems already exist irrespective of this performance improving change ?

@zero323
Member Author

zero323 commented Nov 5, 2015

@shivaram Yes, that's right. These problems already exist on master. The only problem is that I had to mimic the current behavior to match what is described in SPARK-11283. I removed the tests from this PR, but I strongly believe this should be tested.

@SparkQA

SparkQA commented Nov 5, 2015

Test build #45085 has finished for PR 9099 at commit 463713d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • # class of POSIXlt is c("POSIXlt" "POSIXt")

@felixcheung
Member

@zero323 you could add the test code to SPARK-11283 so that they could be added back then.

@shivaram
Contributor

shivaram commented Nov 5, 2015

@sun-rui Could you take one more look at this ? My idea is to just get a fix for the performance issue in this PR and not change any behavior (for better or for worse).

Contributor

#' is for Roxygen, so use # here.

@SparkQA

SparkQA commented Nov 6, 2015

Test build #45209 has finished for PR 9099 at commit 6a73d5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • # class of POSIXlt is c("POSIXlt" "POSIXt")

@shivaram
Contributor

@sun-rui Could you take another look ? It'll be good to get this into 1.6 if we can

@sun-rui
Contributor

sun-rui commented Nov 13, 2015

I will look at it tomorrow

Contributor

@zero323, I don't understand the meaning here. Could you give me an example?

An experiment like:

mapply(list, c(1:2), list(list(3,4),list(5,6)), SIMPLIFY=FALSE)

works as expected. The combination of atomic vector and list works fine for mapply.

I would recommend eliminating this code piece so that this PR focuses on the performance improvement only.

Contributor

Ah, I think you were trying to emulate the behavior of a bug when creating a DataFrame from a data.frame having any column of list type. Actually, this bug is https://issues.apache.org/jira/browse/SPARK-11283, reported by you :)

So this PR is not only a performance improvement, it also fixes SPARK-11283.

Please remove the code piece here :)

Contributor

The bug of SPARK-11283 lies in

lapply(1:m, function(j) { dropFactor(data[i,j]) })

in which data[i, j] returns a length-1 list wrapping the item instead of the item itself when the item is in a column of list type.

Using mapply fixes it!
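A minimal illustration of this indexing difference; the example data frame is hypothetical:

```r
# Hypothetical example: a data.frame with a list column
df <- data.frame(bar = c(1, 2))
df$foo <- list(list(a = 1), list(a = 2))

# [i, j] on a list column yields a length-1 list wrapping the element ...
wrapped <- df[1, "foo"]
str(wrapped)   # List of 1, containing list(a = 1)

# ... while mapply over the columns hands FUN the element itself
rows <- mapply(function(foo, bar) list(foo = foo, bar = bar),
               df$foo, df$bar, SIMPLIFY = FALSE)
str(rows[[1]]$foo)  # list(a = 1) directly, no extra wrapping
```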

Member Author

@sun-rui Yes, the only reason to add requires_wrapping was to mimic the behavior described in SPARK-11283 and not introduce unexpected changes in behavior. I understand I can safely drop this and add tests assuming that SPARK-11283 is indeed a bug :) If so, I'll try to do it before Monday.

@SparkQA

SparkQA commented Nov 15, 2015

Test build #45952 has finished for PR 9099 at commit 69fa917.

  • This patch fails R style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • # class of POSIXlt is c("POSIXlt" "POSIXt")

@SparkQA

SparkQA commented Nov 15, 2015

Test build #45953 has finished for PR 9099 at commit 5e9580c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • # class of POSIXlt is c("POSIXlt" "POSIXt")

@sun-rui
Contributor

sun-rui commented Nov 16, 2015

LGTM

@shivaram
Contributor

Thanks @zero323 and @sun-rui - Merging this

asfgit pushed a commit that referenced this pull request Nov 16, 2015
…oop when createDataFrame

Use `dropFactors` column-wise instead of nested loop when `createDataFrame` from a `data.frame`

At this moment SparkR createDataFrame uses a nested loop to convert factors to character when called on a local data.frame. It works but is incredibly slow, especially with data.table (~2 orders of magnitude slower than the PySpark / Pandas version on a DataFrame of size 1M rows x 2 columns).

A simple improvement is to apply `dropFactor` column-wise and then reshape the output list.

It should at least partially address [SPARK-8277](https://issues.apache.org/jira/browse/SPARK-8277).

Author: zero323 <matthew.szymkiewicz@gmail.com>

Closes #9099 from zero323/SPARK-11086.

(cherry picked from commit d7d9fa0)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
@asfgit asfgit closed this in d7d9fa0 Nov 16, 2015
@zero323 zero323 deleted the SPARK-11086 branch April 6, 2017 11:03