
Conversation

@viirya (Member) commented Sep 14, 2017

What changes were proposed in this pull request?

SPARK-21690 made Imputer one-pass by parallelizing the computation over all input columns. When we transform a dataset with ImputerModel, however, we still call withColumn on each input column sequentially. We can transform all input columns at once by adding a withColumns API to Dataset.

For now, the new withColumns API is for internal use only.
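For illustration, a rough sketch of the idea (the column names, surrogates sequence, and simplified missing-value handling here are hypothetical, not the actual ImputerModel code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

// Hypothetical inputs: one surrogate value per input/output column pair.
val inputCols  = Seq("c0", "c1", "c2")
val outputCols = Seq("o0", "o1", "o2")
val surrogates = Seq(0.1, 0.2, 0.3)

def transformOld(df: DataFrame): DataFrame = {
  // Old approach: one withColumn call (and one plan rewrite) per output column.
  inputCols.zip(outputCols).zip(surrogates).foldLeft(df) {
    case (cur, ((in, out), surrogate)) =>
      cur.withColumn(out, when(col(in).isNull, lit(surrogate)).otherwise(col(in)))
  }
}

def transformNew(df: DataFrame): DataFrame = {
  // New approach: build all output columns and add them in a single withColumns call.
  val newCols = inputCols.zip(surrogates).map { case (in, surrogate) =>
    when(col(in).isNull, lit(surrogate)).otherwise(col(in))
  }
  df.withColumns(outputCols, newCols)   // private[spark] API added by this PR
}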

How was this patch tested?

Existing tests cover the ImputerModel change. Added tests for the withColumns API.
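A minimal sketch of the kind of assertion added for withColumns (hypothetical data; the real tests live in the SQL test suites):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import spark.implicits._

val df = Seq((1, 2)).toDF("a", "b")
// withColumns is private[spark], so this can only be called from within Spark's own packages.
val result = df.withColumns(Seq("c", "d"), Seq(col("a") + 1, col("b") * 2))
assert(result.columns === Array("a", "b", "c", "d"))
assert(result.head() === Row(1, 2, 2, 4))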

@viirya (Member Author) commented Sep 14, 2017

Ran a benchmark similar to #18902 (comment):

numColumns | Old Mean | Old Median | New Mean | New Median
1 | 0.3597440068 | 0.12441702019999998 | 0.2401416984 | 0.1535934578
10 | 0.5698301512999999 | 0.36436769210000003 | 0.2588411808 | 0.18644379360000002
100 | 6.6054379131 | 6.779679862500001 | 0.45659362859999997 | 0.4994426849

The test code is basically the same, but it now measures the transform time:

import org.apache.spark.ml.feature._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
import scala.util.Random

val seed = 123L
val random = new Random(seed)
val n = 10000
val m = 100
val rows = sc.parallelize(1 to n).map(i=> Row(Array.fill(m)(random.nextDouble): _*))
val struct = new StructType(Array.range(0,m,1).map(i => StructField(s"c$i",DoubleType,true)))
val df = spark.createDataFrame(rows, struct)
df.persist()
df.count()

for (strategy <- Seq("mean", "median"); k <- Seq(1,10,100)) {
  val imputer = new Imputer().setStrategy(strategy).setInputCols(Array.range(0,k,1).map(i=>s"c$i")).setOutputCols(Array.range(0,k,1).map(i=>s"o$i"))
  var duration = 0.0
  for (i<- 0 until 10) {
    val model = imputer.fit(df)
    val start = System.nanoTime()
    model.transform(df).count
    val end = System.nanoTime()
    duration += (end - start) / 1e9
  }
  println((strategy, k, duration/10))
}

@viirya (Member Author) commented Sep 14, 2017

cc @MLnick @zhengruifeng @yanboliang

@viirya (Member Author) commented Sep 14, 2017

FYI, the withColumns API was proposed in #17819.

@zhengruifeng (Contributor):
In the test code, should we use model.transform(df).count instead?

@viirya (Member Author) commented Sep 14, 2017

@zhengruifeng Yeah, that is better. Actually, I think the difference between running multiple withColumn calls and one withColumns call is mainly the cost of query analysis and plan/dataset initialization. I will re-run the benchmark.
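As a rough sketch, one way to see this is to time only the plan generation without triggering a job (queryExecution.executedPlan forces analysis, optimization, and physical planning), assuming model and df from the benchmark above:

// Hedged sketch: measure plan generation only, not execution.
val start = System.nanoTime()
val transformed = model.transform(df)       // builds the logical plan
transformed.queryExecution.executedPlan     // forces analysis + optimization + planning
val end = System.nanoTime()
println(s"Plan generation took ${(end - start) / 1e9} s")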

@SparkQA commented Sep 14, 2017

Test build #81755 has finished for PR 19229 at commit 4efb643.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 14, 2017

Test build #81756 has finished for PR 19229 at commit 4b47709.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@WeichenXu123 (Contributor) left a comment

@viirya In your benchmark, for the case numCols == 100 the performance improves about 15x, so I suspect there is some mistake in the benchmark. It is very possible that the test code re-uses the shuffle result across loop iterations, which lets the code path skip the stage that scans the input data.

So I hope you can re-generate the test data in every test loop and then do model.fit. Can you update the test code and retest?

Such a performance improvement seems almost impossible to me. I will test this myself later, if I have time. Thanks.

@viirya (Member Author) commented Sep 18, 2017

@WeichenXu123 Sure. I should also point out that I ran this benchmark in spark-shell under local mode. It would be great if you could run the benchmark too to verify the numbers.

@viirya (Member Author) commented Sep 18, 2017

@WeichenXu123 Btw, the test basically re-uses the code from #18902 (comment). Is your concern specific to this PR?

@WeichenXu123 (Contributor):
@viirya I guess the reason is that in the old version of the PR, df.withColumn(..).withColumn(..).withColumn(..).... built a long chain of DataFrames that prevented shuffle re-use, but now you merge them into one step.

@viirya (Member Author) commented Sep 18, 2017

Updated test code:

import org.apache.spark.ml.feature._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import spark.implicits._
import scala.util.Random

def genData(): DataFrame = {
  val seed = 123L
  val random = new Random(seed)
  val n = 10000
  val m = 100
  val rows = sc.parallelize(1 to n).map(i=> Row(Array.fill(m)(random.nextDouble): _*))
  val struct = new StructType(Array.range(0,m,1).map(i => StructField(s"c$i",DoubleType,true)))
  spark.createDataFrame(rows, struct)
}

for (strategy <- Seq("mean", "median"); k <- Seq(1,10,100)) {
  val imputer = new Imputer().setStrategy(strategy).setInputCols(Array.range(0,k,1).map(i=>s"c$i")).setOutputCols(Array.range(0,k,1).map(i=>s"o$i"))
  var duration = 0.0
  for (i<- 0 until 10) {
    val df = genData()
    val model = imputer.fit(df)
    val start = System.nanoTime()
    val df2 = genData()
    model.transform(df2).count
    val end = System.nanoTime()
    duration += (end - start) / 1e9
  }
  println((strategy, k, duration/10))
}

@WeichenXu123 (Contributor):
Great! That's it. Thanks!

@viirya (Member Author) commented Sep 18, 2017

New numbers:

numColumns | Old Mean | Old Median | New Mean | New Median
1 | 0.17278329159999997 | 0.1537169693 | 0.16873250489999997 | 0.1521283075
10 | 0.3250422628 | 0.3086496881 | 0.16972776130000003 | 0.16117073769999998
100 | 6.3575860038 | 7.155657411799998 | 0.3299611978 | 0.3731574154

@viirya (Member Author) commented Sep 18, 2017

I don't think shuffle re-use is the reason behind the numbers. If you look at the previous comments, you will find that I originally ran the test without a count after model.transform, i.e., the DataFrame operations were never actually triggered.

@viirya (Member Author) commented Sep 18, 2017

Btw, I don't see any reason why df.withColumn(..).withColumn(..).withColumn(..) would prevent shuffle re-use.

@WeichenXu123 (Contributor):
Looks like that's not the reason; maybe the issue is somewhere else. Let me run the test later. Thanks!
But there are some small issues in the test:
Don't include the data-generation time:

    val start = System.nanoTime()
    val df2 = genData()
    model.transform(df2).count
    val end = System.nanoTime()

and add cache at the end of genData:

def genData() = {
  ...
  val df = spark.createDataFrame(...)
  df.cache()
  df.count() // force trigger cache
  df
}

and we'd better add warm-up code before recording the running time, as in the sketch below.
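Putting those suggestions together, a rough sketch of the adjusted timing loop might look like this (assuming genData now caches and materializes the DataFrame, and imputer is configured as in the earlier snippets):

val df = genData()                    // cached and materialized inside genData
val model = imputer.fit(df)
model.transform(df).count()           // warm-up run, not timed

var duration = 0.0
for (_ <- 0 until 10) {
  val start = System.nanoTime()
  model.transform(df).count()
  val end = System.nanoTime()
  duration += (end - start) / 1e9
}
println(duration / 10)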

@WeichenXu123 (Contributor) commented Sep 18, 2017

@viirya I ran the code, and you're right: most of the time is spent on executedPlan generation (in the old version of the code). Thanks!
But can you add a benchmark comparison with the RDD.aggregate version? 8daffc9

@viirya (Member Author) commented Sep 18, 2017

@WeichenXu123 Thanks for verifying that.

Do you mean using approxQuantile to compute the mean and median? But I think this change is not intended to improve that part; it targets model.transform. And we don't use RDD.aggregate to do model.transform, right?

@WeichenXu123 (Contributor):
@viirya No, keep the DataFrame version of the code. I only want to confirm how large the performance gap is between this and the RDD version (for possible future improvements, because in a similar test I found the DataFrame version is still slower than the RDD version).

@viirya (Member Author) commented Sep 19, 2017

@WeichenXu123 I'm not sure I understand correctly. This change only replaces the chain of withColumn calls with one pass of withColumns. We don't have an RDD version of this, so I'm not sure which version you want to compare against?

@WeichenXu123 (Contributor):
Oh, that's what was done in the old PR #18902. The RDD version (not in the master branch, only my personal implementation; sorry for posting the wrong link earlier, the code is here: 8daffc9) will be faster than the DataFrame version on current Spark. Now that your PR improves the performance, I would like to compare them again. We hope to track this performance gap and try to resolve it in the future. Based on another similar case of mine, the DataFrame version is currently about 2-3x slower than the RDD version when numCols == 100. But if you have no time, I can help do it. Thanks!

@viirya (Member Author) commented Sep 20, 2017

I ran the test code below to benchmark the RDD version against the DataFrame version with this ImputerModel change:

import org.apache.spark.ml.feature._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import spark.implicits._
import scala.util.Random

def genData(): DataFrame = {
  val seed = 123L
  val random = new Random(seed)
  val n = 10000
  val m = 100
  val rows = sc.parallelize(1 to n).map(i=> Row(Array.fill(m)(random.nextDouble): _*))
  val struct = new StructType(Array.range(0,m,1).map(i => StructField(s"c$i",DoubleType,true)))
  val df = spark.createDataFrame(rows, struct)
  df.cache()
  df.count()
  df
}

for (strategy <- Seq("mean", "median"); k <- Seq(1,10,100)) {
  val imputer = new Imputer().setStrategy(strategy).setInputCols(Array.range(0,k,1).map(i=>s"c$i")).setOutputCols(Array.range(0,k,1).map(i=>s"o$i"))
  var duration = 0.0
  for (i<- 0 until 10) {
    val df = genData()

    val start = System.nanoTime()
    val model = imputer.fit(df)
    val end = System.nanoTime()

    val df2 = genData()

    val start2 = System.nanoTime()
    model.transform(df2).count
    val end2 = System.nanoTime()

    duration += ((end - start) + (end2 - start2)) / 1e9
  }
  println((strategy, k, duration/10))
}

@viirya (Member Author) commented Sep 20, 2017

numColumns | RDD Mean | RDD Median | DataFrame Mean | DataFrame Median
1 | 0.1642173481 | 0.199774305 | 0.42601806710000006 | 0.2025112919
10 | 0.3713707549 | 0.5290104043000001 | 0.43626068409999996 | 0.49521778340000006
100 | 6.8645389335 | 8.838674982899999 | 1.6645560224 | 2.9213964243999997

@WeichenXu123 (Contributor):
@viirya Thanks very much! Although the performance gap exists (when numCols is large), it won't block this PR. I will create a JIRA to track it.

@SparkQA commented Sep 20, 2017

Test build #81964 has finished for PR 19229 at commit 2086900.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Sep 22, 2017

ping @zhengruifeng @WeichenXu123 Any more comments on this? Thanks.

/**
* Returns a new Dataset by adding columns or replacing the existing columns that have
* the same names.
*/
@viirya (Member Author) commented Sep 22, 2017:
@cloud-fan should have looked at this withColumns before in #17819. cc @cloud-fan to see if you have more comments.

@viirya (Member Author):
ping @cloud-fan or @gatorsmile Can you check the SQL part? Thanks.

Member:
Can withColumn be reimplemented by calling this function?

@viirya (Member Author):
It can be. But even if we want to do that, I'd prefer a follow-up rather than doing it in this PR.

Member:
I still think we should do it to avoid duplicated code.

@viirya (Member Author):
Ok. Then I will do it in this PR.
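The reimplementation is essentially a one-liner; a minimal sketch, assuming the Seq-based withColumns signature shown later in this thread:

// Sketch: withColumn delegates to the new withColumns.
def withColumn(colName: String, col: Column): DataFrame =
  withColumns(Seq(colName), Seq(col))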

@zhengruifeng (Contributor):
I am not familiar with the SQL code, but I think it's great to transform all columns at once.

@WeichenXu123 (Contributor) commented Sep 22, 2017

For the performance gap issue (compared with the RDD version), I created a separate JIRA to track it:
https://issues.apache.org/jira/browse/SPARK-22105
As a result of an offline discussion with @cloud-fan, the cause should be that the generated code is too large, which makes the JVM fail to JIT it. This PR should fix it: #19082

@viirya (Member Author) commented Sep 23, 2017

Yeah, I think that fix should work for the Imputer.mean strategy, because Imputer.mean now aggregates many columns at once and that can produce generated aggregation code that is too large.

For the Imputer.median strategy, since it uses approxQuantile, which calls the RDD aggregate API, I think codegen doesn't affect that part.
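A rough sketch of the two code paths being discussed (column names are illustrative; the real Imputer logic also filters out the configured missing value):

import org.apache.spark.sql.functions.{avg, col}

val inputCols = Array("c0", "c1", "c2")   // hypothetical

// mean: one whole-stage-codegen aggregation over all input columns at once,
// so the generated code grows with the number of columns.
val means = df.select(inputCols.map(c => avg(col(c))): _*).head()

// median: approxQuantile goes through an RDD-based aggregate internally,
// so codegen size is not a factor here.
val medians = df.stat.approxQuantile(inputCols, Array(0.5), 0.001)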

@WeichenXu123 (Contributor):
@viirya Yeah, for the performance gap I focus only on mean, which can take advantage of codegen.

@viirya (Member Author) commented Sep 24, 2017

@WeichenXu123 Do you have any more comments on this? Thanks. I think the ML part is straightforward.

@WeichenXu123 (Contributor) left a comment
The ML part looks good to me, except a minor style issue. Thanks!

val ic = col(inputCol)
outputDF = outputDF.withColumn(outputCol,
when(ic.isNull, surrogate)
Contributor:
style: indent

@viirya (Member Author):
This when is not a call on the result of the previous line. I think it doesn't need to be indented?

Contributor:
Oh I misread. The style is ok.

private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = {
  require(colNames.size == cols.size,
    s"The size of column names: ${colNames.size} isn't equal to " +
      s"the size of columns: ${cols.size}")
Member:
Also need to consider the case sensitivity issue.

@viirya (Member Author):
Good point.
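As a sketch, the duplicate-name check needs to respect spark.sql.caseSensitive; something along these lines (a simplified, standalone version of the idea, with a hypothetical helper name):

// Hypothetical helper: reject duplicate column names, honoring case sensitivity.
def checkDuplicateNames(colNames: Seq[String], caseSensitive: Boolean): Unit = {
  val normalized = if (caseSensitive) colNames else colNames.map(_.toLowerCase)
  require(normalized.distinct.length == normalized.length,
    s"Found duplicate column name(s) in given column names: " +
      normalized.diff(normalized.distinct).distinct.mkString(", "))
}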

@SparkQA commented Sep 25, 2017

Test build #82141 has finished for PR 19229 at commit 07dec0f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Sep 25, 2017

@gatorsmile Added the check for case sensitivity. Please take a look again. Thanks.

@viirya (Member Author) commented Sep 27, 2017

ping @gatorsmile for the SQL part.

@viirya (Member Author) commented Sep 29, 2017

ping @gatorsmile Can you take a quick look? Thanks.

@viirya (Member Author) commented Sep 29, 2017

also cc @jkbradley and @MLnick for final check of the ML change. Thanks.

@SparkQA commented Sep 30, 2017

Test build #82344 has finished for PR 19229 at commit 21048a8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Sep 30, 2017

retest this please.

@SparkQA commented Sep 30, 2017

Test build #82347 has finished for PR 19229 at commit 21048a8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Sep 30, 2017

@gatorsmile withColumn is reimplemented now. Please take a look when you have time. Thanks.

    col.as(colName)
  } else {
    Column(field)
  }
Member:
      columnMap.find { case (colName, _) =>
        resolver(field.name, colName)
      } match {
        case Some((colName: String, col: Column)) => col.as(colName)
        case _ => Column(field)
      }

/**
* Returns a new Dataset by adding columns with metadata.
*/
private[spark] def withColumns(
Member:
This is not being used or tested. Could we remove it?

@viirya (Member Author):
Ok. We can add this when we need it.

@SparkQA commented Oct 1, 2017

Test build #82369 has finished for PR 19229 at commit 1292ce0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member):
LGTM

@asfgit asfgit closed this in 3ca3670 Oct 1, 2017
@gatorsmile (Member):
Thanks! Merged to master.

@viirya (Member Author) commented Oct 2, 2017

Thanks @gatorsmile @WeichenXu123 @zhengruifeng

@viirya viirya deleted the SPARK-22001 branch December 27, 2023 18:21