@@ -355,7 +355,7 @@ case class ListQuery(
       plan.canonicalized,
       outerAttrs.map(_.canonicalized),
       ExprId(0),
-      childOutputs.map(_.canonicalized.asInstanceOf[Attribute]),
+      plan.canonicalized.output,
Contributor:

This looks correct, but I don't know how this is related to the cache problem. Can you elaborate?

Contributor Author:

The premise of using the cache is that the canonicalized forms of two plans are equal. The canonicalized plan of the two ListQuery expressions is equal, but their canonicalized childOutputs differ because the exprIds are different. As a result, the cache lookup never matches and the cache does not take effect.

Plan to be executed: [image]

Plan that has been cached: [image]
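To illustrate the mechanism, here is a minimal, self-contained Scala sketch. Attr, Plan, and ListQuery below are simplified stand-ins for Spark's classes, not the real API: canonicalization rewrites a plan's output exprIds position by position, so taking the output of the canonicalized plan yields equal results for semantically equal plans, while canonicalizing the stale childOutputs attributes keeps their original exprIds and breaks equality.

// Simplified model; these are illustrative stand-ins, not Spark's classes.
case class Attr(name: String, exprId: Long)

case class Plan(output: Seq[Attr]) {
  // Canonicalization replaces exprIds with position-based ids, so two
  // semantically equal plans canonicalize to the same value.
  def canonicalized: Plan =
    Plan(output.zipWithIndex.map { case (a, i) => a.copy(exprId = i) })
}

case class ListQuery(plan: Plan, childOutputs: Seq[Attr]) {
  // Old behavior: childOutputs keep their original exprIds.
  def canonicalizedOld: ListQuery = ListQuery(plan.canonicalized, childOutputs)
  // Fixed behavior: take the output of the canonicalized plan instead.
  def canonicalizedNew: ListQuery =
    ListQuery(plan.canonicalized, plan.canonicalized.output)
}

object CanonicalizationDemo extends App {
  val q1 = ListQuery(Plan(Seq(Attr("a", 7))), Seq(Attr("a", 7)))
  val q2 = ListQuery(Plan(Seq(Attr("a", 42))), Seq(Attr("a", 42)))
  println(q1.canonicalizedOld == q2.canonicalizedOld) // false -> cache miss
  println(q1.canonicalizedNew == q2.canonicalizedNew) // true  -> cache hit
}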

Contributor:
@allisonwang-db do you know why we have childOutputs? It seems to be child.output most of the time

Contributor Author:
Or should we delete childOutputs?

Comment:

I'm experiencing a similar issue after upgrading Spark from 3.2 to 3.3 (AWS EMR 6.7.0 to 6.9.0). May I ask which Spark version this issue starts in? And is there any plan to ship this fix in a release?

       joinCond.map(_.canonicalized))
   }

@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, ExternalRDDScanExec, RDDScanExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -1671,4 +1671,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  test("SPARK-41191: Cache Table is not working while nested caches exist") {
+    withCache("t1", "t2") {
+      sql("CACHE TABLE t1 as SELECT a FROM testData3 GROUP BY a")
+      sql("CACHE TABLE t2 as SELECT a,b FROM testData2 WHERE a IN " +
+        "(SELECT a FROM t1)")
+      val sparkPlan = sql("SELECT key,value,b FROM testData t3 JOIN t2 ON t3.key=t2.a")
+        .queryExecution.sparkPlan
+      assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 1)
+      assert(sparkPlan.collect { case e: ExternalRDDScanExec[_] => e }.size === 1)
+    }
+  }
 }
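For readers who want to observe the cache miss outside the test suite, a rough spark-shell sketch along the lines of the new test might look like the following. It assumes a SparkSession named spark and the suite's testData/testData2/testData3 fixtures; the table names and printed assertion are purely illustrative.

// Rough spark-shell reproduction; tables mirror the test fixtures and are
// assumed to already exist.
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec

spark.sql("CACHE TABLE t1 AS SELECT a FROM testData3 GROUP BY a")
spark.sql("CACHE TABLE t2 AS SELECT a, b FROM testData2 WHERE a IN (SELECT a FROM t1)")

val plan = spark.sql("SELECT key, value, b FROM testData t3 JOIN t2 ON t3.key = t2.a")
  .queryExecution
  .sparkPlan

// Before the fix, the ListQuery inside t2 canonicalizes differently on each
// use, so the cached t2 is not matched and no InMemoryTableScanExec appears.
println(plan.collect { case e: InMemoryTableScanExec => e }.size) // 1 after the fix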