1515// specific language governing permissions and limitations
1616// under the License.
1717
18- use arrow:: array:: builder:: { Int32Builder , StringBuilder } ;
19- use arrow:: datatypes:: { DataType , Field , Schema } ;
18+ use arrow:: array:: builder:: StringBuilder ;
19+ use arrow:: array:: { Array , ArrayRef , Int32Array } ;
20+ use arrow:: datatypes:: { Field , Schema } ;
2021use arrow:: record_batch:: RecordBatch ;
2122use criterion:: { black_box, criterion_group, criterion_main, Criterion } ;
22- use datafusion_common:: ScalarValue ;
2323use datafusion_expr:: Operator ;
24- use datafusion_physical_expr:: expressions:: { BinaryExpr , CaseExpr , Column , Literal } ;
24+ use datafusion_physical_expr:: expressions:: { case , col , lit , BinaryExpr } ;
2525use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
2626use std:: sync:: Arc ;
2727
28- fn make_col ( name : & str , index : usize ) -> Arc < dyn PhysicalExpr > {
29- Arc :: new ( Column :: new ( name, index) )
28+ fn make_x_cmp_y (
29+ x : & Arc < dyn PhysicalExpr > ,
30+ op : Operator ,
31+ y : i32 ,
32+ ) -> Arc < dyn PhysicalExpr > {
33+ Arc :: new ( BinaryExpr :: new ( Arc :: clone ( x) , op, lit ( y) ) )
3034}
3135
32- fn make_lit_i32 ( n : i32 ) -> Arc < dyn PhysicalExpr > {
33- Arc :: new ( Literal :: new ( ScalarValue :: Int32 ( Some ( n) ) ) )
34- }
36+ /// Create a record batch with the given number of rows and columns.
37+ /// Columns are named `c<i>` where `i` is the one-based column position (`c1`, `c2`, ...).
38+ ///
39+ /// The minimum value for `column_count` is `3`.
40+ /// `c1` contains incrementing int32 values
41+ /// `c2` contains strings with one null inserted every 7 rows
42+ /// `c3` contains strings with one null inserted every 9 rows
43+ /// `c4` to `cn`, if present, contain unspecified int32 values
44+ fn make_batch ( row_count : usize , column_count : usize ) -> RecordBatch {
45+ assert ! ( column_count >= 3 ) ;
3546
36- fn criterion_benchmark ( c : & mut Criterion ) {
37- // create input data
38- let mut c1 = Int32Builder :: new ( ) ;
3947 let mut c2 = StringBuilder :: new ( ) ;
4048 let mut c3 = StringBuilder :: new ( ) ;
41- for i in 0 ..1000 {
42- c1. append_value ( i) ;
49+ for i in 0 ..row_count {
4350 if i % 7 == 0 {
4451 c2. append_null ( ) ;
4552 } else {
@@ -51,72 +58,151 @@ fn criterion_benchmark(c: &mut Criterion) {
5158 c3. append_value ( format ! ( "other string {i}" ) ) ;
5259 }
5360 }
54- let c1 = Arc :: new ( c1 . finish ( ) ) ;
61+ let c1 = Arc :: new ( Int32Array :: from_iter_values ( 0 ..row_count as i32 ) ) ;
5562 let c2 = Arc :: new ( c2. finish ( ) ) ;
5663 let c3 = Arc :: new ( c3. finish ( ) ) ;
57- let schema = Schema :: new ( vec ! [
58- Field :: new( "c1" , DataType :: Int32 , true ) ,
59- Field :: new( "c2" , DataType :: Utf8 , true ) ,
60- Field :: new( "c3" , DataType :: Utf8 , true ) ,
61- ] ) ;
62- let batch = RecordBatch :: try_new ( Arc :: new ( schema) , vec ! [ c1, c2, c3] ) . unwrap ( ) ;
63-
64- // use same predicate for all benchmarks
65- let predicate = Arc :: new ( BinaryExpr :: new (
66- make_col ( "c1" , 0 ) ,
67- Operator :: LtEq ,
68- make_lit_i32 ( 500 ) ,
69- ) ) ;
64+ let mut columns: Vec < ArrayRef > = vec ! [ c1, c2, c3] ;
65+ for _ in 3 ..column_count {
66+ columns. push ( Arc :: new ( Int32Array :: from_iter_values ( 0 ..row_count as i32 ) ) ) ;
67+ }
7068
71- // CASE WHEN c1 <= 500 THEN 1 ELSE 0 END
72- c. bench_function ( "case_when: scalar or scalar" , |b| {
73- let expr = Arc :: new (
74- CaseExpr :: try_new (
75- None ,
76- vec ! [ ( predicate. clone( ) , make_lit_i32( 1 ) ) ] ,
77- Some ( make_lit_i32 ( 0 ) ) ,
69+ let fields = columns
70+ . iter ( )
71+ . enumerate ( )
72+ . map ( |( i, c) | {
73+ Field :: new (
74+ format ! ( "c{}" , i + 1 ) ,
75+ c. data_type ( ) . clone ( ) ,
76+ c. is_nullable ( ) ,
7877 )
79- . unwrap ( ) ,
80- ) ;
81- b. iter ( || black_box ( expr. evaluate ( black_box ( & batch) ) . unwrap ( ) ) )
82- } ) ;
78+ } )
79+ . collect :: < Vec < _ > > ( ) ;
8380
84- // CASE WHEN c1 <= 500 THEN c2 [ELSE NULL] END
85- c. bench_function ( "case_when: column or null" , |b| {
86- let expr = Arc :: new (
87- CaseExpr :: try_new ( None , vec ! [ ( predicate. clone( ) , make_col( "c2" , 1 ) ) ] , None )
81+ let schema = Arc :: new ( Schema :: new ( fields) ) ;
82+ RecordBatch :: try_new ( Arc :: clone ( & schema) , columns) . unwrap ( )
83+ }
84+
85+ fn criterion_benchmark ( c : & mut Criterion ) {
86+ run_benchmarks ( c, & make_batch ( 8192 , 3 ) ) ;
87+ run_benchmarks ( c, & make_batch ( 8192 , 50 ) ) ;
88+ run_benchmarks ( c, & make_batch ( 8192 , 100 ) ) ;
89+ }
90+
91+ fn run_benchmarks ( c : & mut Criterion , batch : & RecordBatch ) {
92+ let c1 = col ( "c1" , & batch. schema ( ) ) . unwrap ( ) ;
93+ let c2 = col ( "c2" , & batch. schema ( ) ) . unwrap ( ) ;
94+ let c3 = col ( "c3" , & batch. schema ( ) ) . unwrap ( ) ;
95+
96+ // No expression, when/then/else, literal values
97+ c. bench_function (
98+ format ! (
99+ "case_when {}x{}: CASE WHEN c1 <= 500 THEN 1 ELSE 0 END" ,
100+ batch. num_rows( ) ,
101+ batch. num_columns( )
102+ )
103+ . as_str ( ) ,
104+ |b| {
105+ let expr = Arc :: new (
106+ case (
107+ None ,
108+ vec ! [ ( make_x_cmp_y( & c1, Operator :: LtEq , 500 ) , lit( 1 ) ) ] ,
109+ Some ( lit ( 0 ) ) ,
110+ )
88111 . unwrap ( ) ,
89- ) ;
90- b. iter ( || black_box ( expr. evaluate ( black_box ( & batch) ) . unwrap ( ) ) )
91- } ) ;
112+ ) ;
113+ b. iter ( || black_box ( expr. evaluate ( black_box ( batch) ) . unwrap ( ) ) )
114+ } ,
115+ ) ;
116+
117+ // No expression, when/then/else, column reference values
118+ c. bench_function (
119+ format ! (
120+ "case_when {}x{}: CASE WHEN c1 <= 500 THEN c2 ELSE c3 END" ,
121+ batch. num_rows( ) ,
122+ batch. num_columns( )
123+ )
124+ . as_str ( ) ,
125+ |b| {
126+ let expr = Arc :: new (
127+ case (
128+ None ,
129+ vec ! [ ( make_x_cmp_y( & c1, Operator :: LtEq , 500 ) , Arc :: clone( & c2) ) ] ,
130+ Some ( Arc :: clone ( & c3) ) ,
131+ )
132+ . unwrap ( ) ,
133+ ) ;
134+ b. iter ( || black_box ( expr. evaluate ( black_box ( batch) ) . unwrap ( ) ) )
135+ } ,
136+ ) ;
92137
93- // CASE WHEN c1 <= 500 THEN c2 ELSE c3 END
94- c. bench_function ( "case_when: expr or expr" , |b| {
138+ // No expression, when/then, implicit else
139+ c. bench_function (
140+ format ! (
141+ "case_when {}x{}: CASE WHEN c1 <= 500 THEN c2 [ELSE NULL] END" ,
142+ batch. num_rows( ) ,
143+ batch. num_columns( )
144+ )
145+ . as_str ( ) ,
146+ |b| {
147+ let expr = Arc :: new (
148+ case (
149+ None ,
150+ vec ! [ ( make_x_cmp_y( & c1, Operator :: LtEq , 500 ) , Arc :: clone( & c2) ) ] ,
151+ None ,
152+ )
153+ . unwrap ( ) ,
154+ ) ;
155+ b. iter ( || black_box ( expr. evaluate ( black_box ( batch) ) . unwrap ( ) ) )
156+ } ,
157+ ) ;
158+
159+ // With expression, two when/then branches
160+ c. bench_function (
161+ format ! (
162+ "case_when {}x{}: CASE c1 WHEN 1 THEN c2 WHEN 2 THEN c3 END" ,
163+ batch. num_rows( ) ,
164+ batch. num_columns( )
165+ )
166+ . as_str ( ) ,
167+ |b| {
168+ let expr = Arc :: new (
169+ case (
170+ Some ( Arc :: clone ( & c1) ) ,
171+ vec ! [ ( lit( 1 ) , Arc :: clone( & c2) ) , ( lit( 2 ) , Arc :: clone( & c3) ) ] ,
172+ None ,
173+ )
174+ . unwrap ( ) ,
175+ ) ;
176+ b. iter ( || black_box ( expr. evaluate ( black_box ( batch) ) . unwrap ( ) ) )
177+ } ,
178+ ) ;
179+
180+ // Many when/then branches where all are effectively reachable
181+ c. bench_function ( format ! ( "case_when {}x{}: CASE WHEN c1 == 0 THEN 0 WHEN c1 == 1 THEN 1 ... WHEN c1 == n THEN n ELSE n + 1 END" , batch. num_rows( ) , batch. num_columns( ) ) . as_str ( ) , |b| {
182+ let when_thens = ( 0 ..batch. num_rows ( ) as i32 ) . map ( |i| ( make_x_cmp_y ( & c1, Operator :: Eq , i) , lit ( i) ) ) . collect ( ) ;
95183 let expr = Arc :: new (
96- CaseExpr :: try_new (
184+ case (
97185 None ,
98- vec ! [ ( predicate . clone ( ) , make_col ( "c2" , 1 ) ) ] ,
99- Some ( make_col ( "c3" , 2 ) ) ,
186+ when_thens ,
187+ Some ( lit ( batch . num_rows ( ) as i32 ) )
100188 )
101- . unwrap ( ) ,
189+ . unwrap ( ) ,
102190 ) ;
103- b. iter ( || black_box ( expr. evaluate ( black_box ( & batch) ) . unwrap ( ) ) )
191+ b. iter ( || black_box ( expr. evaluate ( black_box ( batch) ) . unwrap ( ) ) )
104192 } ) ;
105193
106- // CASE c1 WHEN 1 THEN c2 WHEN 2 THEN c3 END
107- c. bench_function ( "case_when: CASE expr" , |b| {
194+ // Many when/then branches where all but the first few are effectively unreachable
195+ c. bench_function ( format ! ( "case_when {}x{}: CASE WHEN c1 < 0 THEN 0 WHEN c1 < 1000 THEN 1 ... WHEN c1 < n * 1000 THEN n ELSE n + 1 END" , batch. num_rows( ) , batch. num_columns( ) ) . as_str ( ) , |b| {
196+ let when_thens = ( 0 ..batch. num_rows ( ) as i32 ) . map ( |i| ( make_x_cmp_y ( & c1, Operator :: Eq , i * 1000 ) , lit ( i) ) ) . collect ( ) ;
108197 let expr = Arc :: new (
109- CaseExpr :: try_new (
110- Some ( make_col ( "c1" , 0 ) ) ,
111- vec ! [
112- ( make_lit_i32( 1 ) , make_col( "c2" , 1 ) ) ,
113- ( make_lit_i32( 2 ) , make_col( "c3" , 2 ) ) ,
114- ] ,
198+ case (
115199 None ,
200+ when_thens,
201+ Some ( lit ( batch. num_rows ( ) as i32 ) )
116202 )
117- . unwrap ( ) ,
203+ . unwrap ( ) ,
118204 ) ;
119- b. iter ( || black_box ( expr. evaluate ( black_box ( & batch) ) . unwrap ( ) ) )
205+ b. iter ( || black_box ( expr. evaluate ( black_box ( batch) ) . unwrap ( ) ) )
120206 } ) ;
121207}
122208
0 commit comments