|
17 | 17 |
|
18 | 18 | use super::*; |
19 | 19 | use insta::assert_snapshot; |
20 | | -use regex::Regex; |
21 | 20 | use rstest::rstest; |
22 | 21 |
|
23 | 22 | use datafusion::config::ConfigOptions; |
@@ -57,42 +56,41 @@ async fn explain_analyze_baseline_metrics() { |
57 | 56 |
|
58 | 57 | println!("Query Output:\n\n{formatted}"); |
59 | 58 |
|
60 | | - let re = Regex::new(r"\|[^|]*\|([^|]*)\|").unwrap(); |
61 | | - let actual = formatted |
62 | | - .lines() |
63 | | - .map(|line| re.replace_all(line, "$1").trim_end().to_string()) |
64 | | - .filter(|line| !line.is_empty() && !line.starts_with('+')) |
65 | | - .collect::<Vec<_>>() |
66 | | - .join("\n"); |
67 | | - insta::with_settings!({filters => vec![ |
68 | | - (r"\d+\.?\d*[µmn]?s", "[TIME]"), |
69 | | - (r"\[\[[^\]]*?/(testing/data/csv/[^/]+\.csv)\]\]", "[[$1]]"), |
70 | | - ]}, { |
71 | | - insta::assert_snapshot!(actual,@r#" |
72 | | - plan |
73 | | - CoalescePartitionsExec: fetch=3, metrics=[output_rows=3, elapsed_compute=[TIME]] |
74 | | - UnionExec, metrics=[output_rows=3, elapsed_compute=[TIME]] |
75 | | - ProjectionExec: expr=[count(Int64(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=[TIME]] |
76 | | - AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=1, elapsed_compute=[TIME]] |
77 | | - CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=[TIME]] |
78 | | - AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=3, elapsed_compute=[TIME]] |
79 | | - ProjectionExec: expr=[], metrics=[output_rows=5, elapsed_compute=[TIME]] |
80 | | - AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[], metrics=[output_rows=5, elapsed_compute=[TIME], spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=50592] |
81 | | - CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5, elapsed_compute=[TIME]] |
82 | | - RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3, metrics=[fetch_time=[TIME], repartition_time=[TIME], send_time=[TIME]] |
83 | | - AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[], metrics=[output_rows=5, elapsed_compute=[TIME], spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=52216] |
84 | | - CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=99, elapsed_compute=[TIME]] |
85 | | - FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0], metrics=[output_rows=99, elapsed_compute=[TIME]] |
86 | | - RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1, metrics=[fetch_time=[TIME], repartition_time=[TIME], send_time=[TIME]] |
87 | | - DataSourceExec: file_groups={1 group: [[testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], file_type=csv, has_header=true, metrics=[output_rows=100, elapsed_compute=[TIME], file_open_errors=0, file_scan_errors=0, time_elapsed_opening=[TIME], time_elapsed_processing=[TIME], time_elapsed_scanning_total=[TIME], time_elapsed_scanning_until_data=[TIME]] |
88 | | - ProjectionExec: expr=[1 as cnt], metrics=[output_rows=1, elapsed_compute=[TIME]] |
89 | | - PlaceholderRowExec, metrics=[] |
90 | | - ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt], metrics=[output_rows=1, elapsed_compute=[TIME]] |
91 | | - BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted], metrics=[output_rows=1, elapsed_compute=[TIME]] |
92 | | - ProjectionExec: expr=[1 as c1], metrics=[output_rows=1, elapsed_compute=[TIME]] |
93 | | - PlaceholderRowExec, metrics=[] |
94 | | - "#); |
95 | | - }); |
| 59 | + assert_metrics!( |
| 60 | + &formatted, |
| 61 | + "AggregateExec: mode=Partial, gby=[]", |
| 62 | + "metrics=[output_rows=3, elapsed_compute=" |
| 63 | + ); |
| 64 | + assert_metrics!( |
| 65 | + &formatted, |
| 66 | + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", |
| 67 | + "metrics=[output_rows=5, elapsed_compute=" |
| 68 | + ); |
| 69 | + assert_metrics!( |
| 70 | + &formatted, |
| 71 | + "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", |
| 72 | + "metrics=[output_rows=99, elapsed_compute=" |
| 73 | + ); |
| 74 | + assert_metrics!( |
| 75 | + &formatted, |
| 76 | + "ProjectionExec: expr=[]", |
| 77 | + "metrics=[output_rows=5, elapsed_compute=" |
| 78 | + ); |
| 79 | + assert_metrics!( |
| 80 | + &formatted, |
| 81 | + "CoalesceBatchesExec: target_batch_size=4096", |
| 82 | + "metrics=[output_rows=5, elapsed_compute" |
| 83 | + ); |
| 84 | + assert_metrics!( |
| 85 | + &formatted, |
| 86 | + "UnionExec", |
| 87 | + "metrics=[output_rows=3, elapsed_compute=" |
| 88 | + ); |
| 89 | + assert_metrics!( |
| 90 | + &formatted, |
| 91 | + "WindowAggExec", |
| 92 | + "metrics=[output_rows=1, elapsed_compute=" |
| 93 | + ); |
96 | 94 |
|
97 | 95 | fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool { |
98 | 96 | use datafusion::physical_plan; |
|
0 commit comments