chore(query): enable none lazy pruner in lazy read (#15942)
* chore(query): enable none lazy pruner in lazy read

* update

* update

* update

---------

Co-authored-by: Bohu <overred.shuttler@gmail.com>
sundy-li and BohuTANG authored Jul 2, 2024
1 parent 7a58aa7 commit d62710d
Showing 4 changed files with 77 additions and 57 deletions.
@@ -77,7 +77,6 @@ impl PhysicalPlanBuilder {
}

// If `lazy_columns` is not empty, build a `RowFetch` plan on top of the `Limit` plan.

let input_schema = input_plan.output_schema()?;

// Lazy materialization is enabled.
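The comment above captures the lazy-read plan shape that the updated test expectations show later in this commit: sort and limit run over a narrow scan of `[a, _row_id]`, and a `RowFetch` on top re-reads the deferred columns `[b, c, d, e]` by row id. Purely as an illustration (a minimal sketch with made-up types, not Databend's actual `PhysicalPlanBuilder` or `PhysicalPlan` API), the branch the comment describes could look like:

```rust
// Hypothetical, simplified plan representation for illustration only.
#[derive(Debug)]
enum Plan {
    TableScan { columns: Vec<String> },
    Limit { input: Box<Plan>, limit: usize },
    RowFetch { input: Box<Plan>, columns_to_fetch: Vec<String> },
}

// Build a `RowFetch` on top of the `Limit` plan only when some columns were deferred.
fn build_lazy_read(limit_plan: Plan, lazy_columns: Vec<String>) -> Plan {
    if lazy_columns.is_empty() {
        // Nothing was deferred; the limited plan already carries every column.
        return limit_plan;
    }
    Plan::RowFetch {
        input: Box::new(limit_plan),
        columns_to_fetch: lazy_columns,
    }
}

fn main() {
    // Narrow scan of the sort key plus `_row_id`, limited, then widened by RowFetch.
    let scan = Plan::TableScan { columns: vec!["a".into(), "_row_id".into()] };
    let limited = Plan::Limit { input: Box::new(scan), limit: 2 };
    let plan = build_lazy_read(limited, vec!["b".into(), "c".into(), "d".into(), "e".into()]);
    println!("{plan:?}");
}
```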
6 changes: 1 addition & 5 deletions src/query/storages/fuse/src/operations/read_partitions.rs
@@ -72,10 +72,6 @@ impl FuseTable {
}

let snapshot = self.read_table_snapshot().await?;
let is_lazy = push_downs
.as_ref()
.map(|p| p.lazy_materialization)
.unwrap_or_default();
match snapshot {
Some(snapshot) => {
let snapshot_loc = self
@@ -89,7 +85,7 @@
nodes_num = cluster.nodes.len();
}

if (!dry_run && snapshot.segments.len() > nodes_num) || is_lazy {
if !dry_run && snapshot.segments.len() > nodes_num {
let mut segments = Vec::with_capacity(snapshot.segments.len());
for (idx, segment_location) in snapshot.segments.iter().enumerate() {
segments.push(FuseLazyPartInfo::create(idx, segment_location.clone()))
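This hunk is the core of the change: `push_downs.lazy_materialization` no longer forces the deferred `FuseLazyPartInfo` path, so whether segment pruning is deferred now depends only on the dry-run flag and on whether there are more segments than cluster nodes. Otherwise the regular pruner runs at planning time, which appears to be why the `lazy_read.test` expectations below now report pruning stats and non-zero read rows. A minimal sketch of the resulting decision, using a hypothetical standalone function rather than the real `FuseTable` method:

```rust
// Hypothetical helper mirroring the condition left in place by this commit.
fn use_lazy_partitions(dry_run: bool, segment_count: usize, nodes_num: usize) -> bool {
    // Before this commit the condition was effectively:
    //   (!dry_run && segment_count > nodes_num) || lazy_materialization
    // so any lazy-read query took the deferred FuseLazyPartInfo path and skipped
    // eager pruning. With the flag removed, only the segment/node ratio decides.
    !dry_run && segment_count > nodes_num
}

fn main() {
    // More segments than nodes: defer to lazy parts so pruning can be distributed.
    assert!(use_lazy_partitions(false, 10, 3));
    // A dry run, or no more segments than nodes: prune eagerly at planning time.
    assert!(!use_lazy_partitions(true, 10, 3));
    assert!(!use_lazy_partitions(false, 3, 3));
}
```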
117 changes: 68 additions & 49 deletions tests/sqllogictests/suites/mode/cluster/lazy_read.test
@@ -1,75 +1,91 @@
statement ok
drop table if exists t_lazy
create or replace table t_lazy (a int not null, b float not null, c string not null, d tuple(a int, b int) not null, e date not null)

statement ok
create table t_lazy (a int not null, b float not null, c string not null, d tuple(a int, b int) not null, e date not null)
set max_threads = 4;

statement ok
insert into t_lazy select number + 1, 1.1, '1.1', (1,2), '2020-01-01' from numbers(100)

statement ok
insert into t_lazy select (number + 1) * 1000, 1.1, '1.1', (1,2), '2020-01-01' from numbers(100)

statement ok
insert into t_lazy select (number + 1) * 100000, 1.1, '1.1', (1,2), '2020-01-01' from numbers(100)

query T
select count(), count(distinct _block_name), count(distinct _segment_name) from t_lazy
----
300 3 3

query T
explain select * from t_lazy order by a desc limit 2
----
RowFetch
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── columns to fetch: [b, c, d, e]
├── estimated rows: 0.00
├── estimated rows: 2.00
└── Limit
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 2.00
└── Sort
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
├── sort keys: [a DESC NULLS LAST]
├── estimated rows: 0.00
├── estimated rows: 300.00
└── Exchange
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7), #_order_col]
├── exchange type: Merge
└── Sort
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7), #_order_col]
├── sort keys: [a DESC NULLS LAST]
├── estimated rows: 0.00
├── estimated rows: 300.00
└── TableScan
├── table: default.default.t_lazy
├── output columns: [a (#0), _row_id (#7)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── read rows: 200
├── read size: < 1 KiB
├── partitions total: 3
├── partitions scanned: 2
├── pruning stats: [segments: <range pruning: 3 to 3>, blocks: <range pruning: 3 to 3>]
├── push downs: [filters: [], limit: 2]
└── estimated rows: 0.00
└── estimated rows: 300.00

query T
explain select * from t_lazy where a > 1 limit 2
explain select * from t_lazy where a < 100 limit 2
----
RowFetch
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── columns to fetch: [b, c, d, e]
├── estimated rows: 0.00
├── estimated rows: 1.00
└── Limit
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 1.00
└── Exchange
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
├── exchange type: Merge
└── Limit
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 1.00
└── Filter
├── output columns: [t_lazy.a (#0), t_lazy._row_id (#7)]
├── filters: [t_lazy.a (#0) > 1]
├── estimated rows: 0.00
├── filters: [t_lazy.a (#0) < 100]
├── estimated rows: 1.00
└── TableScan
├── table: default.default.t_lazy
├── output columns: [a (#0), _row_id (#7)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [t_lazy.a (#0) > 1], limit: NONE]
└── estimated rows: 0.00
├── read rows: 100
├── read size: < 1 KiB
├── partitions total: 3
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 3 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [t_lazy.a (#0) < 100], limit: NONE]
└── estimated rows: 300.00

statement ok
set lazy_read_threshold=0
@@ -81,57 +97,59 @@ Limit
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 2.00
└── Sort
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── sort keys: [a DESC NULLS LAST]
├── estimated rows: 0.00
├── estimated rows: 300.00
└── Exchange
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6), #_order_col]
├── exchange type: Merge
└── Sort
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6), #_order_col]
├── sort keys: [a DESC NULLS LAST]
├── estimated rows: 0.00
├── estimated rows: 300.00
└── TableScan
├── table: default.default.t_lazy
├── output columns: [a (#0), b (#1), c (#2), d (#3), e (#6)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── read rows: 200
├── read size: 1.12 KiB
├── partitions total: 3
├── partitions scanned: 2
├── pruning stats: [segments: <range pruning: 3 to 3>, blocks: <range pruning: 3 to 3>]
├── push downs: [filters: [], limit: 2]
└── estimated rows: 0.00
└── estimated rows: 300.00

query T
explain select * from t_lazy where a > 1 limit 2;
explain select * from t_lazy where a < 100 limit 2;
----
Limit
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 1.00
└── Exchange
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── exchange type: Merge
└── Limit
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 1.00
└── Filter
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── filters: [t_lazy.a (#0) > 1]
├── estimated rows: 0.00
├── filters: [t_lazy.a (#0) < 100]
├── estimated rows: 1.00
└── TableScan
├── table: default.default.t_lazy
├── output columns: [a (#0), b (#1), c (#2), d (#3), e (#6)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── push downs: [filters: [t_lazy.a (#0) > 1], limit: NONE]
└── estimated rows: 0.00
├── read rows: 100
├── read size: < 1 KiB
├── partitions total: 3
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 3 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [t_lazy.a (#0) < 100], limit: NONE]
└── estimated rows: 300.00

query T
explain select * from t_lazy where true limit 2;
@@ -140,24 +158,25 @@ Limit
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 2.00
└── Exchange
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── exchange type: Merge
└── Limit
├── output columns: [t_lazy.a (#0), t_lazy.b (#1), t_lazy.c (#2), t_lazy.d (#3), t_lazy.e (#6)]
├── limit: 2
├── offset: 0
├── estimated rows: 0.00
├── estimated rows: 2.00
└── TableScan
├── table: default.default.t_lazy
├── output columns: [a (#0), b (#1), c (#2), d (#3), e (#6)]
├── read rows: 0
├── read size: 0
├── partitions total: 0
├── partitions scanned: 0
├── read rows: 100
├── read size: < 1 KiB
├── partitions total: 3
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 3 to 3>, blocks: <range pruning: 3 to 1>]
├── push downs: [filters: [], limit: 2]
└── estimated rows: 0.00
└── estimated rows: 300.00


statement ok

@@ -52,6 +52,7 @@ HashJoin
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 4.00
└── TableScan(Probe)
@@ -61,6 +62,7 @@ HashJoin
├── read size: < 1 KiB
├── partitions total: 1
├── partitions scanned: 1
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 4.00

@@ -92,15 +94,17 @@ HashJoin
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 4.00
└── TableScan(Probe)
├── table: default.default.salaries2
├── output columns: [employee_id (#3), _row_id (#5)]
├── read rows: 6
├── read size: 1.31 KiB
├── read size: < 1 KiB
├── partitions total: 2
├── partitions scanned: 2
├── pruning stats: [segments: <range pruning: 2 to 2>, blocks: <range pruning: 2 to 2>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 6.00

@@ -133,15 +137,17 @@ HashJoin
│ ├── read size: < 1 KiB
│ ├── partitions total: 1
│ ├── partitions scanned: 1
│ ├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1>]
│ ├── push downs: [filters: [], limit: NONE]
│ └── estimated rows: 4.00
└── TableScan(Probe)
├── table: default.default.salaries2
├── output columns: [employee_id (#3), _row_id (#5)]
├── read rows: 6
├── read size: 1.31 KiB
├── read size: < 1 KiB
├── partitions total: 2
├── partitions scanned: 2
├── pruning stats: [segments: <range pruning: 2 to 2>, blocks: <range pruning: 2 to 2>]
├── push downs: [filters: [], limit: NONE]
└── estimated rows: 6.00
