diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index c0287df2f6fb5..da05e3ac46009 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -17,11 +17,9 @@ //! Parquet Data source -use std::cell::RefCell; use std::fs::File; -use std::rc::Rc; use std::string::String; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use arrow::array::Array; use arrow::datatypes::{DataType, Field, Schema}; @@ -33,7 +31,7 @@ use parquet::data_type::ByteArray; use parquet::file::reader::*; use parquet::schema::types::Type; -use crate::datasource::{RecordBatchIterator, Table}; +use crate::datasource::{RecordBatchIterator, ScanResult, Table}; use crate::execution::error::{ExecutionError, Result}; use arrow::builder::BooleanBuilder; use arrow::builder::Int64Builder; @@ -66,10 +64,10 @@ impl Table for ParquetTable { &self, projection: &Option>, _batch_size: usize, - ) -> Result>> { + ) -> Result> { let file = File::open(self.filename.clone()).unwrap(); let parquet_file = ParquetFile::open(file, projection.clone()).unwrap(); - Ok(Rc::new(RefCell::new(parquet_file))) + Ok(vec![Arc::new(Mutex::new(parquet_file))]) } } @@ -482,7 +480,7 @@ mod tests { let projection = None; let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(11, batch.num_columns()); @@ -495,7 +493,7 @@ mod tests { let projection = Some(vec![1]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); @@ -523,7 +521,7 @@ mod tests { let projection = Some(vec![0]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); @@ -548,7 +546,7 @@ mod tests { let projection = Some(vec![10]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); @@ -573,7 +571,7 @@ mod tests { let projection = Some(vec![6]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); @@ -601,7 +599,7 @@ mod tests { let projection = Some(vec![7]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); @@ -629,7 +627,7 @@ mod tests { let projection = Some(vec![9]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns()); @@ -658,7 +656,7 @@ mod tests { let projection = Some(vec![0]); let scan = table.scan(&projection, 1024).unwrap(); - let mut it = scan.borrow_mut(); + let mut it = scan[0].lock().unwrap(); let batch = it.next().unwrap().unwrap(); assert_eq!(1, batch.num_columns());