diff --git a/src/query/sql/src/executor/physical_plans/physical_union_all.rs b/src/query/sql/src/executor/physical_plans/physical_union_all.rs
index 64ff0aaa7dda0..7c89972d2455b 100644
--- a/src/query/sql/src/executor/physical_plans/physical_union_all.rs
+++ b/src/query/sql/src/executor/physical_plans/physical_union_all.rs
@@ -12,8 +12,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
+
 use databend_common_exception::Result;
 use databend_common_expression::DataField;
+use databend_common_expression::DataSchema;
 use databend_common_expression::DataSchemaRef;
 use databend_common_expression::DataSchemaRefExt;
 use databend_common_expression::RemoteExpr;
@@ -24,6 +27,7 @@ use crate::executor::PhysicalPlanBuilder;
 use crate::optimizer::SExpr;
 use crate::ColumnSet;
 use crate::IndexType;
+use crate::ScalarExpr;
 use crate::TypeCheck;
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
@@ -52,25 +56,41 @@ impl PhysicalPlanBuilder {
         &mut self,
         s_expr: &SExpr,
         union_all: &crate::plans::UnionAll,
-        required: ColumnSet,
+        mut required: ColumnSet,
         stat_info: PlanStatsInfo,
     ) -> Result<PhysicalPlan> {
         // 1. Prune unused Columns.
-        let left_required = union_all
+        let metadata = self.metadata.read().clone();
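+        // Lazy columns may still be needed later (e.g. for late materialization),
+        // so treat them as required before pruning the union outputs.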
+        let lazy_columns = metadata.lazy_columns();
+        required.extend(lazy_columns.clone());
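+        // Collect the positions of the union outputs that are actually required downstream.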
+        let indices: Vec<_> = union_all
             .left_outputs
             .iter()
-            .fold(required.clone(), |mut acc, v| {
-                acc.insert(v.0);
-                acc
-            });
-        let right_required = union_all.right_outputs.iter().fold(required, |mut acc, v| {
-            acc.insert(v.0);
-            acc
-        });
+            .enumerate()
+            .filter_map(|(index, v)| required.contains(&v.0).then_some(index))
+            .collect();
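+        // If no output column is required, keep the first column of each side so both
+        // children still produce a schema; otherwise require the matching column from
+        // each side for every kept position.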
+        let (left_required, right_required) = if indices.is_empty() {
+            (
+                HashSet::from([union_all.left_outputs[0].0]),
+                HashSet::from([union_all.right_outputs[0].0]),
+            )
+        } else {
+            indices.iter().fold(
+                (
+                    HashSet::with_capacity(indices.len()),
+                    HashSet::with_capacity(indices.len()),
+                ),
+                |(mut left, mut right), &index| {
+                    left.insert(union_all.left_outputs[index].0);
+                    right.insert(union_all.right_outputs[index].0);
+                    (left, right)
+                },
+            )
+        };
 
         // 2. Build physical plan.
-        let left_plan = self.build(s_expr.child(0)?, left_required).await?;
-        let right_plan = self.build(s_expr.child(1)?, right_required).await?;
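+        // The per-side required sets are reused below to filter the output fields, hence the clones.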
+        let left_plan = self.build(s_expr.child(0)?, left_required.clone()).await?;
+        let right_plan = self.build(s_expr.child(1)?, right_required.clone()).await?;
 
         let left_schema = left_plan.output_schema()?;
         let right_schema = right_plan.output_schema()?;
@@ -78,6 +98,7 @@ impl PhysicalPlanBuilder {
         let fields = union_all
             .left_outputs
             .iter()
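+            // Only columns kept in left_required contribute to the union's output schema.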
+            .filter(|(index, _)| left_required.contains(index))
             .map(|(index, expr)| {
                 if let Some(expr) = expr {
                     Ok(DataField::new(&index.to_string(), expr.data_type()?))
@@ -87,35 +108,9 @@ impl PhysicalPlanBuilder {
             })
             .collect::<Result<Vec<_>>>()?;
 
-        let left_outputs = union_all
-            .left_outputs
-            .iter()
-            .map(|(index, scalar_expr)| {
-                if let Some(scalar_expr) = scalar_expr {
-                    let expr = scalar_expr
-                        .type_check(left_schema.as_ref())?
-                        .project_column_ref(|idx| left_schema.index_of(&idx.to_string()).unwrap());
-                    Ok((*index, Some(expr.as_remote_expr())))
-                } else {
-                    Ok((*index, None))
-                }
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        let right_outputs = union_all
-            .right_outputs
-            .iter()
-            .map(|(index, scalar_expr)| {
-                if let Some(scalar_expr) = scalar_expr {
-                    let expr = scalar_expr
-                        .type_check(right_schema.as_ref())?
-                        .project_column_ref(|idx| right_schema.index_of(&idx.to_string()).unwrap());
-                    Ok((*index, Some(expr.as_remote_expr())))
-                } else {
-                    Ok((*index, None))
-                }
-            })
-            .collect::<Result<Vec<_>>>()?;
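+        // Both sides are lowered through the shared helper defined below.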
+        let left_outputs = process_outputs(&union_all.left_outputs, &left_required, &left_schema)?;
+        let right_outputs =
+            process_outputs(&union_all.right_outputs, &right_required, &right_schema)?;
 
         Ok(PhysicalPlan::UnionAll(UnionAll {
             plan_id: 0,
@@ -130,3 +125,24 @@ impl PhysicalPlanBuilder {
         }))
     }
 }
+
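+/// Type-checks each required union output against the child's schema and lowers it
+/// to a remote expression; outputs pruned from `required` are skipped.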
+fn process_outputs(
+    outputs: &[(IndexType, Option<ScalarExpr>)],
+    required: &ColumnSet,
+    schema: &DataSchema,
+) -> Result<Vec<(IndexType, Option<RemoteExpr>)>> {
+    outputs
+        .iter()
+        .filter(|(index, _)| required.contains(index))
+        .map(|(index, scalar_expr)| {
+            if let Some(scalar_expr) = scalar_expr {
+                let expr = scalar_expr
+                    .type_check(schema)?
+                    .project_column_ref(|idx| schema.index_of(&idx.to_string()).unwrap());
+                Ok((*index, Some(expr.as_remote_expr())))
+            } else {
+                Ok((*index, None))
+            }
+        })
+        .collect()
+}
diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test
index ff33ddcecee60..437f29a4e13bd 100644
--- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test
+++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0000_stream.test
@@ -349,6 +349,12 @@ select a, b, change$action, change$is_update from s3 as _change_delete order by
 4 4 INSERT 0
 6 5 INSERT 0
 
+# ISSUE 17085
+query I
+select a from s3 where a > 5
+----
+6
+
 statement ok
 create table t4(a int, b int)
 
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_eval_scalar.test b/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_eval_scalar.test
index 7bba98bae78a7..5c83998d444ea 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_eval_scalar.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/push_down_filter/push_down_filter_eval_scalar.test
@@ -61,93 +61,81 @@ AggregateFinal
     ├── aggregate functions: []
     ├── estimated rows: 0.00
     └── UnionAll
-        ├── output columns: [t.id (#0), de (#6)]
+        ├── output columns: [t.id (#0)]
         ├── estimated rows: 0.00
-        ├── EvalScalar
-        │   ├── output columns: [t.id (#0), de (#6)]
-        │   ├── expressions: [if(CAST(is_not_null(sum(tb.de) (#5)) AS Boolean NULL), CAST(assume_not_null(sum(tb.de) (#5)) AS Int64 NULL), true, 0, NULL)]
+        ├── AggregateFinal
+        │   ├── output columns: [t.id (#0)]
+        │   ├── group by: [id]
+        │   ├── aggregate functions: []
         │   ├── estimated rows: 0.00
-        │   └── AggregateFinal
-        │       ├── output columns: [sum(tb.de) (#5), t.id (#0)]
+        │   └── AggregatePartial
         │       ├── group by: [id]
-        │       ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
+        │       ├── aggregate functions: []
         │       ├── estimated rows: 0.00
-        │       └── AggregatePartial
-        │           ├── group by: [id]
-        │           ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
+        │       └── HashJoin
+        │           ├── output columns: [t.id (#0)]
+        │           ├── join type: LEFT OUTER
+        │           ├── build keys: [tb.sid (#1)]
+        │           ├── probe keys: [t.id (#0)]
+        │           ├── filters: []
         │           ├── estimated rows: 0.00
-        │           └── HashJoin
-        │               ├── output columns: [t.id (#0), sum(coalesce(t3.val, 0)) (#4)]
-        │               ├── join type: LEFT OUTER
-        │               ├── build keys: [tb.sid (#1)]
-        │               ├── probe keys: [t.id (#0)]
-        │               ├── filters: []
+        │           ├── AggregateFinal(Build)
+        │           │   ├── output columns: [t2.sid (#1)]
+        │           │   ├── group by: [sid]
+        │           │   ├── aggregate functions: []
+        │           │   ├── estimated rows: 0.00
+        │           │   └── AggregatePartial
+        │           │       ├── group by: [sid]
+        │           │       ├── aggregate functions: []
+        │           │       ├── estimated rows: 0.00
+        │           │       └── Filter
+        │           │           ├── output columns: [t2.sid (#1)]
+        │           │           ├── filters: [is_true(t3.sid (#1) = 1)]
+        │           │           ├── estimated rows: 0.00
+        │           │           └── TableScan
+        │           │               ├── table: default.default.t2
+        │           │               ├── output columns: [sid (#1)]
+        │           │               ├── read rows: 0
+        │           │               ├── read size: 0
+        │           │               ├── partitions total: 0
+        │           │               ├── partitions scanned: 0
+        │           │               ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
+        │           │               └── estimated rows: 0.00
+        │           └── Filter(Probe)
+        │               ├── output columns: [t.id (#0)]
+        │               ├── filters: [is_true(t.id (#0) = 1)]
         │               ├── estimated rows: 0.00
-        │               ├── AggregateFinal(Build)
-        │               │   ├── output columns: [sum(coalesce(t3.val, 0)) (#4), t2.sid (#1)]
-        │               │   ├── group by: [sid]
-        │               │   ├── aggregate functions: [sum(sum_arg_0)]
-        │               │   ├── estimated rows: 0.00
-        │               │   └── AggregatePartial
-        │               │       ├── group by: [sid]
-        │               │       ├── aggregate functions: [sum(sum_arg_0)]
-        │               │       ├── estimated rows: 0.00
-        │               │       └── EvalScalar
-        │               │           ├── output columns: [t2.sid (#1), sum_arg_0 (#3)]
-        │               │           ├── expressions: [if(CAST(is_not_null(t3.val (#2)) AS Boolean NULL), CAST(assume_not_null(t3.val (#2)) AS Int32 NULL), true, 0, NULL)]
-        │               │           ├── estimated rows: 0.00
-        │               │           └── Filter
-        │               │               ├── output columns: [t2.sid (#1), t2.val (#2)]
-        │               │               ├── filters: [is_true(t3.sid (#1) = 1)]
-        │               │               ├── estimated rows: 0.00
-        │               │               └── TableScan
-        │               │                   ├── table: default.default.t2
-        │               │                   ├── output columns: [sid (#1), val (#2)]
-        │               │                   ├── read rows: 0
-        │               │                   ├── read size: 0
-        │               │                   ├── partitions total: 0
-        │               │                   ├── partitions scanned: 0
-        │               │                   ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
-        │               │                   └── estimated rows: 0.00
-        │               └── Filter(Probe)
-        │                   ├── output columns: [t.id (#0)]
-        │                   ├── filters: [is_true(t.id (#0) = 1)]
-        │                   ├── estimated rows: 0.00
-        │                   └── TableScan
-        │                       ├── table: default.default.t1
-        │                       ├── output columns: [id (#0)]
-        │                       ├── read rows: 0
-        │                       ├── read size: 0
-        │                       ├── partitions total: 0
-        │                       ├── partitions scanned: 0
-        │                       ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
-        │                       └── estimated rows: 0.00
-        └── EvalScalar
-            ├── output columns: [t.id (#7), de (#8)]
-            ├── expressions: [0]
+        │               └── TableScan
+        │                   ├── table: default.default.t1
+        │                   ├── output columns: [id (#0)]
+        │                   ├── read rows: 0
+        │                   ├── read size: 0
+        │                   ├── partitions total: 0
+        │                   ├── partitions scanned: 0
+        │                   ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
+        │                   └── estimated rows: 0.00
+        └── AggregateFinal
+            ├── output columns: [t.id (#7)]
+            ├── group by: [id]
+            ├── aggregate functions: []
             ├── estimated rows: 0.00
-            └── AggregateFinal
-                ├── output columns: [t.id (#7)]
+            └── AggregatePartial
                 ├── group by: [id]
                 ├── aggregate functions: []
                 ├── estimated rows: 0.00
-                └── AggregatePartial
-                    ├── group by: [id]
-                    ├── aggregate functions: []
+                └── Filter
+                    ├── output columns: [t.id (#7)]
+                    ├── filters: [is_true(t.id (#7) = 1)]
                     ├── estimated rows: 0.00
-                    └── Filter
-                        ├── output columns: [t.id (#7)]
-                        ├── filters: [is_true(t.id (#7) = 1)]
-                        ├── estimated rows: 0.00
-                        └── TableScan
-                            ├── table: default.default.t1
-                            ├── output columns: [id (#7)]
-                            ├── read rows: 0
-                            ├── read size: 0
-                            ├── partitions total: 0
-                            ├── partitions scanned: 0
-                            ├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
-                            └── estimated rows: 0.00
+                    └── TableScan
+                        ├── table: default.default.t1
+                        ├── output columns: [id (#7)]
+                        ├── read rows: 0
+                        ├── read size: 0
+                        ├── partitions total: 0
+                        ├── partitions scanned: 0
+                        ├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
+                        └── estimated rows: 0.00
 
 statement ok
 drop table if exists t1;
diff --git a/tests/sqllogictests/suites/mode/standalone/explain/union.test b/tests/sqllogictests/suites/mode/standalone/explain/union.test
index 76480538fb6a7..94c843087c790 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain/union.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain/union.test
@@ -218,6 +218,42 @@ Limit
             ├── push downs: [filters: [], limit: 1]
             └── estimated rows: 2.00
 
+# ISSUE 17085
+query T
+explain select b from (select * from t1 where a>1 union all select * from t2 where b>2)
+----
+UnionAll
+├── output columns: [t1.b (#1)]
+├── estimated rows: 2.00
+├── Filter
+│   ├── output columns: [t1.b (#1)]
+│   ├── filters: [is_true(t1.a (#0) > 1)]
+│   ├── estimated rows: 1.00
+│   └── TableScan
+│       ├── table: default.default.t1
+│       ├── output columns: [a (#0), b (#1)]
+│       ├── read rows: 2
+│       ├── read size: < 1 KiB
+│       ├── partitions total: 1
+│       ├── partitions scanned: 1
+│       ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
+│       ├── push downs: [filters: [is_true(t1.a (#0) > 1)], limit: NONE]
+│       └── estimated rows: 2.00
+└── Filter
+    ├── output columns: [t2.b (#3)]
+    ├── filters: [is_true(t2.b (#3) > 2)]
+    ├── estimated rows: 1.00
+    └── TableScan
+        ├── table: default.default.t2
+        ├── output columns: [b (#3)]
+        ├── read rows: 2
+        ├── read size: < 1 KiB
+        ├── partitions total: 1
+        ├── partitions scanned: 1
+        ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
+        ├── push downs: [filters: [is_true(t2.b (#3) > 2)], limit: NONE]
+        └── estimated rows: 2.00
+
 statement ok
 drop table t1
 
diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_eval_scalar.test b/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_eval_scalar.test
index f7252d926e1eb..4fc1f97e0198e 100644
--- a/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_eval_scalar.test
+++ b/tests/sqllogictests/suites/mode/standalone/explain_native/push_down_filter/push_down_filter_eval_scalar.test
@@ -61,81 +61,69 @@ AggregateFinal
     ├── aggregate functions: []
     ├── estimated rows: 0.00
     └── UnionAll
-        ├── output columns: [t.id (#0), de (#6)]
+        ├── output columns: [t.id (#0)]
         ├── estimated rows: 0.00
-        ├── EvalScalar
-        │   ├── output columns: [t.id (#0), de (#6)]
-        │   ├── expressions: [if(CAST(is_not_null(sum(tb.de) (#5)) AS Boolean NULL), CAST(assume_not_null(sum(tb.de) (#5)) AS Int64 NULL), true, 0, NULL)]
+        ├── AggregateFinal
+        │   ├── output columns: [t.id (#0)]
+        │   ├── group by: [id]
+        │   ├── aggregate functions: []
         │   ├── estimated rows: 0.00
-        │   └── AggregateFinal
-        │       ├── output columns: [sum(tb.de) (#5), t.id (#0)]
+        │   └── AggregatePartial
         │       ├── group by: [id]
-        │       ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
+        │       ├── aggregate functions: []
         │       ├── estimated rows: 0.00
-        │       └── AggregatePartial
-        │           ├── group by: [id]
-        │           ├── aggregate functions: [sum(sum(coalesce(t3.val, 0)))]
+        │       └── HashJoin
+        │           ├── output columns: [t.id (#0)]
+        │           ├── join type: LEFT OUTER
+        │           ├── build keys: [tb.sid (#1)]
+        │           ├── probe keys: [t.id (#0)]
+        │           ├── filters: []
         │           ├── estimated rows: 0.00
-        │           └── HashJoin
-        │               ├── output columns: [t.id (#0), sum(coalesce(t3.val, 0)) (#4)]
-        │               ├── join type: LEFT OUTER
-        │               ├── build keys: [tb.sid (#1)]
-        │               ├── probe keys: [t.id (#0)]
-        │               ├── filters: []
-        │               ├── estimated rows: 0.00
-        │               ├── AggregateFinal(Build)
-        │               │   ├── output columns: [sum(coalesce(t3.val, 0)) (#4), t2.sid (#1)]
-        │               │   ├── group by: [sid]
-        │               │   ├── aggregate functions: [sum(sum_arg_0)]
-        │               │   ├── estimated rows: 0.00
-        │               │   └── AggregatePartial
-        │               │       ├── group by: [sid]
-        │               │       ├── aggregate functions: [sum(sum_arg_0)]
-        │               │       ├── estimated rows: 0.00
-        │               │       └── EvalScalar
-        │               │           ├── output columns: [t2.sid (#1), sum_arg_0 (#3)]
-        │               │           ├── expressions: [if(CAST(is_not_null(t3.val (#2)) AS Boolean NULL), CAST(assume_not_null(t3.val (#2)) AS Int32 NULL), true, 0, NULL)]
-        │               │           ├── estimated rows: 0.00
-        │               │           └── TableScan
-        │               │               ├── table: default.default.t2
-        │               │               ├── output columns: [sid (#1), val (#2)]
-        │               │               ├── read rows: 0
-        │               │               ├── read size: 0
-        │               │               ├── partitions total: 0
-        │               │               ├── partitions scanned: 0
-        │               │               ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
-        │               │               └── estimated rows: 0.00
-        │               └── TableScan(Probe)
-        │                   ├── table: default.default.t1
-        │                   ├── output columns: [id (#0)]
-        │                   ├── read rows: 0
-        │                   ├── read size: 0
-        │                   ├── partitions total: 0
-        │                   ├── partitions scanned: 0
-        │                   ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
-        │                   └── estimated rows: 0.00
-        └── EvalScalar
-            ├── output columns: [t.id (#7), de (#8)]
-            ├── expressions: [0]
+        │           ├── AggregateFinal(Build)
+        │           │   ├── output columns: [t2.sid (#1)]
+        │           │   ├── group by: [sid]
+        │           │   ├── aggregate functions: []
+        │           │   ├── estimated rows: 0.00
+        │           │   └── AggregatePartial
+        │           │       ├── group by: [sid]
+        │           │       ├── aggregate functions: []
+        │           │       ├── estimated rows: 0.00
+        │           │       └── TableScan
+        │           │           ├── table: default.default.t2
+        │           │           ├── output columns: [sid (#1)]
+        │           │           ├── read rows: 0
+        │           │           ├── read size: 0
+        │           │           ├── partitions total: 0
+        │           │           ├── partitions scanned: 0
+        │           │           ├── push downs: [filters: [is_true(t2.sid (#1) = 1)], limit: NONE]
+        │           │           └── estimated rows: 0.00
+        │           └── TableScan(Probe)
+        │               ├── table: default.default.t1
+        │               ├── output columns: [id (#0)]
+        │               ├── read rows: 0
+        │               ├── read size: 0
+        │               ├── partitions total: 0
+        │               ├── partitions scanned: 0
+        │               ├── push downs: [filters: [is_true(t1.id (#0) = 1)], limit: NONE]
+        │               └── estimated rows: 0.00
+        └── AggregateFinal
+            ├── output columns: [t.id (#7)]
+            ├── group by: [id]
+            ├── aggregate functions: []
             ├── estimated rows: 0.00
-            └── AggregateFinal
-                ├── output columns: [t.id (#7)]
+            └── AggregatePartial
                 ├── group by: [id]
                 ├── aggregate functions: []
                 ├── estimated rows: 0.00
-                └── AggregatePartial
-                    ├── group by: [id]
-                    ├── aggregate functions: []
-                    ├── estimated rows: 0.00
-                    └── TableScan
-                        ├── table: default.default.t1
-                        ├── output columns: [id (#7)]
-                        ├── read rows: 0
-                        ├── read size: 0
-                        ├── partitions total: 0
-                        ├── partitions scanned: 0
-                        ├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
-                        └── estimated rows: 0.00
+                └── TableScan
+                    ├── table: default.default.t1
+                    ├── output columns: [id (#7)]
+                    ├── read rows: 0
+                    ├── read size: 0
+                    ├── partitions total: 0
+                    ├── partitions scanned: 0
+                    ├── push downs: [filters: [is_true(t1.id (#7) = 1)], limit: NONE]
+                    └── estimated rows: 0.00
 
 statement ok
 drop table if exists t1;