@@ -35,28 +35,39 @@ use crate::{
     utils::arrow::{adapt_batch, reverse},
 };
 
+/// `ReverseReader` provides an iterator over record batches in an Arrow IPC file,
+/// in reverse order (from the last batch to the first).
+///
+/// This is useful for scenarios where you need to process the most recent data first,
+/// or when implementing time-series data exploration that starts with the latest records.
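+///
+/// A minimal usage sketch (the file name here is hypothetical):
+///
+/// ```ignore
+/// let reader = ReverseReader::try_new("data.arrows")?;
+/// for batch in reader {
+///     // Batches are yielded newest-first.
+///     println!("rows: {}", batch?.num_rows());
+/// }
+/// ```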
 #[derive(Debug)]
 pub struct ReverseReader {
     inner: FileReader<BufReader<File>>,
+    /// Current index for iteration (starts from the last batch)
     idx: usize,
 }
 
 impl ReverseReader {
-    fn try_new(path: impl AsRef<Path>) -> Result<Self, ArrowError> {
+    /// Creates a new `ReverseReader` from the given path.
+    pub fn try_new(path: impl AsRef<Path>) -> Result<Self, ArrowError> {
         let inner = FileReader::try_new(BufReader::new(File::open(path).unwrap()), None)?;
         let idx = inner.num_batches();
 
         Ok(Self { inner, idx })
     }
 
-    fn schema(&self) -> SchemaRef {
+    /// Returns the schema of the underlying Arrow file.
+    pub fn schema(&self) -> SchemaRef {
         self.inner.schema()
     }
 }
 
 impl Iterator for ReverseReader {
     type Item = Result<RecordBatch, ArrowError>;
 
+    /// Returns the next record batch, in reverse order (latest first), from the Arrow file.
+    ///
+    /// Returns `None` when all batches have been processed.
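+    ///
+    /// A minimal sketch of the iteration order, assuming a hypothetical file
+    /// containing batches `b0`, `b1`, `b2` (written in that order):
+    ///
+    /// ```ignore
+    /// let mut reader = ReverseReader::try_new("data.arrows")?;
+    /// let newest = reader.next(); // b2: last written, read first
+    /// let middle = reader.next(); // b1
+    /// let oldest = reader.next(); // b0
+    /// assert!(reader.next().is_none());
+    /// ```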
     fn next(&mut self) -> Option<Self::Item> {
         if self.idx == 0 {
             return None;
@@ -158,19 +169,27 @@ fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 {
 
 #[cfg(test)]
 mod tests {
-    use std::{fs::File, io, path::Path, sync::Arc};
+    use std::{
+        fs::File,
+        io::{self, Write},
+        path::{Path, PathBuf},
+        sync::Arc,
+    };
 
     use arrow_array::{
         cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch,
         StringArray,
     };
     use arrow_ipc::{reader::FileReader, writer::FileWriter};
-    use arrow_schema::{DataType, Field, Schema};
+    use arrow_schema::{ArrowError, DataType, Field, Schema};
     use chrono::Utc;
     use temp_dir::TempDir;
 
     use crate::{
-        parseable::staging::{reader::MergedRecordReader, writer::DiskWriter},
+        parseable::staging::{
+            reader::{MergedRecordReader, ReverseReader},
+            writer::DiskWriter,
+        },
         utils::time::TimeRange,
         OBJECT_STORE_DATA_GRANULARITY,
    };
@@ -403,4 +422,147 @@ mod tests {
 
         Ok(())
     }
+
+    fn create_test_arrow_file(path: &PathBuf, num_batches: usize) -> Result<(), ArrowError> {
+        // Create schema
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let schema_ref = std::sync::Arc::new(schema);
+
+        // Create file and writer
+        let file = File::create(path)?;
+        let mut writer = FileWriter::try_new(file, &schema_ref)?;
+
+        // Create and write batches
+        for i in 0..num_batches {
+            let id_array =
+                Int32Array::from(vec![i as i32 * 10, i as i32 * 10 + 1, i as i32 * 10 + 2]);
+            let name_array = StringArray::from(vec![
+                format!("batch_{i}_name_0"),
+                format!("batch_{i}_name_1"),
+                format!("batch_{i}_name_2"),
+            ]);
+
+            let batch = RecordBatch::try_new(
+                schema_ref.clone(),
+                vec![
+                    std::sync::Arc::new(id_array),
+                    std::sync::Arc::new(name_array),
+                ],
+            )?;
+
+            writer.write(&batch)?;
+        }
+
+        writer.finish()?;
+        Ok(())
+    }
+
+    #[test]
+    fn test_reverse_reader_creation() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_path = temp_dir.path().join("test.arrow");
+
+        // Create test file with 3 batches
+        create_test_arrow_file(&file_path, 3).unwrap();
+
+        // Test successful creation
+        let reader = ReverseReader::try_new(&file_path);
+        assert!(reader.is_ok());
+
+        // Test schema retrieval
+        let reader = reader.unwrap();
+        let schema = reader.schema();
+        assert_eq!(schema.fields().len(), 2);
+        assert_eq!(schema.field(0).name(), "id");
+        assert_eq!(schema.field(1).name(), "name");
+    }
+
+    #[test]
+    fn test_reverse_reader_iteration() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_path = temp_dir.path().join("test.arrow");
+
+        // Create test file with 3 batches
+        create_test_arrow_file(&file_path, 3).unwrap();
+
+        // Create reader and iterate
+        let reader = ReverseReader::try_new(&file_path).unwrap();
+        let batches: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+
+        // Verify correct number of batches
+        assert_eq!(batches.len(), 3);
+
+        // Verify reverse order
+        // Batch 2 (last written, first read)
+        let batch0 = &batches[0];
+        assert_eq!(batch0.num_columns(), 2);
+        let id_array = batch0
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 20);
+
+        // Batch 1 (middle)
+        let batch1 = &batches[1];
+        let id_array = batch1
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 10);
+
+        // Batch 0 (first written, last read)
+        let batch2 = &batches[2];
+        let id_array = batch2
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 0);
+    }
+
+    #[test]
+    fn test_empty_file() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_path = temp_dir.path().join("empty.arrow");
+
+        // Create empty file with schema but no batches
+        create_test_arrow_file(&file_path, 0).unwrap();
+
+        let reader = ReverseReader::try_new(&file_path).unwrap();
+        let batches: Vec<_> = reader.collect::<Result<Vec<_>, _>>().unwrap();
+
+        // Should be empty
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[test]
+    fn test_invalid_file() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_path = temp_dir.path().join("invalid.txt");
+
+        // Create a non-Arrow file
+        let mut file = File::create(&file_path).unwrap();
+        writeln!(&mut file, "This is not an Arrow file").unwrap();
+
+        // Attempting to create a reader should fail
+        let reader = ReverseReader::try_new(&file_path);
+        assert!(reader.is_err());
+    }
+
+    #[test]
+    fn test_num_batches() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_path = temp_dir.path().join("test.arrow");
+
+        // Create test file with 5 batches
+        create_test_arrow_file(&file_path, 5).unwrap();
+
+        let reader = ReverseReader::try_new(&file_path).unwrap();
+        assert_eq!(reader.count(), 5);
+    }
 }