Skip to content

Commit

Permalink
feat: Add ShuffleQueryStageExec to direct child node for CometBroadcastExchangeExec

Browse files (browse the repository at this point in the history)
  • Loading branch information
viirya committed Aug 28, 2024
1 parent 5f41063 commit 052f9ad
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,16 @@ case class CometBroadcastExchangeExec(
case AQEShuffleReadExec(s: ShuffleQueryStageExec, _)
if s.plan.isInstanceOf[CometPlan] =>
CometExec.getByteArrayRdd(s.plan.asInstanceOf[CometPlan]).collect()
case s: ShuffleQueryStageExec if s.plan.isInstanceOf[CometPlan] =>
CometExec.getByteArrayRdd(s.plan.asInstanceOf[CometPlan]).collect()
case ReusedExchangeExec(_, plan) if plan.isInstanceOf[CometPlan] =>
CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
case AQEShuffleReadExec(ShuffleQueryStageExec(_, ReusedExchangeExec(_, plan), _), _)
if plan.isInstanceOf[CometPlan] =>
CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
case ShuffleQueryStageExec(_, ReusedExchangeExec(_, plan), _)
if plan.isInstanceOf[CometPlan] =>
CometExec.getByteArrayRdd(plan.asInstanceOf[CometPlan]).collect()
case AQEShuffleReadExec(s: ShuffleQueryStageExec, _) =>
throw new CometRuntimeException(
"Child of CometBroadcastExchangeExec should be CometExec, " +
Expand Down
16 changes: 16 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,22 @@ class CometExecSuite extends CometTestBase {
}
}

// Regression test: a ShuffleQueryStageExec may appear as the direct child of
// CometBroadcastExchangeExec (see the matching case added to
// CometBroadcastExchangeExec in this change). The self-join over a
// LATERAL VIEW with a LIMIT is presumably the shape that produces that plan
// under AQE — NOTE(review): confirm against the planner if this test is moved.
test("ShuffleQueryStageExec could be direct child node of CometBroadcastExchangeExec") {
  val table = "src"
  withTable(table) {
    withView("lv_noalias") {
      // Seed a minimal Parquet-backed table for the view to read from.
      sql(s"CREATE TABLE $table (key INT, value STRING) USING PARQUET")
      sql(s"insert into $table values(238, 'val_238')")

      // View over a LATERAL VIEW explode with a LIMIT; joining it with
      // itself below exercises the broadcast-exchange path under test.
      sql(
        "create view lv_noalias as SELECT myTab.* from src " +
          "LATERAL VIEW explode(map('key1', 100, 'key2', 200)) myTab limit 2")
      // Dropped the stray trailing ';' from the original — semicolons are
      // not idiomatic Scala line terminators. SQL text is unchanged.
      val df = sql("select * from lv_noalias a join lv_noalias b on a.key=b.key")
      // Verify Comet produces the same answer as vanilla Spark.
      checkSparkAnswer(df)
    }
  }
}

test("Sort on single struct should fallback to Spark") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
Expand Down

0 comments on commit 052f9ad

Please sign in to comment.