
Conversation

@zzzzming95 (Contributor) commented Sep 4, 2023

What changes were proposed in this pull request?

Since `BinaryArithmetic#dataType` recursively recomputes the data type of every node beneath it, the driver becomes very slow when many columns are combined in a single expression.

For example, the following code:

```scala
import spark.implicits._
import scala.util.Random
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{expr, sum}
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}

val N = 30
val M = 100

val columns = Seq.fill(N)(Random.alphanumeric.take(8).mkString)
val data = Seq.fill(M)(Seq.fill(N)(Random.nextInt(16) - 5))

val schema = StructType(columns.map(StructField(_, IntegerType)))
val rdd = spark.sparkContext.parallelize(data.map(Row.fromSeq(_)))
val df = spark.createDataFrame(rdd, schema)
val colExprs = columns.map(sum(_))

// generate a new column that adds up the other 30 columns
df.withColumn("new_col_sum", expr(columns.mkString(" + ")))
```

On Spark 3.4 the driver takes a few minutes to execute this code, whereas on Spark 3.2 it takes only a few seconds. Related issue: SPARK-39316
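For context on why this gets slow: `columns.mkString(" + ")` parses into a deep, left-leaning chain of `Add` nodes, and if each node's `dataType` recomputes its children's types on every call, asking all nodes in the tree for their type does quadratic total work. The toy sketch below (hypothetical `Leaf`/`Plus` classes, not Spark's real expression classes) only illustrates that effect:

```scala
// Toy model only, not Spark code: it mimics a dataType implemented as a
// plain def that recurses into both children on every call.
object DataTypeRecomputation extends App {
  var typeComputations = 0L // counts how often some node's type is computed

  sealed trait Node { def dataType: String }

  case class Leaf(name: String) extends Node {
    def dataType: String = { typeComputations += 1; "int" }
  }

  case class Plus(left: Node, right: Node) extends Node {
    // No caching: every call walks the whole subtree below this node again.
    def dataType: String = {
      typeComputations += 1
      val l = left.dataType
      val r = right.dataType
      if (l == r) l else "double"
    }
  }

  val n = 30
  // Left-deep chain like (((c0 + c1) + c2) + ... + c29), as a left-associative parser produces.
  val chain = (1 until n).foldLeft[Node](Leaf("c0")) { (acc, i) => Plus(acc, Leaf(s"c$i")) }

  // Ask every node for its type, roughly what an analysis pass over the tree does.
  def allNodes(e: Node): Seq[Node] = e match {
    case p: Plus => Seq(p) ++ allNodes(p.left) ++ allNodes(p.right)
    case l: Leaf => Seq(l)
  }
  allNodes(chain).foreach(_.dataType)

  println(s"type computations for $n columns: $typeComputations") // grows quadratically with n
}
```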

Why are the changes needed?

Optimize the processing speed of `BinaryArithmetic#dataType` when an expression references many columns.
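As a hedged sketch of the general remedy (the actual patch to `BinaryArithmetic` is not reproduced here, and the class names below are made up for illustration): caching each node's computed type, for example behind a `lazy val`, means it is evaluated at most once, so walking a chain of N additions costs O(N) type computations instead of O(N^2).

```scala
// Sketch only: assumes the idea of the fix is to compute a node's type once
// and reuse it, rather than recomputing it on every access.
sealed trait TypedNode { def dataType: String }

case class IntColumn(name: String) extends TypedNode {
  val dataType: String = "int"
}

case class CachedPlus(left: TypedNode, right: TypedNode) extends TypedNode {
  // Computed on first access and then cached, so asking every node in a
  // deep chain for its type no longer re-walks the whole subtree each time.
  lazy val dataType: String =
    if (left.dataType == right.dataType) left.dataType else "double"
}
```

Caching via a lazy val is safe here because the expression tree is immutable: a node's children never change after construction, so its type cannot change either.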

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manual testing.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions bot added the SQL label Sep 4, 2023
@zzzzming95 (Contributor, Author) commented:

@ulysses-you cc

@wangyum (Member) commented Sep 5, 2023

cc @cloud-fan

@zzzzming95 (Contributor, Author) commented:

@cloud-fan @wangyum

Please merge it to master, thanks.

@wangyum closed this in 16e813c on Sep 6, 2023
wangyum pushed a commit that referenced this pull request Sep 6, 2023
…#dataType` when processing multi-column data

Closes #42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 16e813c)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
wangyum pushed a commit that referenced this pull request Sep 6, 2023
…#dataType` when processing multi-column data

Closes #42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 16e813c)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
@wangyum (Member) commented Sep 6, 2023

Merged to master, branch-3.5 and branch-3.4.

viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023
…#dataType` when processing multi-column data

Closes apache#42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit 16e813c)
Signed-off-by: Yuming Wang <yumwang@ebay.com>
(cherry picked from commit a96804b)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
zml1206 pushed a commit to zml1206/spark that referenced this pull request May 7, 2025
…#dataType` when processing multi-column data

Closes apache#42804 from zzzzming95/SPARK-45071.

Authored-by: zzzzming95 <505306252@qq.com>
Signed-off-by: Yuming Wang <yumwang@ebay.com>