Fix merge conflict with branch-21.10 #3748

Merged: 2 commits, Oct 5, 2021
@@ -476,9 +476,10 @@ class Spark320Shims extends Spark32XShims {
         GpuWindowInPandasExec(
           windowExpressions.map(_.convertToGpu()),
           partitionSpec.map(_.convertToGpu()),
-          orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
+          // leave ordering expression on the CPU, it's not used for GPU computation
+          winPy.orderSpec,
           childPlans.head.convertIfNeeded()
-        )
+        )(winPy.partitionSpec)
       }
     }).disabledByDefault("it only supports row based frame for now"),
     GpuOverrides.exec[FileSourceScanExec](
@@ -788,11 +789,12 @@ class Spark320Shims extends Spark32XShims {
   }
 
   override def getGpuShuffleExchangeExec(
-      outputPartitioning: Partitioning,
+      gpuOutputPartitioning: GpuPartitioning,
       child: SparkPlan,
+      cpuOutputPartitioning: Partitioning,
       cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = {
     val shuffleOrigin = cpuShuffle.map(_.shuffleOrigin).getOrElse(ENSURE_REQUIREMENTS)
-    GpuShuffleExchangeExec(outputPartitioning, child, shuffleOrigin)
+    GpuShuffleExchangeExec(gpuOutputPartitioning, child, shuffleOrigin)(cpuOutputPartitioning)
   }
 
   override def getGpuShuffleExchangeExec(
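One detail worth noting in the new getGpuShuffleExchangeExec signature: the CPU exchange now arrives only as cpuShuffle: Option[ShuffleExchangeExec], so each shim must pick a default when it is absent. Spark320Shims above falls back to ENSURE_REQUIREMENTS for the shuffle origin, while the SparkBaseShims variants further down compute canChangeNumPartitions with Option.forall, which is true for None. Below is a minimal, self-contained sketch of those two defaulting idioms; the types and object names are invented stand-ins, not the plugin's or Spark's API.

// Stand-in types; the real shims use Spark's ShuffleOrigin and ShuffleExchangeExec.
sealed trait ShuffleOriginSketch
case object EnsureRequirements extends ShuffleOriginSketch

final case class CpuShuffleSketch(
    shuffleOrigin: ShuffleOriginSketch,
    canChangeNumPartitions: Boolean)

object ShimDefaultsSketch {
  // Spark 3.2.0-style: fall back to ENSURE_REQUIREMENTS when there is no CPU exchange.
  def shuffleOrigin(cpuShuffle: Option[CpuShuffleSketch]): ShuffleOriginSketch =
    cpuShuffle.map(_.shuffleOrigin).getOrElse(EnsureRequirements)

  // Spark 3.0.x-style: Option.forall is true for None, so with no CPU exchange
  // the partition count is treated as changeable.
  def canChangeNumPartitions(cpuShuffle: Option[CpuShuffleSketch]): Boolean =
    cpuShuffle.forall(_.canChangeNumPartitions)
}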
@@ -27,9 +27,12 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  */
 case class GpuWindowInPandasExec(
     windowExpression: Seq[Expression],
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    child: SparkPlan) extends GpuWindowInPandasExecBase {
+    gpuPartitionSpec: Seq[Expression],
+    cpuOrderSpec: Seq[SortOrder],
+    child: SparkPlan)(
+    override val cpuPartitionSpec: Seq[Expression]) extends GpuWindowInPandasExecBase {
+
+  override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: Nil
 
   override final def pythonModuleKey: String = "spark"
 
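The recurring change in this PR is visible above: expressions that only matter on the CPU (cpuOrderSpec and cpuPartitionSpec here, and later the CPU output partitioning and CPU join keys) move into a second, curried parameter list, with otherCopyArgs handing them back when Catalyst copies the node. The following is a minimal sketch of why that shape works, using simplified made-up types rather than the plugin's real classes: a Scala case class derives equals, hashCode, and productIterator only from its first parameter list, so the CPU-side values stop participating in plan comparison, but they must be supplied explicitly whenever the node is rebuilt.

// Minimal sketch, not the plugin's API: a curried case class only includes its
// first parameter list in equals/hashCode/productIterator.
final case class GpuExecSketch(gpuPartitionSpec: Seq[String], child: String)(
    val cpuPartitionSpec: Seq[String]) {
  // Analogue of Catalyst's otherCopyArgs: the extra arguments a copy mechanism
  // must pass to the second parameter list when rebuilding the node.
  def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: Nil
}

object GpuExecSketchDemo {
  def main(args: Array[String]): Unit = {
    val a = GpuExecSketch(Seq("gpu_hash(a)"), "scan")(Seq("hash(a)"))
    val b = GpuExecSketch(Seq("gpu_hash(a)"), "scan")(Seq("range(a)"))
    println(a == b)          // true: the CPU spec is ignored by case-class equality
    println(a.otherCopyArgs) // List(List(hash(a)))
  }
}

In the diff above, override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: Nil plays exactly this role, presumably so that Catalyst's makeCopy can reconstruct the exec during plan transformations without losing the CPU partition spec.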
@@ -15,6 +15,8 @@
  */
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.GpuPartitioning
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -23,12 +25,17 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics
 
 case class GpuShuffleExchangeExec(
-    override val outputPartitioning: Partitioning,
+    gpuOutputPartitioning: GpuPartitioning,
     child: SparkPlan,
-    canChangeNumPartitions: Boolean)
-  extends GpuShuffleExchangeExecBaseWithMetrics(outputPartitioning, child)
+    canChangeNumPartitions: Boolean)(
+    cpuOutputPartitioning: Partitioning)
+  extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
   with ShuffleExchangeLike {
 
+  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil
+
+  override val outputPartitioning: Partitioning = cpuOutputPartitioning
+
   override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions
 
   override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
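For the shuffle exec specifically, the node now carries both partitionings: the GpuPartitioning is passed to the GPU base class, while the curried CPU Partitioning is what the node reports through outputPartitioning. A small sketch of that shape, again with invented stand-in types rather than the real Spark or plugin ones:

// Stand-ins for Spark's Partitioning and the plugin's GpuPartitioning.
trait PartitioningSketch
final case class HashPartitioningSketch(numPartitions: Int) extends PartitioningSketch
final case class GpuHashPartitioningSketch(numPartitions: Int) extends PartitioningSketch

// The base class consumes the GPU partitioning...
abstract class GpuShuffleBaseSketch(val gpuPartitioning: GpuHashPartitioningSketch)

// ...while the exec exposes the CPU partitioning to the rest of the planner.
final case class GpuShuffleExecSketch(
    gpuOutputPartitioning: GpuHashPartitioningSketch,
    child: String)(
    cpuOutputPartitioning: HashPartitioningSketch)
  extends GpuShuffleBaseSketch(gpuOutputPartitioning) {

  def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil

  // Mirrors `override val outputPartitioning: Partitioning = cpuOutputPartitioning`
  // in the diff: consumers that expect a CPU partitioning see this value.
  val outputPartitioning: PartitioningSketch = cpuOutputPartitioning
}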
@@ -67,25 +67,32 @@ class GpuShuffledHashJoinMeta(
       None,
       left,
       right,
-      isSkewJoin = false)
+      isSkewJoin = false)(
+      join.leftKeys,
+      join.rightKeys)
     // The GPU does not yet support conditional joins, so conditions are implemented
     // as a filter after the join when possible.
     condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
   }
 }
 
 case class GpuShuffledHashJoinExec(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
+    override val leftKeys: Seq[Expression],
+    override val rightKeys: Seq[Expression],
     joinType: JoinType,
     buildSide: GpuBuildSide,
     override val condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan,
-    override val isSkewJoin: Boolean)
+    override val isSkewJoin: Boolean)(
+    cpuLeftKeys: Seq[Expression],
+    cpuRightKeys: Seq[Expression])
   extends GpuShuffledHashJoinBase(
     leftKeys,
     rightKeys,
     buildSide,
     condition,
-    isSkewJoin = isSkewJoin)
+    isSkewJoin = isSkewJoin,
+    cpuLeftKeys,
+    cpuRightKeys) {
+
+  override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil
+}
@@ -86,7 +86,9 @@ class GpuSortMergeJoinMeta(
       None,
       left,
       right,
-      join.isSkewJoin)
+      join.isSkewJoin)(
+      join.leftKeys,
+      join.rightKeys)
     // The GPU does not yet support conditional joins, so conditions are implemented
     // as a filter after the join when possible.
     condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
@@ -29,9 +29,12 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
  */
 case class GpuWindowInPandasExec(
     projectList: Seq[Expression],
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    child: SparkPlan) extends GpuWindowInPandasExecBase {
+    gpuPartitionSpec: Seq[Expression],
+    cpuOrderSpec: Seq[SortOrder],
+    child: SparkPlan)(
+    override val cpuPartitionSpec: Seq[Expression]) extends GpuWindowInPandasExecBase {
+
+  override def otherCopyArgs: Seq[AnyRef] = cpuPartitionSpec :: Nil
 
   override final def pythonModuleKey: String = "databricks"
 
@@ -57,7 +57,7 @@ import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.execution.window.WindowExecBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
-import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch, TrampolineUtil}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch, TrampolineUtil}
 import org.apache.spark.sql.rapids.execution.python._
 import org.apache.spark.sql.rapids.execution.python.shims.v2._
 import org.apache.spark.sql.rapids.shims.v2.{GpuFileScanRDD, GpuSchemaUtils}
@@ -116,11 +116,13 @@ abstract class SparkBaseShims extends Spark30XShims {
   }
 
   override def getGpuShuffleExchangeExec(
-      outputPartitioning: Partitioning,
+      gpuOutputPartitioning: GpuPartitioning,
       child: SparkPlan,
+      cpuOutputPartitioning: Partitioning,
       cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = {
     val canChangeNumPartitions = cpuShuffle.forall(_.canChangeNumPartitions)
-    GpuShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions)
+    GpuShuffleExchangeExec(gpuOutputPartitioning, child, canChangeNumPartitions)(
+      cpuOutputPartitioning)
   }
 
   override def getGpuShuffleExchangeExec(
@@ -223,9 +225,10 @@ abstract class SparkBaseShims extends Spark30XShims {
         GpuWindowInPandasExec(
           windowExpressions.map(_.convertToGpu()),
           partitionSpec.map(_.convertToGpu()),
-          orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
+          // leave ordering expression on the CPU, it's not used for GPU computation
+          winPy.orderSpec,
           childPlans.head.convertIfNeeded()
-        )
+        )(winPy.partitionSpec)
       }
     }).disabledByDefault("it only supports row based frame for now"),
     GpuOverrides.exec[SortMergeJoinExec](
@@ -15,6 +15,8 @@
  */
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.GpuPartitioning
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -23,12 +25,17 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics
 
 case class GpuShuffleExchangeExec(
-    override val outputPartitioning: Partitioning,
+    gpuOutputPartitioning: GpuPartitioning,
     child: SparkPlan,
-    canChangeNumPartitions: Boolean)
-  extends GpuShuffleExchangeExecBaseWithMetrics(outputPartitioning, child)
+    canChangeNumPartitions: Boolean)(
+    cpuOutputPartitioning: Partitioning)
+  extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
   with ShuffleExchangeLike {
 
+  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil
+
+  override val outputPartitioning: Partitioning = cpuOutputPartitioning
+
   override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions
 
   override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
@@ -66,25 +66,32 @@ class GpuShuffledHashJoinMeta(
       None,
       left,
       right,
-      isSkewJoin = false)
+      isSkewJoin = false)(
+      join.leftKeys,
+      join.rightKeys)
     // The GPU does not yet support conditional joins, so conditions are implemented
     // as a filter after the join when possible.
     condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
   }
 }
 
 case class GpuShuffledHashJoinExec(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
+    override val leftKeys: Seq[Expression],
+    override val rightKeys: Seq[Expression],
     joinType: JoinType,
     buildSide: GpuBuildSide,
     override val condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan,
-    override val isSkewJoin: Boolean)
+    override val isSkewJoin: Boolean)(
+    cpuLeftKeys: Seq[Expression],
+    cpuRightKeys: Seq[Expression])
   extends GpuShuffledHashJoinBase(
     leftKeys,
     rightKeys,
     buildSide,
     condition,
-    isSkewJoin = isSkewJoin)
+    isSkewJoin = isSkewJoin,
+    cpuLeftKeys,
+    cpuRightKeys) {
+
+  override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil
+}
@@ -85,7 +85,9 @@ class GpuSortMergeJoinMeta(
       None,
       left,
       right,
-      join.isSkewJoin)
+      join.isSkewJoin)(
+      join.leftKeys,
+      join.rightKeys)
     // The GPU does not yet support conditional joins, so conditions are implemented
     // as a filter after the join when possible.
     condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
@@ -53,7 +53,7 @@ import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowEvalPy
 import org.apache.spark.sql.execution.window.WindowExecBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.{GpuAbs, GpuAverage, GpuFileSourceScanExec, GpuStringReplace, GpuTimeSub}
-import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase, JoinTypeChecks, SerializeBatchDeserializeHostBuffer, SerializeConcatHostBuffersDeserializeBatch}
 import org.apache.spark.sql.rapids.execution.python._
 import org.apache.spark.sql.rapids.execution.python.shims.v2._
 import org.apache.spark.sql.rapids.shims.v2.GpuSchemaUtils
@@ -101,11 +101,13 @@ abstract class SparkBaseShims extends Spark30XShims {
   }
 
   override def getGpuShuffleExchangeExec(
-      outputPartitioning: Partitioning,
+      gpuOutputPartitioning: GpuPartitioning,
       child: SparkPlan,
+      cpuOutputPartitioning: Partitioning,
       cpuShuffle: Option[ShuffleExchangeExec]): GpuShuffleExchangeExecBase = {
     val canChangeNumPartitions = cpuShuffle.forall(_.canChangeNumPartitions)
-    GpuShuffleExchangeExec(outputPartitioning, child, canChangeNumPartitions)
+    GpuShuffleExchangeExec(gpuOutputPartitioning, child, canChangeNumPartitions)(
+      cpuOutputPartitioning)
   }
 
   override def getGpuShuffleExchangeExec(
@@ -146,9 +148,10 @@ abstract class SparkBaseShims extends Spark30XShims {
         GpuWindowInPandasExec(
           windowExpressions.map(_.convertToGpu()),
           partitionSpec.map(_.convertToGpu()),
-          orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
+          // leave ordering expression on the CPU, it's not used for GPU computation
+          winPy.orderSpec,
          childPlans.head.convertIfNeeded()
-        )
+        )(winPy.partitionSpec)
       }
     }).disabledByDefault("it only supports row based frame for now"),
     GpuOverrides.exec[FileSourceScanExec](
@@ -67,25 +67,32 @@ class GpuShuffledHashJoinMeta(
       None,
       left,
       right,
-      isSkewJoin = false)
+      isSkewJoin = false)(
+      join.leftKeys,
+      join.rightKeys)
     // The GPU does not yet support conditional joins, so conditions are implemented
     // as a filter after the join when possible.
     condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
   }
 }
 
 case class GpuShuffledHashJoinExec(
-    leftKeys: Seq[Expression],
-    rightKeys: Seq[Expression],
+    override val leftKeys: Seq[Expression],
+    override val rightKeys: Seq[Expression],
     joinType: JoinType,
     buildSide: GpuBuildSide,
     override val condition: Option[Expression],
     left: SparkPlan,
     right: SparkPlan,
-    override val isSkewJoin: Boolean)
+    override val isSkewJoin: Boolean)(
+    cpuLeftKeys: Seq[Expression],
+    cpuRightKeys: Seq[Expression])
   extends GpuShuffledHashJoinBase(
     leftKeys,
     rightKeys,
     buildSide,
     condition,
-    isSkewJoin = isSkewJoin)
+    isSkewJoin = isSkewJoin,
+    cpuLeftKeys,
+    cpuRightKeys) {
+
+  override def otherCopyArgs: Seq[AnyRef] = cpuLeftKeys :: cpuRightKeys :: Nil
+}
@@ -86,7 +86,9 @@ class GpuSortMergeJoinMeta(
       None,
       left,
       right,
-      join.isSkewJoin)
+      join.isSkewJoin)(
+      join.leftKeys,
+      join.rightKeys)
     // The GPU does not yet support conditional joins, so conditions are implemented
     // as a filter after the join when possible.
     condition.map(c => GpuFilterExec(c.convertToGpu(), joinExec)).getOrElse(joinExec)
@@ -16,6 +16,8 @@
 
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.GpuPartitioning
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -24,12 +26,17 @@ import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrig
 import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics
 
 case class GpuShuffleExchangeExec(
-    override val outputPartitioning: Partitioning,
+    gpuOutputPartitioning: GpuPartitioning,
     child: SparkPlan,
-    shuffleOrigin: ShuffleOrigin)
-  extends GpuShuffleExchangeExecBaseWithMetrics(outputPartitioning, child)
+    shuffleOrigin: ShuffleOrigin)(
+    cpuOutputPartitioning: Partitioning)
+  extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
   with ShuffleExchangeLike {
 
+  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil
+
+  override val outputPartitioning: Partitioning = cpuOutputPartitioning
+
   override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions
 
   override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions