
Conversation

@zhengruifeng
Contributor

@zhengruifeng zhengruifeng commented Aug 10, 2017

What changes were proposed in this pull request?

parallelize the computation of all columns

performance tests (average duration in seconds):

| numColumns | Mean (Old) | Median (Old) | Mean (RDD) | Median (RDD) | Mean (DF) | Median (DF) |
|---|---|---|---|---|---|---|
| 1 | 0.0771394713 | 0.0658712813 | 0.080779802 | 0.048165981499999996 | 0.10525509870000001 | 0.0499620203 |
| 10 | 0.7234340630999999 | 0.5954440414 | 0.0867935197 | 0.13263428659999998 | 0.09255724889999999 | 0.1573943635 |
| 100 | 7.3756451568 | 6.2196631259 | 0.1911931552 | 0.8625376817000001 | 0.5557462431 | 1.7216837982000002 |

How was this patch tested?

existing tests

@SparkQA

SparkQA commented Aug 10, 2017

Test build #80477 has finished for PR 18902 at commit f6f166f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 10, 2017

Test build #80478 has finished for PR 18902 at commit 660c2db.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

Jenkins, retest this please

@SparkQA

SparkQA commented Aug 10, 2017

Test build #80479 has finished for PR 18902 at commit 660c2db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hhbyyh
Contributor

hhbyyh commented Aug 10, 2017

Hi @zhengruifeng Thanks for the idea and implementation. Definitely something worth exploring.

As I understand it, the new implementation improves locality, yet it leverages the RDD API instead of the Dataset API. Since this overall targets a performance improvement, I'd be interested to see the performance comparison.

@zhengruifeng
Contributor Author

@hhbyyh Yes, I will test the performance.
Btw, the median computation via stat.approxQuantile also transforms the df into an rdd before aggregation; see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala#L102

@zhengruifeng
Contributor Author

I tested the performance on a small dataset; the values in the following table are the average durations in seconds:

| numColumns | Old Mean | Old Median | New Mean | New Median |
|---|---|---|---|---|
| 1 | 0.0771394713 | 0.0658712813 | 0.080779802 | 0.048165981499999996 |
| 10 | 0.7234340630999999 | 0.5954440414 | 0.0867935197 | 0.13263428659999998 |
| 100 | 7.3756451568 | 6.2196631259 | 0.1911931552 | 0.8625376817000001 |

We can see that, even on small data, the speedup is significant.
On big datasets that do not fit in memory, we should obtain an even better speedup.

The test code is here:

import org.apache.spark.ml.feature._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
import scala.util.Random

// Build an n x m DataFrame of random doubles and cache it,
// so that only the Imputer.fit cost is measured below.
val seed = 123L
val random = new Random(seed)
val n = 10000
val m = 100
val rows = sc.parallelize(1 to n).map(i => Row(Array.fill(m)(random.nextDouble()): _*))
val struct = new StructType(Array.range(0, m).map(i => StructField(s"c$i", DoubleType, nullable = true)))
val df = spark.createDataFrame(rows, struct)
df.persist()
df.count()

// Fit the Imputer 10 times for each strategy and column count, and print the average duration in seconds.
for (strategy <- Seq("mean", "median"); k <- Seq(1, 10, 100)) {
  val imputer = new Imputer()
    .setStrategy(strategy)
    .setInputCols(Array.range(0, k).map(i => s"c$i"))
    .setOutputCols(Array.range(0, k).map(i => s"o$i"))
  var duration = 0.0
  for (i <- 0 until 10) {
    val start = System.nanoTime()
    imputer.fit(df)
    val end = System.nanoTime()
    duration += (end - start) / 1e9
  }
  println((strategy, k, duration / 10))
}

Contributor

Style - use dot notation here, not infix.

Contributor

Style: space between if and (

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80653 has finished for PR 18902 at commit 8283411.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hhbyyh
Contributor

hhbyyh commented Aug 15, 2017

Eh, I meant that it may be possible to get the mean values purely with the DataFrame API (convert missingValue/NaN to null) in one pass, so we may want to check that performance comparison too. But I guess it looks a little hacky.

For the median value it may be harder, so we can use the RDD API (to be confirmed).
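
For illustration, here is a minimal sketch of the "convert to null" idea for the mean, assuming a hypothetical helper with illustrative `inputCols`/`missingValue` parameters (not the actual PR code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{avg, col, when}

// Hypothetical helper: `inputCols` and `missingValue` stand in for the Imputer params.
def meansIgnoringMissing(df: DataFrame, inputCols: Array[String], missingValue: Double): Array[Double] = {
  // Map missingValue and NaN to null so that avg silently skips them.
  val filtered = df.select(inputCols.map { c =>
    when(col(c) === missingValue || col(c).isNaN, null).otherwise(col(c)).as(c)
  }: _*)
  // One aggregation pass over all requested columns.
  val row = filtered.select(inputCols.map(c => avg(col(c))): _*).head()
  inputCols.indices.map { i =>
    // avg returns null for a column that contains only missing values.
    if (row.isNullAt(i)) Double.NaN else row.getDouble(i)
  }.toArray
}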

@zhengruifeng
Contributor Author

zhengruifeng commented Aug 15, 2017

@hhbyyh Good idea! We can also use this trick to compute the median, because the method multipleApproxQuantiles can handle both null and NaN.

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80660 has finished for PR 18902 at commit fd1eb43.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

Jenkins, retest this please

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80663 has finished for PR 18902 at commit fd1eb43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80666 has finished for PR 18902 at commit 2cca623.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80667 has finished for PR 18902 at commit df7a0a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 15, 2017

Test build #80675 has finished for PR 18902 at commit 5921f51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

zhengruifeng commented Aug 16, 2017

@hhbyyh I rewrote the impl: now all NaN and missingValue entries are converted to null first, and then the current methods are used.
For columns containing only null, null is returned for avg(col), and Array.empty[Double] is returned for the median.
I think it is more concise than the old impl using aggregation on an RDD, and we do not need to worry about the performance.
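
As a companion to the sketch above, here is a minimal sketch of the median path under the same null conversion, again with hypothetical parameters and an illustrative relative error of 0.001 (not the actual PR code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, when}

// Hypothetical helper for strategy == "median".
def mediansIgnoringMissing(df: DataFrame, inputCols: Array[String], missingValue: Double): Array[Double] = {
  val filtered = df.select(inputCols.map { c =>
    when(col(c) === missingValue || col(c).isNaN, null).otherwise(col(c)).as(c)
  }: _*)
  // The multi-column approxQuantile computes all medians in one pass and, as noted above,
  // returns an empty array for a column that contains only nulls.
  filtered.stat.approxQuantile(inputCols, Array(0.5), 0.001).zip(inputCols).map {
    case (quantiles, c) =>
      if (quantiles.isEmpty) Double.NaN // every value in this column was missing
      else quantiles.head
  }
}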

@hhbyyh
Contributor

hhbyyh commented Aug 16, 2017

Thanks for the quick update. The implementation may be improved in some details, but first I'd want to confirm that the "convert to null" method does not have any defects.
@MLnick @srowen @yanboliang

And we may need more unit tests to constantly monitor the SQL behavior (avg and stat) on null.

@zhengruifeng
Contributor Author

I tested on DataFrames containing null: both avg and stat.approxQuantile ignore null. If a column contains only null, null and Array.empty[Double] are returned, respectively.
Agreed that we should add more tests for this dependency.
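
A quick spark-shell check of the null behavior described above; the column names are made up for illustration and a running `spark` session is assumed:

import org.apache.spark.sql.functions.avg
import spark.implicits._

val df = Seq[(java.lang.Double, java.lang.Double)]((1.0, null), (3.0, null), (null, null))
  .toDF("mixed", "onlyNull")

// avg skips nulls: 2.0 for "mixed", null for the all-null column.
df.select(avg("mixed"), avg("onlyNull")).show()
// approxQuantile also skips nulls; for an all-null column it returns Array.empty[Double].
println(df.stat.approxQuantile("mixed", Array(0.5), 0.001).mkString(","))    // non-empty
println(df.stat.approxQuantile("onlyNull", Array(0.5), 0.001).mkString(",")) // empty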

Contributor

Add API annotation to clarify that the relative error of median is 0.001 if strategy == median.

Contributor

BTW, add annotation here to clarify approxQuantile function will ignore null automatically.

@yanboliang
Contributor

yanboliang commented Aug 17, 2017

@hhbyyh @zhengruifeng I'm OK with the convert-to-null method. I think there is no extra pass over the data if we handle it this way, and the DataFrame/RDD functions that compute the mean/median will ignore null. Thanks.

Contributor

Add annotation here to clarify avg function will ignore null automatically.

Contributor

BTW, add annotation here to clarify approxQuantile function will ignore null automatically.

@MLnick
Contributor

MLnick commented Aug 17, 2017

@zhengruifeng Could you verify & compare the performance of this new DF-based approach vs your original RDD-based one?

@SparkQA

SparkQA commented Aug 17, 2017

Test build #80780 has finished for PR 18902 at commit 495d701.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

zhengruifeng commented Aug 17, 2017

@MLnick @yanboliang @hhbyyh I updated the performance comparison.
The DF-based impl is a little slower than the RDD-based one when the number of columns is small.
When the number of columns is large (100), the DF-based impl is about 2-3x slower than the RDD-based one.

@yanboliang
Contributor

yanboliang commented Aug 17, 2017

@zhengruifeng What does the RDD-based one mean? Is it the code on master or the code in your former commit?

@zhengruifeng
Contributor Author

zhengruifeng commented Aug 18, 2017

@yanboliang The RDD-based impl is the former commit.

@yanboliang
Contributor

yanboliang commented Aug 25, 2017

@zhengruifeng That DataFrame-based operations are 2-3x slower than RDD-based ones is a known issue, caused by the deserialization cost. If we switch to the RDD-based method, we need to implement our own aggregator to calculate the mean and median, which needs much more code than calling the DataFrame API. BTW, DF uses a more compact structure that can reduce the memory footprint.
From my perspective, I'd suggest keeping the current DF-based solution, as it is still 5-10x faster than the original implementation. @hhbyyh @MLnick What do you think about it? Thanks.
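
For context, a rough sketch of the kind of hand-rolled RDD aggregator this refers to, computing per-column means while skipping missingValue/NaN (hypothetical, not the code from the former commit), which shows why it takes noticeably more code than calling the DataFrame API:

import org.apache.spark.sql.{DataFrame, Row}

// Hypothetical per-column (sum, count) aggregation over the underlying RDD of Rows;
// `inputCols` and `missingValue` are illustrative parameters.
def rddMeans(df: DataFrame, inputCols: Array[String], missingValue: Double): Array[Double] = {
  val indices = inputCols.map(df.schema.fieldIndex)
  val (sums, counts) = df.rdd.treeAggregate((new Array[Double](indices.length), new Array[Long](indices.length)))(
    seqOp = { case ((s, c), row) =>
      var j = 0
      while (j < indices.length) {
        if (!row.isNullAt(indices(j))) {
          val v = row.getDouble(indices(j))
          if (!v.isNaN && v != missingValue) { s(j) += v; c(j) += 1 }
        }
        j += 1
      }
      (s, c)
    },
    combOp = { case ((s1, c1), (s2, c2)) =>
      var j = 0
      while (j < s1.length) { s1(j) += s2(j); c1(j) += c2(j); j += 1 }
      (s1, c1)
    })
  // A column with no valid values gets NaN here; the real Imputer would treat that as an error.
  sums.zip(counts).map { case (s, c) => if (c > 0) s / c else Double.NaN }
}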

@zhengruifeng
Contributor Author

@yanboliang Although disappointed by DF's performance, I also approve of the choice of DF, if only for less code.

@WeichenXu123
Contributor

WeichenXu123 commented Sep 4, 2017

+1 for using the DataFrame-based version of the code.

@zhengruifeng One thing I want to confirm: I checked your testing code, and both the RDD-based version and the DataFrame-based version pay the deserialization cost:

...
val df = spark.createDataFrame(rows, struct)
df.persist()
df.count()
...
// do `imputer.fit`

When running imputer.fit, it will extract the required columns from the cached input DataFrame, so when you compare the perf between RDD.aggregate and the DataFrame avg, both need to deserialize data from the input DataFrame before doing the computation, and the DataFrame avg takes advantage of codegen and should be faster. But here the test shows that the RDD version is faster than the DataFrame version, which is not very reasonable, so I want to confirm:

In your RDD-version testing, do you cache the `RDD` again after getting it from the input `DataFrame`?

If not, your testing has no problem, and I would guess there exists some other performance issue in the SQL layer; cc @cloud-fan to take a look.

@zhengruifeng Could you paste both the RDD and DataFrame versions of the code, with their own testing code, so that I can check the perf issue more deeply?

@zhengruifeng
Contributor Author

@WeichenXu123 No, I only cache the DataFrame. And the RDD version is here.
I used the same test suite above to test those impls.

@WeichenXu123
Contributor

Hmm... that's interesting. So I found a performance gap between DataFrame codegen aggregation and simple RDD aggregation. I will discuss this with the SQL team later. Thanks!

@MLnick
Contributor

MLnick commented Sep 4, 2017

Seems fine to me to use the DF version even though it's slower. But we should open a JIRA issue to track where the gap is on the SQL side of things and try to improve the performance.

@WeichenXu123
Contributor

Sure. I will create a JIRA after this perf gap is confirmed.

@zhengruifeng
Contributor Author

Any more comments on this PR? It has been about one month since the last modification.

Contributor

@WeichenXu123 WeichenXu123 left a comment

LGTM. The performance issue can be put into a separate JIRA. Thanks! Pinging @yanboliang for a final review.

Contributor

@yanboliang yanboliang left a comment

The current DF-based solution is 5-10x faster than the original implementation and can reduce the memory footprint, so this looks good to me.
Regarding the performance gap between DF and RDD, I found the gap is larger when there are more columns, so let's dig into the DF code generation for multiple columns to check whether it makes a difference. Let's track the SQL issue in a separate JIRA.

@yanboliang
Contributor

Merged into master. Thanks, all.

@asfgit asfgit closed this in 0fa5b7c Sep 13, 2017
@MLnick
Contributor

MLnick commented Sep 13, 2017

cc @viirya on the multi-column generation issue - could be similar general solution to #17819?

@zhengruifeng zhengruifeng deleted the parallelize_imputer branch September 14, 2017 01:32
@viirya
Member

viirya commented Sep 14, 2017

@MLnick Thanks for pinging me.

I went through this quickly. The basic idea is the same: performing the operations on multiple input columns in one single Dataset/DataFrame operation.

Unlike Bucketizer, Imputer has no compatibility concern because it already supports multiple input columns (HasInputCols). In Bucketizer we don't want to break the current API, so that makes things a bit more complicated.

Actually, I noticed that ImputerModel also applies withColumn sequentially on each input column. I'd like to address this part with the withColumns API proposed in #17819. What do you think @MLnick?
