File tree Expand file tree Collapse file tree 1 file changed +13
-2
lines changed
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis Expand file tree Collapse file tree 1 file changed +13
-2
lines changed Original file line number Diff line number Diff line change @@ -3262,10 +3262,21 @@ object CleanupAliases extends Rule[LogicalPlan] {
32623262
32633263 def trimNonTopLevelAliases (e : Expression ): Expression = e match {
32643264 case a : Alias =>
3265- a.copy(child = trimAliases(a.child))(
3265+ val newChild = trimAliases(a.child)
3266+ // Specific logic for keeping the eventTime watermark metadata in the top level alias.
3267+ val metadata = if (newChild != a.child) {
3268+ newChild match {
3269+ case attr : AttributeReference if attr.metadata.contains(EventTimeWatermark .delayKey) =>
3270+ attr.metadata
3271+ case _ => a.metadata
3272+ }
3273+ } else {
3274+ a.metadata
3275+ }
3276+ a.copy(child = newChild)(
32663277 exprId = a.exprId,
32673278 qualifier = a.qualifier,
3268- explicitMetadata = Some (a. metadata))
3279+ explicitMetadata = Some (metadata))
32693280 case a : MultiAlias =>
32703281 a.copy(child = trimAliases(a.child))
32713282 case other => trimAliases(other)
You can’t perform that action at this time.
0 commit comments