From e1f8e5f9a2fd9632aab51a517cedd3e558b934f8 Mon Sep 17 00:00:00 2001
From: yaohua
Date: Wed, 8 Jun 2022 17:41:13 +0900
Subject: [PATCH] [SPARK-39404][SS] Minor fix for querying `_metadata` in streaming

### What changes were proposed in this pull request?
Support for querying the `_metadata` column with a file-based streaming source was added in https://github.com/apache/spark/pull/35676.

This PR proposes using `transformUp` instead of `match` when pattern matching on `dataPlan` in the `runBatch` method of `MicroBatchExecution`. The existing `match` works for `FileStreamSource` because `FileStreamSource` always returns a single `LogicalRelation` node (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L247). However, the proposed change makes the logic more robust, since we should not rely on the upstream source to return a plan of a particular shape. In addition, the proposed change could make `_metadata` work if someone customizes `getBatch` in `FileStreamSource`.

### Why are the changes needed?
Robustness: the `_metadata` handling should not depend on the exact shape of the plan returned by the source.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #36801 from Yaohua628/spark-39404.

Authored-by: yaohua
Signed-off-by: Jungtaek Lim
---
 .../spark/sql/execution/streaming/MicroBatchExecution.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index d8806f03443f..0af45d9472a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -583,9 +583,8 @@ class MicroBatchExecution(
             case FileSourceMetadataAttribute(_) => true
             case _ => false
           }
-          val finalDataPlan = dataPlan match {
+          val finalDataPlan = dataPlan transformUp {
             case l: LogicalRelation if hasFileMetadata => l.withMetadataColumns()
-            case _ => dataPlan
           }
           // SPARK-40460: overwrite the entry with the new logicalPlan
           // because it might contain the _metadata column. It is a necessary change,