Skip to content

Commit 665a428

Browse files
aokolnychyidongjoon-hyun
authored andcommitted
[SPARK-53905][SQL] Refactor RelationResolution to enable code reuse
### What changes were proposed in this pull request? This PR refactor RelationResolution to enable code reuse. ### Why are the changes needed? These changes are needed to simplify subsequent PRs. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests in `PlanResolutionSuite` already cover plan ID cloning and cache hits. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52781 from aokolnychyi/spark-53905. Authored-by: Anton Okolnychyi <aokolnychyi@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 571b802 commit 665a428

File tree

1 file changed

+31
-23
lines changed

1 file changed

+31
-23
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import scala.collection.mutable
21+
2022
import org.apache.spark.internal.Logging
2123
import org.apache.spark.sql.AnalysisException
2224
import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -49,8 +51,13 @@ class RelationResolution(override val catalogManager: CatalogManager)
4951
with Logging
5052
with LookupCatalog
5153
with SQLConfHelper {
54+
55+
type CacheKey = (Seq[String], Option[TimeTravelSpec])
56+
5257
val v1SessionCatalog = catalogManager.v1SessionCatalog
5358

59+
private def relationCache: mutable.Map[CacheKey, LogicalPlan] = AnalysisContext.get.relationCache
60+
5461
/**
5562
* If we are resolving database objects (relations, functions, etc.) inside views, we may need to
5663
* expand single or multi-part identifiers with the current catalog and namespace of when the
@@ -109,12 +116,9 @@ class RelationResolution(override val catalogManager: CatalogManager)
109116
}.orElse {
110117
expandIdentifier(u.multipartIdentifier) match {
111118
case CatalogAndIdentifier(catalog, ident) =>
112-
val key =
113-
(
114-
(catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq,
115-
finalTimeTravelSpec
116-
)
117-
AnalysisContext.get.relationCache
119+
val key = toCacheKey(catalog, ident, finalTimeTravelSpec)
120+
val planId = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
121+
relationCache
118122
.get(key)
119123
.map { cache =>
120124
val cachedRelation = cache.transform {
@@ -123,13 +127,7 @@ class RelationResolution(override val catalogManager: CatalogManager)
123127
newRelation.copyTagsFrom(multi)
124128
newRelation
125129
}
126-
u.getTagValue(LogicalPlan.PLAN_ID_TAG)
127-
.map { planId =>
128-
val cachedConnectRelation = cachedRelation.clone()
129-
cachedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
130-
cachedConnectRelation
131-
}
132-
.getOrElse(cachedRelation)
130+
cloneWithPlanId(cachedRelation, planId)
133131
}
134132
.orElse {
135133
val writePrivilegesString =
@@ -144,16 +142,8 @@ class RelationResolution(override val catalogManager: CatalogManager)
144142
u.isStreaming,
145143
finalTimeTravelSpec
146144
)
147-
loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
148-
u.getTagValue(LogicalPlan.PLAN_ID_TAG)
149-
.map { planId =>
150-
loaded.map { loadedRelation =>
151-
val loadedConnectRelation = loadedRelation.clone()
152-
loadedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, planId)
153-
loadedConnectRelation
154-
}
155-
}
156-
.getOrElse(loaded)
145+
loaded.foreach(relationCache.update(key, _))
146+
loaded.map(cloneWithPlanId(_, planId))
157147
}
158148
case _ => None
159149
}
@@ -263,4 +253,22 @@ class RelationResolution(override val catalogManager: CatalogManager)
263253
}
264254
} else None
265255
}
256+
257+
private def toCacheKey(
258+
catalog: CatalogPlugin,
259+
ident: Identifier,
260+
timeTravelSpec: Option[TimeTravelSpec] = None): CacheKey = {
261+
((catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq, timeTravelSpec)
262+
}
263+
264+
private def cloneWithPlanId(plan: LogicalPlan, planId: Option[Long]): LogicalPlan = {
265+
planId match {
266+
case Some(id) =>
267+
val clone = plan.clone()
268+
clone.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
269+
clone
270+
case None =>
271+
plan
272+
}
273+
}
266274
}

0 commit comments

Comments
 (0)