diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index fddf709a617..ee28ed94cd7 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -5,7 +5,7 @@ use std::{collections::HashMap, io::Read}; use arrow2::array::Array; use arrow2::io::ipc::IpcField; use arrow2::{ - columns::Columns, + chunk::Chunk, datatypes::{DataType, Schema}, error::Result, io::{ @@ -25,7 +25,7 @@ use flate2::read::GzDecoder; pub fn read_gzip_json( version: &str, file_name: &str, -) -> Result<(Schema, Vec, Vec>>)> { +) -> Result<(Schema, Vec, Vec>>)> { let path = format!( "../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz", version, file_name @@ -53,7 +53,7 @@ pub fn read_gzip_json( let batches = arrow_json .batches .iter() - .map(|batch| read::deserialize_columns(&schema, &ipc_fields, batch, &dictionaries)) + .map(|batch| read::deserialize_chunk(&schema, &ipc_fields, batch, &dictionaries)) .collect::>>()?; Ok((schema, ipc_fields, batches)) @@ -148,7 +148,7 @@ fn main() -> Result<()> { } }) .collect(); - Columns::try_new(columns).unwrap() + Chunk::try_new(columns).unwrap() }) .collect::>() } else { diff --git a/benches/filter_kernels.rs b/benches/filter_kernels.rs index b6981f0e4f0..fb9376b0d66 100644 --- a/benches/filter_kernels.rs +++ b/benches/filter_kernels.rs @@ -19,8 +19,8 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; -use arrow2::columns::Columns; -use arrow2::compute::filter::{build_filter, filter, filter_columns, Filter}; +use arrow2::chunk::Chunk; +use arrow2::compute::filter::{build_filter, filter, filter_chunk, Filter}; use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array}; @@ -125,7 +125,7 @@ fn add_benchmark(c: &mut Criterion) { let data_array = create_primitive_array::(size, 0.0); - let columns = Columns::try_new(vec![Arc::new(data_array)]).unwrap(); + let columns = Chunk::try_new(vec![Arc::new(data_array)]).unwrap(); c.bench_function("filter single record batch", |b| { b.iter(|| filter_record_batch(&columns, &filter_array)) diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 8c39c246f85..e4c3fa12022 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -37,8 +37,8 @@ fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> { let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?; - for maybe_columns in reader { - let columns = maybe_columns?; + for maybe_chunk in reader { + let columns = maybe_chunk?; assert_eq!(columns.len(), size); } Ok(()) diff --git a/benches/write_csv.rs b/benches/write_csv.rs index 6f5f6102c96..d88afdc16f9 100644 --- a/benches/write_csv.rs +++ b/benches/write_csv.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::csv::write; use arrow2::util::bench_util::*; -fn write_batch(columns: &Columns) -> Result<()> { +fn write_batch(columns: &Chunk) -> Result<()> { let writer = &mut write::WriterBuilder::new().from_writer(vec![]); assert_eq!(columns.arrays().len(), 1); @@ -18,8 +18,8 @@ fn write_batch(columns: &Columns) -> Result<()> { write::write_batch(writer, batch, &options) } -fn make_columns(array: impl Array + 'static) -> Columns> { - 
Columns::new(vec![Arc::new(array)]) +fn make_chunk(array: impl Array + 'static) -> Chunk> { + Chunk::new(vec![Arc::new(array)]) } fn add_benchmark(c: &mut Criterion) { @@ -27,21 +27,21 @@ fn add_benchmark(c: &mut Criterion) { let size = 2usize.pow(log2_size); let array = create_primitive_array::(size, 0.1); - let batch = make_columns(array); + let batch = make_chunk(array); c.bench_function(&format!("csv write i32 2^{}", log2_size), |b| { b.iter(|| write_batch(&batch)) }); let array = create_string_array::(size, 100, 0.1, 42); - let batch = make_columns(array); + let batch = make_chunk(array); c.bench_function(&format!("csv write utf8 2^{}", log2_size), |b| { b.iter(|| write_batch(&batch)) }); let array = create_primitive_array::(size, 0.1); - let batch = make_columns(array); + let batch = make_chunk(array); c.bench_function(&format!("csv write f64 2^{}", log2_size), |b| { b.iter(|| write_batch(&batch)) diff --git a/benches/write_ipc.rs b/benches/write_ipc.rs index 52dff65a324..2ec35a5b9a4 100644 --- a/benches/write_ipc.rs +++ b/benches/write_ipc.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::{Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::write::*; @@ -13,7 +13,7 @@ use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, cre fn write(array: &dyn Array) -> Result<()> { let field = Field::new("c1", array.data_type().clone(), true); let schema = Schema::new(vec![field]); - let columns = Columns::try_new(vec![clone(array).into()])?; + let columns = Chunk::try_new(vec![clone(array).into()])?; let writer = Cursor::new(vec![]); let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?; diff --git a/benches/write_json.rs b/benches/write_json.rs index 3c3ab4328b1..11f550c90f5 100644 --- a/benches/write_json.rs +++ b/benches/write_json.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::json::write; use arrow2::util::bench_util::*; -fn write_batch(columns: &Columns>) -> Result<()> { +fn write_batch(columns: &Chunk>) -> Result<()> { let mut writer = vec![]; let format = write::JsonArray::default(); @@ -23,8 +23,8 @@ fn write_batch(columns: &Columns>) -> Result<()> { Ok(()) } -fn make_columns(array: impl Array + 'static) -> Columns> { - Columns::new(vec![Arc::new(array) as Arc]) +fn make_chunk(array: impl Array + 'static) -> Chunk> { + Chunk::new(vec![Arc::new(array) as Arc]) } fn add_benchmark(c: &mut Criterion) { @@ -32,21 +32,21 @@ fn add_benchmark(c: &mut Criterion) { let size = 2usize.pow(log2_size); let array = create_primitive_array::(size, 0.1); - let columns = make_columns(array); + let columns = make_chunk(array); c.bench_function(&format!("json write i32 2^{}", log2_size), |b| { b.iter(|| write_batch(&columns)) }); let array = create_string_array::(size, 100, 0.1, 42); - let columns = make_columns(array); + let columns = make_chunk(array); c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| { b.iter(|| write_batch(&columns)) }); let array = create_primitive_array::(size, 0.1); - let columns = make_columns(array); + let columns = make_chunk(array); c.bench_function(&format!("json write f64 2^{}", log2_size), |b| { b.iter(|| write_batch(&columns)) diff --git a/benches/write_parquet.rs 
b/benches/write_parquet.rs index ee06a4e24ad..e8dc072e24f 100644 --- a/benches/write_parquet.rs +++ b/benches/write_parquet.rs @@ -3,13 +3,13 @@ use std::io::Cursor; use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::array::{clone, Array}; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::parquet::write::*; use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array}; fn write(array: &dyn Array, encoding: Encoding) -> Result<()> { - let columns = Columns::new(vec![clone(array).into()]); + let columns = Chunk::new(vec![clone(array).into()]); let schema = batch.schema().clone(); let options = WriteOptions { diff --git a/examples/avro_read.rs b/examples/avro_read.rs index f03237e9e46..6d7722ac1d4 100644 --- a/examples/avro_read.rs +++ b/examples/avro_read.rs @@ -22,8 +22,8 @@ fn main() -> Result<()> { schema.fields, ); - for maybe_columns in reader { - let columns = maybe_columns?; + for maybe_chunk in reader { + let columns = maybe_chunk?; assert!(!columns.is_empty()); } Ok(()) diff --git a/examples/csv_read.rs b/examples/csv_read.rs index 9a47b52fd32..6d0319c13c4 100644 --- a/examples/csv_read.rs +++ b/examples/csv_read.rs @@ -1,11 +1,11 @@ use std::sync::Arc; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::csv::read; -fn read_path(path: &str, projection: Option<&[usize]>) -> Result>> { +fn read_path(path: &str, projection: Option<&[usize]>) -> Result>> { // Create a CSV reader. This is typically created on the thread that reads the file and // thus owns the read head. let mut reader = read::ReaderBuilder::new().from_path(path)?; diff --git a/examples/csv_read_parallel.rs b/examples/csv_read_parallel.rs index c9ce149d30d..ad9744b64c5 100644 --- a/examples/csv_read_parallel.rs +++ b/examples/csv_read_parallel.rs @@ -5,10 +5,10 @@ use std::thread; use std::time::SystemTime; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::{error::Result, io::csv::read}; -fn parallel_read(path: &str) -> Result>>> { +fn parallel_read(path: &str) -> Result>>> { let batch_size = 100; let has_header = true; let projection = None; diff --git a/examples/csv_write.rs b/examples/csv_write.rs index 791590fcdc4..f9f73cfef53 100644 --- a/examples/csv_write.rs +++ b/examples/csv_write.rs @@ -1,11 +1,11 @@ use arrow2::{ array::{Array, Int32Array}, - columns::Columns, + chunk::Chunk, error::Result, io::csv::write, }; -fn write_batch>(path: &str, columns: &[Columns]) -> Result<()> { +fn write_batch>(path: &str, columns: &[Chunk]) -> Result<()> { let writer = &mut write::WriterBuilder::new().from_path(path)?; write::write_header(writer, &["c1"])?; @@ -13,7 +13,7 @@ fn write_batch>(path: &str, columns: &[Columns]) -> Resul let options = write::SerializeOptions::default(); columns .iter() - .try_for_each(|batch| write::write_columns(writer, batch, &options)) + .try_for_each(|batch| write::write_chunk(writer, batch, &options)) } fn main() -> Result<()> { @@ -26,7 +26,7 @@ fn main() -> Result<()> { Some(5), Some(6), ]); - let batch = Columns::try_new(vec![&array as &dyn Array])?; + let batch = Chunk::try_new(vec![&array as &dyn Array])?; write_batch("example.csv", &[batch]) } diff --git a/examples/csv_write_parallel.rs b/examples/csv_write_parallel.rs index a12a26a2e5f..f616a1b8292 100644 --- a/examples/csv_write_parallel.rs +++ b/examples/csv_write_parallel.rs @@ -5,12 +5,12 @@ use std::thread; 
use arrow2::{ array::{Array, Int32Array}, - columns::Columns, + chunk::Chunk, error::Result, io::csv::write, }; -fn parallel_write(path: &str, batches: [Columns>; 2]) -> Result<()> { +fn parallel_write(path: &str, batches: [Chunk>; 2]) -> Result<()> { let options = write::SerializeOptions::default(); // write a header @@ -60,7 +60,7 @@ fn main() -> Result<()> { Some(5), Some(6), ]); - let columns = Columns::new(vec![Arc::new(array) as Arc]); + let columns = Chunk::new(vec![Arc::new(array) as Arc]); parallel_write("example.csv", [columns.clone(), columns]) } diff --git a/examples/extension.rs b/examples/extension.rs index cf185bb6897..24e390a4c1d 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -2,7 +2,7 @@ use std::io::{Cursor, Seek, Write}; use std::sync::Arc; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::ipc::read; @@ -40,14 +40,14 @@ fn write_ipc(writer: W, array: impl Array + 'static) -> Result< let options = write::WriteOptions { compression: None }; let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?; - let batch = Columns::try_new(vec![Arc::new(array) as Arc])?; + let batch = Chunk::try_new(vec![Arc::new(array) as Arc])?; writer.write(&batch, None)?; Ok(writer.into_inner()) } -fn read_ipc(buf: &[u8]) -> Result>> { +fn read_ipc(buf: &[u8]) -> Result>> { let mut cursor = Cursor::new(buf); let metadata = read::read_file_metadata(&mut cursor)?; let mut reader = read::FileReader::new(cursor, metadata, None); diff --git a/examples/ipc_file_read.rs b/examples/ipc_file_read.rs index 5193ef09eee..a88e18f25c1 100644 --- a/examples/ipc_file_read.rs +++ b/examples/ipc_file_read.rs @@ -2,19 +2,19 @@ use std::fs::File; use std::sync::Arc; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::ipc::read::{read_file_metadata, FileReader}; use arrow2::io::print; -fn read_batches(path: &str) -> Result<(Schema, Vec>>)> { +fn read_batches(path: &str) -> Result<(Schema, Vec>>)> { let mut file = File::open(path)?; // read the files' metadata. At this point, we can distribute the read whatever we like. let metadata = read_file_metadata(&mut file)?; - let schema = metadata.schema().as_ref().clone(); + let schema = metadata.schema.clone(); // Simplest way: use the reader, an iterator over batches. 
let reader = FileReader::new(file, metadata, None); diff --git a/examples/ipc_file_write.rs b/examples/ipc_file_write.rs index ca91783fef3..219d6d9d548 100644 --- a/examples/ipc_file_write.rs +++ b/examples/ipc_file_write.rs @@ -2,12 +2,12 @@ use std::fs::File; use std::sync::Arc; use arrow2::array::{Array, Int32Array, Utf8Array}; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::{DataType, Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::write; -fn write_batches(path: &str, schema: &Schema, columns: &[Columns>]) -> Result<()> { +fn write_batches(path: &str, schema: &Schema, columns: &[Chunk>]) -> Result<()> { let file = File::create(path)?; let options = write::WriteOptions { compression: None }; @@ -34,7 +34,7 @@ fn main() -> Result<()> { let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]); let b = Utf8Array::::from_slice(&["a", "b", "c", "d", "e"]); - let batch = Columns::try_new(vec![Arc::new(a) as Arc, Arc::new(b)])?; + let batch = Chunk::try_new(vec![Arc::new(a) as Arc, Arc::new(b)])?; // write it write_batches(file_path, &schema, &[batch])?; diff --git a/examples/json_read.rs b/examples/json_read.rs index 827e8db0fdf..a3eb2f33ef4 100644 --- a/examples/json_read.rs +++ b/examples/json_read.rs @@ -3,11 +3,11 @@ use std::io::BufReader; use std::sync::Arc; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::json::read; -fn read_path(path: &str, projection: Option>) -> Result>> { +fn read_path(path: &str, projection: Option>) -> Result>> { // Example of reading a JSON file. let mut reader = BufReader::new(File::open(path)?); @@ -31,7 +31,7 @@ fn read_path(path: &str, projection: Option>) -> Result, - batches: &[Columns>], -) -> Result<()> { +fn write_batches(path: &str, names: Vec, batches: &[Chunk>]) -> Result<()> { let mut writer = File::create(path)?; let format = write::JsonArray::default(); @@ -41,6 +37,6 @@ fn main() -> Result<()> { write_batches( "example.json", vec!["c1".to_string()], - &[Columns::new(vec![array.clone()]), Columns::new(vec![array])], + &[Chunk::new(vec![array.clone()]), Chunk::new(vec![array])], ) } diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index 01a146129d9..f3d206ac6b1 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -6,11 +6,11 @@ use std::time::SystemTime; use crossbeam_channel::unbounded; use arrow2::{ - array::Array, columns::Columns, error::Result, io::parquet::read, + array::Array, chunk::Chunk, error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator, }; -fn parallel_read(path: &str, row_group: usize) -> Result>> { +fn parallel_read(path: &str, row_group: usize) -> Result>> { // prepare a channel to send compressed pages across threads. 
let (tx, rx) = unbounded(); @@ -99,7 +99,7 @@ fn parallel_read(path: &str, row_group: usize) -> Result> let columns = columns.into_iter().map(|x| x.1.into()).collect(); println!("Finished - {:?}", start.elapsed().unwrap()); - Columns::try_new(columns) + Chunk::try_new(columns) } fn main() -> Result<()> { diff --git a/examples/parquet_read_parallel/src/main.rs b/examples/parquet_read_parallel/src/main.rs index aa12fcd5fb9..827343ee0f2 100644 --- a/examples/parquet_read_parallel/src/main.rs +++ b/examples/parquet_read_parallel/src/main.rs @@ -7,14 +7,14 @@ use std::time::SystemTime; use rayon::prelude::*; use arrow2::{ - error::Result, io::parquet::read, io::parquet::read::MutStreamingIterator, - record_batch::RecordBatch, + array::Array, chunk::Chunk, error::Result, io::parquet::read, + io::parquet::read::MutStreamingIterator, }; -fn parallel_read(path: &str, row_group: usize) -> Result { +fn parallel_read(path: &str, row_group: usize) -> Result>> { let mut file = BufReader::new(File::open(path)?); let file_metadata = read::read_metadata(&mut file)?; - let arrow_schema = Arc::new(read::get_schema(&file_metadata)?); + let schema = read::get_schema(&file_metadata)?; // IO-bounded let columns = file_metadata @@ -58,13 +58,13 @@ fn parallel_read(path: &str, row_group: usize) -> Result { .into_par_iter() .map(|(field_i, parquet_field, column_chunks)| { let columns = read::ReadColumnIterator::new(parquet_field, column_chunks); - let field = &arrow_schema.fields()[field_i]; + let field = &schema.fields()[field_i]; read::column_iter_to_array(columns, field, vec![]).map(|x| x.0.into()) }) .collect::>>()?; - RecordBatch::try_new(arrow_schema, columns) + Chunk::try_new(columns) } fn main() -> Result<()> { @@ -75,7 +75,7 @@ fn main() -> Result<()> { let start = SystemTime::now(); let batch = parallel_read(file_path, row_group)?; - assert!(batch.num_rows() > 0); + assert!(!batch.is_empty()); println!("took: {} ms", start.elapsed().unwrap().as_millis()); Ok(()) diff --git a/examples/parquet_read_record.rs b/examples/parquet_read_record.rs index 1342db14f94..9cb2f65ff91 100644 --- a/examples/parquet_read_record.rs +++ b/examples/parquet_read_record.rs @@ -14,8 +14,8 @@ fn main() -> Result<()> { let reader = read::RecordReader::try_new(reader, None, None, None, None)?; let start = SystemTime::now(); - for maybe_columns in reader { - let columns = maybe_columns?; + for maybe_chunk in reader { + let columns = maybe_chunk?; assert!(!columns.is_empty()); } println!("took: {} ms", start.elapsed().unwrap().as_millis()); diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index feef774decd..915bbe98233 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -6,12 +6,14 @@ use rayon::prelude::*; use arrow2::{ array::*, - datatypes::PhysicalType, + chunk::Chunk as AChunk, + datatypes::*, error::{ArrowError, Result}, io::parquet::write::*, - record_batch::RecordBatch, }; +type Chunk = AChunk>; + struct Bla { columns: VecDeque, current: Option, @@ -40,13 +42,16 @@ impl FallibleStreamingIterator for Bla { } } -fn parallel_write(path: &str, batches: &[RecordBatch]) -> Result<()> { +fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()> { + // declare the options let options = WriteOptions { write_statistics: true, compression: Compression::Snappy, version: Version::V2, }; - let encodings = batches[0].schema().fields().par_iter().map(|field| { + + // declare encodings + let encodings = 
schema.fields().par_iter().map(|field| { match field.data_type().to_physical_type() { // let's be fancy and use delta-encoding for binary fields PhysicalType::Binary @@ -58,7 +63,8 @@ fn parallel_write(path: &str, batches: &[RecordBatch]) -> Result<()> { } }); - let parquet_schema = to_parquet_schema(batches[0].schema())?; + // derive the parquet schema (physical types) from arrow's schema. + let parquet_schema = to_parquet_schema(schema)?; let a = parquet_schema.clone(); let row_groups = batches.iter().map(|batch| { @@ -89,19 +95,12 @@ fn parallel_write(path: &str, batches: &[RecordBatch]) -> Result<()> { let mut file = std::fs::File::create(path)?; // Write the file. - let _file_size = write_file( - &mut file, - row_groups, - batches[0].schema(), - parquet_schema, - options, - None, - )?; + let _file_size = write_file(&mut file, row_groups, schema, parquet_schema, options, None)?; Ok(()) } -fn create_batch(size: usize) -> Result { +fn create_batch(size: usize) -> Result { let c1: Int32Array = (0..size) .map(|x| if x % 9 == 0 { None } else { Some(x as i32) }) .collect(); @@ -115,14 +114,21 @@ fn create_batch(size: usize) -> Result { }) .collect(); - RecordBatch::try_from_iter([ - ("c1", Arc::new(c1) as Arc), - ("c2", Arc::new(c2) as Arc), + Chunk::try_new(vec![ + Arc::new(c1) as Arc, + Arc::new(c2) as Arc, ]) } fn main() -> Result<()> { + let schema = Schema { + fields: vec![ + Field::new("c1", DataType::Int32, true), + Field::new("c1", DataType::Utf8, true), + ], + metadata: Default::default(), + }; let batch = create_batch(5_000_000)?; - parallel_write("example.parquet", &[batch.clone(), batch]) + parallel_write("example.parquet", &schema, &[batch.clone(), batch]) } diff --git a/examples/parquet_write_record.rs b/examples/parquet_write_record.rs index 86800a491a7..ace66402540 100644 --- a/examples/parquet_write_record.rs +++ b/examples/parquet_write_record.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use arrow2::{ array::{Array, Int32Array}, - columns::Columns, + chunk::Chunk, datatypes::{Field, Schema}, error::Result, io::parquet::write::{ @@ -11,7 +11,7 @@ use arrow2::{ }, }; -fn write_batch(path: &str, schema: Schema, columns: Columns>) -> Result<()> { +fn write_batch(path: &str, schema: Schema, columns: Chunk>) -> Result<()> { let options = WriteOptions { write_statistics: true, compression: Compression::Uncompressed, @@ -51,7 +51,7 @@ fn main() -> Result<()> { ]); let field = Field::new("c1", array.data_type().clone(), true); let schema = Schema::new(vec![field]); - let columns = Columns::new(vec![Arc::new(array) as Arc]); + let columns = Chunk::new(vec![Arc::new(array) as Arc]); write_batch("test.parquet", schema, columns) } diff --git a/guide/src/arrow.md b/guide/src/arrow.md index d59166205fb..d1abb5196f3 100644 --- a/guide/src/arrow.md +++ b/guide/src/arrow.md @@ -89,7 +89,7 @@ sharing can be done regarding the language that is used. And thanks to this standardization the data can also be shared with processes that don't share the same memory. By creating a data server, packets of data -with known structure (RecordBatch) can be sent across computers (or pods) and +with known structure (Chunk) can be sent across computers (or pods) and the receiving process doesn't need to spend time coding and decoding the data to a known format. The data is ready to be used once its being received. 
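
As an aside to the guide change above: a minimal sketch of how the renamed container is constructed with the patched `arrow2` API, mirroring `examples/ipc_file_write.rs` in this diff. This is an illustration only, not part of the patch.

```rust
use std::sync::Arc;

use arrow2::array::{Array, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::error::Result;

fn main() -> Result<()> {
    // A `Chunk` is just a vector of equal-length arrays; unlike the removed
    // `RecordBatch`, it carries no `Schema` — fields travel separately.
    let a = Int32Array::from_slice(&[1, 2, 3]);
    let b = Utf8Array::<i32>::from_slice(&["a", "b", "c"]);
    let chunk = Chunk::try_new(vec![Arc::new(a) as Arc<dyn Array>, Arc::new(b)])?;
    assert_eq!(chunk.len(), 3); // common number of rows across all arrays
    Ok(())
}
```

The schema is instead passed separately to writers such as `write::FileWriter`, as the `ipc_file_write.rs` and `parquet_write_parallel` changes in this diff show.
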
diff --git a/guide/src/io/ipc_read.md b/guide/src/io/ipc_read.md index fe1a8520140..bd8b39d54a8 100644 --- a/guide/src/io/ipc_read.md +++ b/guide/src/io/ipc_read.md @@ -8,7 +8,7 @@ Reading it generally consists of: 1. read metadata, containing the block positions in the file 2. seek to each block and read it -The example below shows how to read them into `RecordBatch`es: +The example below shows how to read them into `Chunk`es: ```rust {{#include ../../../examples/ipc_file_read.rs}} diff --git a/guide/src/io/parquet_read.md b/guide/src/io/parquet_read.md index 48ba8651ced..7ced401c849 100644 --- a/guide/src/io/parquet_read.md +++ b/guide/src/io/parquet_read.md @@ -6,9 +6,9 @@ It makes minimal assumptions on how you to decompose CPU and IO intensive tasks. First, some notation: -* `page`: part of a column (e.g. similar of a slice of an `Array`) -* `column chunk`: composed of multiple pages (similar of an `Array`) -* `row group`: a group of columns with the same length (similar of a `RecordBatch` in Arrow) +* `page`: part of a column (e.g. similar to a slice of an `Array`) +* `column chunk`: composed of multiple pages (similar to an `Array`) +* `row group`: a group of columns with the same length (similar to a `Chunk`) Here is how to read a single column chunk from a single row group: diff --git a/guide/src/io/parquet_write.md b/guide/src/io/parquet_write.md index 4e9cf75036e..a0272bd9709 100644 --- a/guide/src/io/parquet_write.md +++ b/guide/src/io/parquet_write.md @@ -7,9 +7,9 @@ as an higher-level API to abstract away some of this work into an easy to use AP First, some notation: -* `page`: part of a column (e.g. similar of a slice of an `Array`) -* `column chunk`: composed of multiple pages (similar of an `Array`) -* `row group`: a group of columns with the same length (similar of a `RecordBatch` in Arrow) +* `page`: part of a column (e.g. similar to a slice of an `Array`) +* `column chunk`: composed of multiple pages (similar to an `Array`) +* `row group`: a group of columns with the same length (similar to a `Chunk` in Arrow) ## Single threaded @@ -20,7 +20,7 @@ Here is an example of how to write a single column chunk into a single row group ``` For single-threaded writing, this crate offers an API that encapsulates the above logic. It -assumes that a `RecordBatch` is mapped to a single row group with a single page per column. +assumes that a `Chunk` is mapped to a single row group with a single page per column. 
```rust {{#include ../../../examples/parquet_write_record.rs}} diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index beb0cb34714..6cbb0766f4f 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -101,7 +101,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> let schema = json_write::serialize_schema(&metadata.schema, &metadata.ipc_schema.fields); let batches = reader - .map(|batch| Ok(json_write::serialize_columns(&batch?, &names))) + .map(|batch| Ok(json_write::serialize_chunk(&batch?, &names))) .collect::>>()?; let arrow_json = ArrowJson { diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index b402dc0798c..049be88f772 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -19,7 +19,7 @@ use crate::{read_json_file, ArrowFile}; use arrow2::{ array::Array, - columns::Columns, + chunk::Chunk, datatypes::*, io::ipc::{ read::{self, Dictionaries}, @@ -46,7 +46,7 @@ type Result = std::result::Result; type Client = FlightServiceClient; -type Chunk = Columns>; +type ChunkArc = Chunk>; pub async fn run_scenario(host: &str, port: &str, path: &str) -> Result { let url = format!("http://{}:{}", host, port); @@ -88,7 +88,7 @@ async fn upload_data( schema: &Schema, fields: &[IpcField], descriptor: FlightDescriptor, - original_data: Vec, + original_data: Vec, ) -> Result { let (mut upload_tx, upload_rx) = mpsc::channel(10); @@ -143,7 +143,7 @@ async fn upload_data( async fn send_batch( upload_tx: &mut mpsc::Sender, metadata: &[u8], - batch: &Chunk, + batch: &ChunkArc, fields: &[IpcField], options: &write::WriteOptions, ) -> Result { @@ -164,7 +164,7 @@ async fn verify_data( descriptor: FlightDescriptor, expected_schema: &Schema, ipc_schema: &IpcSchema, - expected_data: &[Chunk], + expected_data: &[ChunkArc], ) -> Result { let resp = client.get_flight_info(Request::new(descriptor)).await?; let info = resp.into_inner(); @@ -200,7 +200,7 @@ async fn verify_data( async fn consume_flight_location( location: Location, ticket: Ticket, - expected_data: &[Chunk], + expected_data: &[ChunkArc], schema: &Schema, ipc_schema: &IpcSchema, ) -> Result { diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 685e40ec53d..bd0d2532107 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -20,7 +20,7 @@ use std::pin::Pin; use std::sync::Arc; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::io::flight::{deserialize_schemas, serialize_batch, serialize_schema}; use arrow2::io::ipc::read::Dictionaries; use arrow2::io::ipc::IpcSchema; @@ -62,7 +62,7 @@ pub async fn scenario_setup(port: &str) -> Result { struct IntegrationDataset { schema: Schema, ipc_schema: IpcSchema, - chunks: Vec>>, + chunks: Vec>>, } #[derive(Clone, Default)] @@ -281,7 +281,7 @@ async fn record_batch_from_message( fields: &[Field], ipc_schema: &IpcSchema, dictionaries: &mut Dictionaries, -) -> Result>, Status> { +) -> Result>, Status> { let ipc_batch = message .header_as_record_batch() .ok_or_else(|| 
Status::internal("Could not parse message header as record batch"))?; @@ -300,7 +300,7 @@ async fn record_batch_from_message( ); arrow_batch_result - .map_err(|e| Status::internal(format!("Could not convert to RecordBatch: {:?}", e))) + .map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e))) } async fn dictionary_from_message( diff --git a/integration-testing/src/lib.rs b/integration-testing/src/lib.rs index 89646e371ef..0fa3f9e1e0b 100644 --- a/integration-testing/src/lib.rs +++ b/integration-testing/src/lib.rs @@ -21,7 +21,7 @@ use arrow2::array::Array; use arrow2::io::ipc::IpcField; use serde_json::Value; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::json_integration::{read, ArrowJsonBatch, ArrowJsonDictionaryBatch}; @@ -45,7 +45,7 @@ pub struct ArrowFile { // we can evolve this into a concrete Arrow type // this is temporarily not being read from pub _dictionaries: HashMap, - pub batches: Vec>>, + pub batches: Vec>>, } pub fn read_json_file(json_name: &str) -> Result { @@ -74,7 +74,7 @@ pub fn read_json_file(json_name: &str) -> Result { .iter() .map(|b| { let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap(); - read::deserialize_columns(&schema, &fields, &json_batch, &dictionaries) + read::deserialize_chunk(&schema, &fields, &json_batch, &dictionaries) }) .collect::>()?; Ok(ArrowFile { diff --git a/src/columns.rs b/src/chunk.rs similarity index 70% rename from src/columns.rs rename to src/chunk.rs index 940ab63452f..9162a6251d2 100644 --- a/src/columns.rs +++ b/src/chunk.rs @@ -1,25 +1,25 @@ -//! Contains [`Columns`], a container of [`Array`] where every array has the +//! Contains [`Chunk`], a container of [`Array`] where every array has the //! same length. use crate::array::Array; use crate::error::{ArrowError, Result}; /// A vector of trait objects of [`Array`] where every item has -/// the same length, [`Columns::len`]. +/// the same length, [`Chunk::len`]. #[derive(Debug, Clone, PartialEq)] -pub struct Columns> { +pub struct Chunk> { arrays: Vec, } -impl> Columns { - /// Creates a new [`Columns`]. +impl> Chunk { + /// Creates a new [`Chunk`]. /// # Panic /// Iff the arrays do not have the same length pub fn new(arrays: Vec) -> Self { Self::try_new(arrays).unwrap() } - /// Creates a new [`Columns`]. + /// Creates a new [`Chunk`]. /// # Error /// Iff the arrays do not have the same length pub fn try_new(arrays: Vec) -> Result { @@ -31,19 +31,19 @@ impl> Columns { .any(|array| array.len() != len) { return Err(ArrowError::InvalidArgumentError( - "Columns require all its arrays to have an equal number of rows".to_string(), + "Chunk require all its arrays to have an equal number of rows".to_string(), )); } } Ok(Self { arrays }) } - /// returns the [`Array`]s in [`Columns`] + /// returns the [`Array`]s in [`Chunk`] pub fn arrays(&self) -> &[A] { &self.arrays } - /// returns the [`Array`]s in [`Columns`] + /// returns the [`Array`]s in [`Chunk`] pub fn columns(&self) -> &[A] { &self.arrays } @@ -61,20 +61,20 @@ impl> Columns { self.len() == 0 } - /// Consumes [`Columns`] into its underlying arrays. + /// Consumes [`Chunk`] into its underlying arrays. 
/// The arrays are guaranteed to have the same length pub fn into_arrays(self) -> Vec { self.arrays } } -impl> From> for Vec { - fn from(c: Columns) -> Self { +impl> From> for Vec { + fn from(c: Chunk) -> Self { c.into_arrays() } } -impl> std::ops::Deref for Columns { +impl> std::ops::Deref for Chunk { type Target = [A]; #[inline] diff --git a/src/compute/filter.rs b/src/compute/filter.rs index 262ac2b9add..4366da4f2d2 100644 --- a/src/compute/filter.rs +++ b/src/compute/filter.rs @@ -1,7 +1,7 @@ //! Contains operators to filter arrays such as [`filter`]. use crate::array::growable::{make_growable, Growable}; use crate::bitmap::{utils::SlicesIterator, Bitmap, MutableBitmap}; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::DataType; use crate::error::Result; use crate::{array::*, types::NativeType}; @@ -152,12 +152,12 @@ pub fn filter(array: &dyn Array, filter: &BooleanArray) -> Result } } -/// Returns a new [Columns] with arrays containing only values matching the filter. +/// Returns a new [Chunk] with arrays containing only values matching the filter. /// This is a convenience function: filter multiple columns is embarassingly parallel. -pub fn filter_columns>( - columns: &Columns, +pub fn filter_chunk>( + columns: &Chunk, filter_values: &BooleanArray, -) -> Result>> { +) -> Result>> { let arrays = columns.arrays(); let num_colums = arrays.len(); @@ -171,5 +171,5 @@ pub fn filter_columns>( arrays.iter().map(|a| filter(a.as_ref())).collect() } }; - Columns::try_new(filtered_arrays) + Chunk::try_new(filtered_arrays) } diff --git a/src/compute/sort/lex_sort.rs b/src/compute/sort/lex_sort.rs index cab5e3dcec5..695269c92d2 100644 --- a/src/compute/sort/lex_sort.rs +++ b/src/compute/sort/lex_sort.rs @@ -39,7 +39,7 @@ pub struct SortColumn<'a> { /// let int64 = Int64Array::from(&[None, Some(-2), Some(89), Some(-64), Some(101)]); /// let utf8 = Utf8Array::::from(&vec![Some("hello"), Some("world"), Some(","), Some("foobar"), Some("!")]); /// -/// let sorted_columns = lexsort::(&vec![ +/// let sorted_chunk = lexsort::(&vec![ /// SortColumn { /// values: &int64, /// options: None, @@ -53,7 +53,7 @@ pub struct SortColumn<'a> { /// }, /// ], None).unwrap(); /// -/// let sorted = sorted_columns[0].as_any().downcast_ref::().unwrap(); +/// let sorted = sorted_chunk[0].as_any().downcast_ref::().unwrap(); /// assert_eq!(sorted.value(1), -64); /// assert!(sorted.is_null(0)); /// ``` diff --git a/src/doc/lib.md b/src/doc/lib.md index b9879f60879..9185b7c15b8 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -15,7 +15,7 @@ use arrow2::array::*; use arrow2::compute::arithmetics; use arrow2::error::Result; use arrow2::io::parquet::write::*; -use arrow2::record_batch::RecordBatch; +use arrow2::chunk::Chunk; fn main() -> Result<()> { // declare arrays @@ -26,11 +26,11 @@ fn main() -> Result<()> { let c = arithmetics::basic::mul_scalar(&a, &2); assert_eq!(c, b); - // declare records - let batch = RecordBatch::try_from_iter([ - ("c1", Arc::new(a) as Arc), - ("c2", Arc::new(b) as Arc), - ])?; + // declare chunk + let batch = Chunk::new(vec![ + Arc::new(a) as Arc, + Arc::new(b) as Arc, + ]); // with metadata println!("{:?}", batch.schema()); diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 156543ba582..f3b37ec1546 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use avro_schema::{Enum, Schema as AvroSchema}; use crate::array::*; -use crate::columns::Columns; +use 
crate::chunk::Chunk; use crate::datatypes::*; use crate::error::ArrowError; use crate::error::Result; @@ -242,12 +242,12 @@ fn deserialize_value<'a>( Ok(block) } -/// Deserializes a [`Block`] into [`Columns`]. +/// Deserializes a [`Block`] into [`Chunk`]. pub fn deserialize( block: &Block, fields: &[Field], avro_schemas: &[AvroSchema], -) -> Result>> { +) -> Result>> { let rows = block.number_of_rows; let mut block = block.data.as_ref(); @@ -271,5 +271,5 @@ pub fn deserialize( block = deserialize_item(array.as_mut(), field.is_nullable(), avro_field, block)? } } - Columns::try_new(arrays.iter_mut().map(|array| array.as_arc()).collect()) + Chunk::try_new(arrays.iter_mut().map(|array| array.as_arc()).collect()) } diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index 6c6f88ca7c5..b48302d7b27 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -20,7 +20,7 @@ pub(super) use header::deserialize_header; pub(super) use schema::convert_schema; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::{Field, Schema}; use crate::error::Result; @@ -43,7 +43,7 @@ pub fn read_metadata( Ok((avro_schema, schema, codec, marker)) } -/// Single threaded, blocking reader of Avro; [`Iterator`] of [`Columns`]. +/// Single threaded, blocking reader of Avro; [`Iterator`] of [`Chunk`]. pub struct Reader { iter: Decompressor, avro_schemas: Vec, @@ -67,7 +67,7 @@ impl Reader { } impl Iterator for Reader { - type Item = Result>>; + type Item = Result>>; fn next(&mut self) -> Option { let fields = &self.fields[..]; diff --git a/src/io/csv/read/deserialize.rs b/src/io/csv/read/deserialize.rs index 4bcbb91a17f..2a86e97c167 100644 --- a/src/io/csv/read/deserialize.rs +++ b/src/io/csv/read/deserialize.rs @@ -4,7 +4,7 @@ use csv::ByteRecord; use crate::{ array::Array, - columns::Columns, + chunk::Chunk, datatypes::{DataType, Field}, error::Result, }; @@ -31,7 +31,7 @@ pub fn deserialize_column( deserialize_column_gen(rows, column, datatype, line_number) } -/// Deserializes rows [`ByteRecord`] into a [`Columns`]. +/// Deserializes rows [`ByteRecord`] into a [`Chunk`]. /// Note that this is a convenience function: column deserialization /// is trivially parallelizable (e.g. rayon). pub fn deserialize_batch( @@ -40,7 +40,7 @@ pub fn deserialize_batch( projection: Option<&[usize]>, line_number: usize, deserialize_column: F, -) -> Result>> +) -> Result>> where F: Fn(&[ByteRecord], usize, DataType, usize) -> Result>, { diff --git a/src/io/csv/read_async/deserialize.rs b/src/io/csv/read_async/deserialize.rs index 175c5875a22..be64b7398b3 100644 --- a/src/io/csv/read_async/deserialize.rs +++ b/src/io/csv/read_async/deserialize.rs @@ -4,7 +4,7 @@ use csv_async::ByteRecord; use crate::{ array::Array, - columns::Columns, + chunk::Chunk, datatypes::{DataType, Field}, error::Result, }; @@ -31,7 +31,7 @@ pub fn deserialize_column( deserialize_column_gen(rows, column, datatype, line_number) } -/// Deserializes rows [`ByteRecord`] into [`Columns`]. +/// Deserializes rows [`ByteRecord`] into [`Chunk`]. /// Note that this is a convenience function: column deserialization /// is trivially parallelizable (e.g. rayon). 
pub fn deserialize_batch( @@ -40,7 +40,7 @@ pub fn deserialize_batch( projection: Option<&[usize]>, line_number: usize, deserialize_column: F, -) -> Result>> +) -> Result>> where F: Fn(&[ByteRecord], usize, DataType, usize) -> Result>, { diff --git a/src/io/csv/read_utils.rs b/src/io/csv/read_utils.rs index 5567743cf37..5ecdc0e8d5f 100644 --- a/src/io/csv/read_utils.rs +++ b/src/io/csv/read_utils.rs @@ -11,7 +11,7 @@ pub(crate) trait ByteRecordGeneric { use crate::{ array::*, - columns::Columns, + chunk::Chunk, datatypes::*, error::{ArrowError, Result}, temporal_conversions, @@ -257,7 +257,7 @@ pub(crate) fn deserialize_column( }) } -/// Deserializes rows [`ByteRecord`] into [`Columns`]. +/// Deserializes rows [`ByteRecord`] into [`Chunk`]. /// Note that this is a convenience function: column deserialization /// is embarassingly parallel (e.g. rayon). pub(crate) fn deserialize_batch( @@ -266,7 +266,7 @@ pub(crate) fn deserialize_batch( projection: Option<&[usize]>, line_number: usize, deserialize_column: F, -) -> Result>> +) -> Result>> where F: Fn(&[B], usize, DataType, usize) -> Result>, { @@ -276,7 +276,7 @@ where }; if rows.is_empty() { - return Ok(Columns::new(vec![])); + return Ok(Chunk::new(vec![])); } projection @@ -288,5 +288,5 @@ where deserialize_column(rows, column, data_type.clone(), line_number) }) .collect::>>() - .and_then(Columns::try_new) + .and_then(Chunk::try_new) } diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs index ec3225a85b9..37d926f482e 100644 --- a/src/io/csv/write/mod.rs +++ b/src/io/csv/write/mod.rs @@ -11,7 +11,7 @@ pub use csv::{ByteRecord, Writer, WriterBuilder}; pub use serialize::*; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::error::Result; /// Creates serializers that iterate over each column that serializes each item according @@ -26,11 +26,11 @@ fn new_serializers<'a, A: AsRef>( .collect() } -/// Serializes [`Columns`] to a vector of `ByteRecord`. +/// Serializes [`Chunk`] to a vector of `ByteRecord`. /// The vector is guaranteed to have `columns.len()` entries. /// Each `ByteRecord` is guaranteed to have `columns.array().len()` fields. pub fn serialize>( - columns: &Columns, + columns: &Chunk, options: &SerializeOptions, ) -> Result> { let mut serializers = new_serializers(columns, options)?; @@ -40,16 +40,16 @@ pub fn serialize>( records.iter_mut().for_each(|record| { serializers .iter_mut() - // `unwrap` is infalible because `array.len()` equals `len` in `Columns::len` + // `unwrap` is infalible because `array.len()` equals `len` in `Chunk::len` .for_each(|iter| record.push_field(iter.next().unwrap())); }); Ok(records) } -/// Writes [`Columns`] to `writer` according to the serialization options `options`. -pub fn write_columns>( +/// Writes [`Chunk`] to `writer` according to the serialization options `options`. 
+pub fn write_chunk>( writer: &mut Writer, - columns: &Columns, + columns: &Chunk, options: &SerializeOptions, ) -> Result<()> { let mut serializers = new_serializers(columns.arrays(), options)?; @@ -61,7 +61,7 @@ pub fn write_columns>( (0..rows).try_for_each(|_| { serializers .iter_mut() - // `unwrap` is infalible because `array.len()` equals `Columns::len` + // `unwrap` is infalible because `array.len()` equals `Chunk::len` .for_each(|iter| record.push_field(iter.next().unwrap())); writer.write_byte_record(&record)?; record.clear(); diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index e509c773c60..14e0650c4da 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -5,27 +5,27 @@ use arrow_format::ipc; use crate::{ array::Array, - columns::Columns, + chunk::Chunk, datatypes::*, error::{ArrowError, Result}, io::ipc::read, io::ipc::write, - io::ipc::write::common::{encode_columns, DictionaryTracker, EncodedData, WriteOptions}, + io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}, }; use super::ipc::{IpcField, IpcSchema}; -/// Serializes [`Columns`] to a vector of [`FlightData`] representing the serialized dictionaries +/// Serializes [`Chunk`] to a vector of [`FlightData`] representing the serialized dictionaries /// and a [`FlightData`] representing the batch. pub fn serialize_batch( - columns: &Columns>, + columns: &Chunk>, fields: &[IpcField], options: &WriteOptions, ) -> (Vec, FlightData) { let mut dictionary_tracker = DictionaryTracker::new(false); let (encoded_dictionaries, encoded_batch) = - encode_columns(columns, fields, &mut dictionary_tracker, options) + encode_chunk(columns, fields, &mut dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); @@ -99,13 +99,13 @@ pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> { } } -/// Deserializes [`FlightData`] to [`Columns`]. +/// Deserializes [`FlightData`] to [`Chunk`]. pub fn deserialize_batch( data: &FlightData, fields: &[Field], ipc_schema: &IpcSchema, dictionaries: &read::Dictionaries, -) -> Result>> { +) -> Result>> { // check that the data_header is a record batch message let message = ipc::Message::root_as_message(&data.data_header[..]).map_err(|err| { ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err)) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index 33bda3491b4..37bcec2ccc8 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -33,8 +33,8 @@ //! # use std::fs::File; //! # use std::sync::Arc; //! # use arrow2::datatypes::{Field, Schema, DataType}; -//! # use arrow2::array::Int32Array; -//! # use arrow2::record_batch::RecordBatch; +//! # use arrow2::array::{Int32Array, Array}; +//! # use arrow2::chunk::Chunk; //! # use arrow2::error::ArrowError; //! // Setup the writer //! let path = "example.arrow".to_string(); @@ -48,14 +48,13 @@ //! // Setup the data //! let x_data = Int32Array::from_slice([-1i32, 1]); //! let y_data = Int32Array::from_slice([1i32, -1]); -//! let batch = RecordBatch::try_new( -//! Arc::new(schema), -//! vec![Arc::new(x_data), Arc::new(y_data)] -//! )?; +//! let chunk = Chunk::try_new( +//! vec![Arc::new(x_data) as Arc, Arc::new(y_data)] +//! )?; //! //! // Write the messages and finalize the stream //! for _ in 0..5 { -//! writer.write(&batch, None); +//! writer.write(&chunk, None); //! } //! writer.finish(); //! 
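
To complement the doc-comment example above, here is an illustrative reader-side sketch following `examples/ipc_file_read.rs` from this diff: the `FileReader` now yields `Chunk`s of `Arc<dyn Array>` instead of `RecordBatch`es. The file path `example.arrow` is an assumption; this is not part of the patch.

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};

fn main() -> Result<()> {
    let mut file = File::open("example.arrow")?;
    // Read the file metadata (schema + block locations), then iterate over chunks.
    let metadata = read_file_metadata(&mut file)?;
    let reader = FileReader::new(file, metadata, None);
    for maybe_chunk in reader {
        let chunk = maybe_chunk?; // Chunk<Arc<dyn Array>>
        assert!(!chunk.is_empty());
    }
    Ok(())
}
```
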
diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index ffd71561b41..6ce6320a7ec 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -6,7 +6,7 @@ use arrow_format::ipc; use arrow_format::ipc::Schema::MetadataVersion; use crate::array::*; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::{DataType, Field}; use crate::error::{ArrowError, Result}; use crate::io::ipc::{IpcField, IpcSchema}; @@ -87,7 +87,7 @@ pub fn read_record_batch( version: MetadataVersion, reader: &mut R, block_offset: u64, -) -> Result>> { +) -> Result>> { assert_eq!(fields.len(), ipc_schema.fields.len()); let buffers = batch.buffers().ok_or_else(|| { ArrowError::OutOfSpec("Unable to get buffers from IPC RecordBatch".to_string()) @@ -144,7 +144,7 @@ pub fn read_record_batch( }) .collect::>>()? }; - Columns::try_new(columns) + Chunk::try_new(columns) } fn find_first_dict_field_d<'a>( diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index ba2c9274137..d6b431f4573 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -6,7 +6,7 @@ use arrow_format::ipc::flatbuffers::VerifierOptions; use arrow_format::ipc::File::Block; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::Schema; use crate::error::{ArrowError, Result}; use crate::io::ipc::IpcSchema; @@ -201,7 +201,7 @@ pub fn read_batch( projection: Option<&[usize]>, block: usize, block_data: &mut Vec, -) -> Result>> { +) -> Result>> { let block = metadata.blocks[block]; // read length @@ -286,7 +286,7 @@ impl FileReader { } impl Iterator for FileReader { - type Item = Result>>; + type Item = Result>>; fn next(&mut self) -> Option { // get current block diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index 17e70d89382..f6d761fcc53 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -5,7 +5,7 @@ use arrow_format::ipc; use arrow_format::ipc::Schema::MetadataVersion; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::Schema; use crate::error::{ArrowError, Result}; use crate::io::ipc::IpcSchema; @@ -72,7 +72,7 @@ pub enum StreamState { /// A live stream without data Waiting, /// Next item in the stream - Some(Columns>), + Some(Chunk>), } impl StreamState { @@ -81,7 +81,7 @@ impl StreamState { /// # Panics /// /// If the `StreamState` was `Waiting`. 
- pub fn unwrap(self) -> Columns> { + pub fn unwrap(self) -> Chunk> { if let StreamState::Some(batch) = self { batch } else { diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index d2d56796140..54af2ec1f7a 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -5,7 +5,7 @@ use arrow_format::ipc::flatbuffers::FlatBufferBuilder; use arrow_format::ipc::Message::CompressionType; use crate::array::*; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::io::ipc::endianess::is_native_little_endian; @@ -172,8 +172,8 @@ fn encode_dictionary( } } -pub fn encode_columns( - columns: &Columns>, +pub fn encode_chunk( + columns: &Chunk>, fields: &[IpcField], dictionary_tracker: &mut DictionaryTracker, options: &WriteOptions, @@ -195,9 +195,9 @@ pub fn encode_columns( Ok((encoded_dictionaries, encoded_message)) } -/// Write [`Columns`] into two sets of bytes, one for the header (ipc::Schema::Message) and the +/// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the /// other for the batch's data -fn columns_to_bytes(columns: &Columns>, options: &WriteOptions) -> EncodedData { +fn columns_to_bytes(columns: &Chunk>, options: &WriteOptions) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 6c76d69bbcd..da145d7f1c6 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -7,12 +7,12 @@ use std::io::Write; use std::sync::Arc; use super::super::IpcField; -use super::common::{encode_columns, DictionaryTracker, EncodedData, WriteOptions}; +use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}; use super::common_sync::{write_continuation, write_message}; use super::schema_to_bytes; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::*; use crate::error::{ArrowError, Result}; @@ -54,8 +54,8 @@ impl StreamWriter { Ok(()) } - /// Writes [`Columns`] to the stream - pub fn write(&mut self, columns: &Columns>, fields: &[IpcField]) -> Result<()> { + /// Writes [`Chunk`] to the stream + pub fn write(&mut self, columns: &Chunk>, fields: &[IpcField]) -> Result<()> { if self.finished { return Err(ArrowError::Io(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, @@ -63,7 +63,7 @@ impl StreamWriter { ))); } - let (encoded_dictionaries, encoded_message) = encode_columns( + let (encoded_dictionaries, encoded_message) = encode_chunk( columns, fields, &mut self.dictionary_tracker, diff --git a/src/io/ipc/write/stream_async.rs b/src/io/ipc/write/stream_async.rs index 1606a11f268..fa665fa7c4b 100644 --- a/src/io/ipc/write/stream_async.rs +++ b/src/io/ipc/write/stream_async.rs @@ -5,12 +5,12 @@ use futures::AsyncWrite; use super::super::IpcField; pub use super::common::WriteOptions; -use super::common::{encode_columns, DictionaryTracker, EncodedData}; +use super::common::{encode_chunk, DictionaryTracker, EncodedData}; use super::common_async::{write_continuation, write_message}; use super::{default_ipc_fields, schema_to_bytes}; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::*; use crate::error::{ArrowError, Result}; @@ -55,10 +55,10 @@ impl StreamWriter { Ok(()) } - /// Writes [`Columns`] to the stream + /// Writes [`Chunk`] to the stream pub async fn write( &mut self, - columns: &Columns>, + columns: &Chunk>, 
schema: &Schema, ipc_fields: Option<&[IpcField]>, ) -> Result<()> { @@ -70,7 +70,7 @@ impl StreamWriter { } let (encoded_dictionaries, encoded_message) = if let Some(ipc_fields) = ipc_fields { - encode_columns( + encode_chunk( columns, ipc_fields, &mut self.dictionary_tracker, @@ -78,7 +78,7 @@ impl StreamWriter { )? } else { let ipc_fields = default_ipc_fields(schema.fields()); - encode_columns( + encode_chunk( columns, &ipc_fields, &mut self.dictionary_tracker, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index b5fe5d94232..9f818b5f7f1 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -6,13 +6,13 @@ use arrow_format::ipc::flatbuffers::FlatBufferBuilder; use super::{ super::IpcField, super::ARROW_MAGIC, - common::{encode_columns, DictionaryTracker, EncodedData, WriteOptions}, + common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}, common_sync::{write_continuation, write_message}, default_ipc_fields, schema, schema_to_bytes, }; use crate::array::Array; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::datatypes::*; use crate::error::{ArrowError, Result}; @@ -79,10 +79,10 @@ impl FileWriter { self.writer } - /// Writes [`Columns`] to the file + /// Writes [`Chunk`] to the file pub fn write( &mut self, - columns: &Columns>, + columns: &Chunk>, ipc_fields: Option<&[IpcField]>, ) -> Result<()> { if self.finished { @@ -98,7 +98,7 @@ impl FileWriter { self.ipc_fields.as_ref() }; - let (encoded_dictionaries, encoded_message) = encode_columns( + let (encoded_dictionaries, encoded_message) = encode_chunk( columns, ipc_fields, &mut self.dictionary_tracker, diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index 322bddf4763..d08b520fe80 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -10,7 +10,7 @@ use serde_json::Value; use crate::{ array::*, bitmap::MutableBitmap, - columns::Columns, + chunk::Chunk, datatypes::{DataType, Field, IntervalUnit}, error::ArrowError, types::NativeType, @@ -251,12 +251,12 @@ fn _deserialize>(rows: &[A], data_type: DataType) -> Arc>( rows: &[A], fields: &[Field], -) -> Result>, ArrowError> { +) -> Result>, ArrowError> { let data_type = DataType::Struct(fields.to_vec()); // convert rows to `Value` @@ -269,5 +269,5 @@ pub fn deserialize>( .collect::, ArrowError>>()?; let (_, columns, _) = deserialize_struct(&rows, data_type).into_data(); - Ok(Columns::new(columns)) + Ok(Chunk::new(columns)) } diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs index fa4c7f8c683..fcf1c435148 100644 --- a/src/io/json/write/mod.rs +++ b/src/io/json/write/mod.rs @@ -7,7 +7,7 @@ pub use serialize::serialize; use crate::{ array::Array, - columns::Columns, + chunk::Chunk, error::{ArrowError, Result}, }; @@ -30,13 +30,13 @@ where Ok(()) } -/// [`FallibleStreamingIterator`] that serializes a [`Columns`] to bytes. +/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] to bytes. /// Advancing it is CPU-bounded pub struct Serializer where F: JsonFormat, A: AsRef, - I: Iterator>>, + I: Iterator>>, { batches: I, names: Vec, @@ -48,7 +48,7 @@ impl Serializer where F: JsonFormat, A: AsRef, - I: Iterator>>, + I: Iterator>>, { /// Creates a new [`Serializer`]. 
pub fn new(batches: I, names: Vec, buffer: Vec, format: F) -> Self { @@ -65,7 +65,7 @@ impl FallibleStreamingIterator for Serializer where F: JsonFormat, A: AsRef, - I: Iterator>>, + I: Iterator>>, { type Item = [u8]; @@ -75,8 +75,8 @@ where self.buffer.clear(); self.batches .next() - .map(|maybe_columns| { - maybe_columns + .map(|maybe_chunk| { + maybe_chunk .map(|columns| serialize(&self.names, &columns, self.format, &mut self.buffer)) }) .transpose()?; diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 1c1d28b4267..39af9e8871b 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -3,7 +3,7 @@ use serde_json::Value; use streaming_iterator::StreamingIterator; use crate::bitmap::utils::zip_validity; -use crate::columns::Columns; +use crate::chunk::Chunk; use crate::io::iterator::BufStreamingIterator; use crate::util::lexical_to_bytes_mut; use crate::{array::*, datatypes::DataType, types::NativeType}; @@ -83,7 +83,7 @@ fn struct_serializer<'a>( serializers .iter_mut() .zip(names) - // `unwrap` is infalible because `array.len()` equals `len` on `Columns` + // `unwrap` is infalible because `array.len()` equals `len` on `Chunk` .for_each(|(iter, name)| { let item = iter.next().unwrap(); record.push((name, item)); @@ -199,7 +199,7 @@ fn serialize_item( /// Serializes a (name, array) to a valid JSON to `buffer` /// This is CPU-bounded -pub fn serialize(names: &[N], columns: &Columns, format: F, buffer: &mut Vec) +pub fn serialize(names: &[N], columns: &Chunk, format: F, buffer: &mut Vec) where N: AsRef, A: AsRef, @@ -219,7 +219,7 @@ where serializers .iter_mut() .zip(names.iter()) - // `unwrap` is infalible because `array.len()` equals `len` on `Columns` + // `unwrap` is infalible because `array.len()` equals `len` on `Chunk` .for_each(|(iter, name)| { let item = iter.next().unwrap(); record.push((name.as_ref(), item)); diff --git a/src/io/json_integration/read/array.rs b/src/io/json_integration/read/array.rs index a6913139b39..78dcd863823 100644 --- a/src/io/json_integration/read/array.rs +++ b/src/io/json_integration/read/array.rs @@ -7,7 +7,7 @@ use crate::{ array::*, bitmap::{Bitmap, MutableBitmap}, buffer::Buffer, - columns::Columns, + chunk::Chunk, datatypes::{DataType, PhysicalType, PrimitiveType, Schema}, error::{ArrowError, Result}, io::ipc::IpcField, @@ -410,12 +410,12 @@ pub fn to_array( } } -pub fn deserialize_columns( +pub fn deserialize_chunk( schema: &Schema, ipc_fields: &[IpcField], json_batch: &ArrowJsonBatch, json_dictionaries: &HashMap, -) -> Result>> { +) -> Result>> { let arrays = schema .fields() .iter() @@ -431,5 +431,5 @@ pub fn deserialize_columns( }) .collect::>()?; - Columns::try_new(arrays) + Chunk::try_new(arrays) } diff --git a/src/io/json_integration/write/array.rs b/src/io/json_integration/write/array.rs index ebca2058ac8..0d317b9a7e4 100644 --- a/src/io/json_integration/write/array.rs +++ b/src/io/json_integration/write/array.rs @@ -2,15 +2,15 @@ use std::sync::Arc; use crate::{ array::{Array, PrimitiveArray}, - columns::Columns, + chunk::Chunk, datatypes::DataType, }; use super::super::{ArrowJsonBatch, ArrowJsonColumn}; -/// Serializes a [`Columns`] to [`ArrowJsonBatch`]. -pub fn serialize_columns( - columns: &Columns>, +/// Serializes a [`Chunk`] to [`ArrowJsonBatch`]. 
+pub fn serialize_chunk( + columns: &Chunk>, names: &[A], ) -> ArrowJsonBatch { let count = columns.len(); diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs index dd7eb3035e3..414ab1c2c84 100644 --- a/src/io/parquet/read/mod.rs +++ b/src/io/parquet/read/mod.rs @@ -224,17 +224,17 @@ fn column_datatype(data_type: &DataType, column: usize) -> DataType { | LargeUtf8 | Dictionary(_) | List | LargeList | FixedSizeList => data_type.clone(), Struct => { if let DataType::Struct(fields) = data_type.to_logical_type() { - let mut total_columns = 0; + let mut total_chunk = 0; let mut total_fields = 0; for f in fields { - let field_columns = column_offset(f.data_type()); - if column < total_columns + field_columns { - return column_datatype(f.data_type(), column + total_columns); + let field_chunk = column_offset(f.data_type()); + if column < total_chunk + field_chunk { + return column_datatype(f.data_type(), column + total_chunk); } - total_fields += (field_columns > 0) as usize; - total_columns += field_columns; + total_fields += (field_chunk > 0) as usize; + total_chunk += field_chunk; } - fields[column + total_fields - total_columns] + fields[column + total_fields - total_chunk] .data_type() .clone() } else { diff --git a/src/io/parquet/read/record_batch.rs b/src/io/parquet/read/record_batch.rs index a548221e57f..f42f380592e 100644 --- a/src/io/parquet/read/record_batch.rs +++ b/src/io/parquet/read/record_batch.rs @@ -5,7 +5,7 @@ use std::{ use crate::{ array::Array, - columns::Columns, + chunk::Chunk, datatypes::{Field, Schema}, error::{ArrowError, Result}, }; @@ -108,7 +108,7 @@ impl RecordReader { } impl Iterator for RecordReader { - type Item = Result>>; + type Item = Result>>; fn next(&mut self) -> Option { if self.schema.fields().is_empty() { @@ -170,7 +170,7 @@ impl Iterator for RecordReader { self.buffer = b1; self.decompress_buffer = b2; self.remaining_rows -= columns[0].len(); - Columns::new(columns) + Chunk::new(columns) })) } } diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs index fc436faff0b..3fa34b4e3b9 100644 --- a/src/io/parquet/write/record_batch.rs +++ b/src/io/parquet/write/record_batch.rs @@ -7,23 +7,23 @@ use super::{ }; use crate::{ array::Array, - columns::Columns, + chunk::Chunk, datatypes::Schema, error::{ArrowError, Result}, }; -/// An iterator adapter that converts an iterator over [`Columns`] into an iterator +/// An iterator adapter that converts an iterator over [`Chunk`] into an iterator /// of row groups. /// Use it to create an iterator consumable by the parquet's API. -pub struct RowGroupIterator + 'static, I: Iterator>>> { +pub struct RowGroupIterator + 'static, I: Iterator>>> { iter: I, options: WriteOptions, parquet_schema: SchemaDescriptor, encodings: Vec, } -impl + 'static, I: Iterator>>> RowGroupIterator { - /// Creates a new [`RowGroupIterator`] from an iterator over [`Columns`]. +impl + 'static, I: Iterator>>> RowGroupIterator { + /// Creates a new [`RowGroupIterator`] from an iterator over [`Chunk`]. 
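The `RowGroupIterator` doc comment above describes an adapter that turns an iterator of `Chunk`s into parquet row groups. A minimal sketch of that flow, pieced together from the hunks in this diff, is shown below; the column name, `Encoding::Plain`, `Version::V2`, and the `Int32` data are illustrative assumptions and are not part of the patch, and the resulting row groups would still have to be handed to a parquet file writer.

    use std::sync::Arc;

    use arrow2::array::{Array, Int32Array};
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{Field, Schema};
    use arrow2::error::Result;
    use arrow2::io::parquet::write::{
        Compression, Encoding, RowGroupIterator, Version, WriteOptions,
    };

    fn row_groups_from_chunks() -> Result<()> {
        // One column of data, wrapped in the `Chunk` type introduced by this rename.
        let array = Arc::new(Int32Array::from_slice([1, 2, 3])) as Arc<dyn Array>;
        let schema = Schema::new(vec![Field::new("c1", array.data_type().clone(), true)]);
        // The adapter expects an iterator of `Result<Chunk<_>>`, so `try_new` can be passed through as-is.
        let chunks = vec![Chunk::try_new(vec![array])];

        let options = WriteOptions {
            write_statistics: true,
            compression: Compression::Uncompressed,
            version: Version::V2,
        };

        // One parquet row group is produced per incoming `Chunk`, with one encoding per column.
        let row_groups = RowGroupIterator::try_new(
            chunks.into_iter(),
            &schema,
            options,
            vec![Encoding::Plain],
        )?;
        for row_group in row_groups {
            let _row_group = row_group?;
        }
        Ok(())
    }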
pub fn try_new( iter: I, schema: &Schema, @@ -48,7 +48,7 @@ impl + 'static, I: Iterator>>> RowG } } -impl + 'static, I: Iterator>>> Iterator +impl + 'static, I: Iterator>>> Iterator for RowGroupIterator { type Item = Result>; @@ -56,8 +56,8 @@ impl + 'static, I: Iterator>>> Iter fn next(&mut self) -> Option { let options = self.options; - self.iter.next().map(|maybe_columns| { - let columns = maybe_columns?; + self.iter.next().map(|maybe_chunk| { + let columns = maybe_chunk?; let encodings = self.encodings.clone(); Ok(DynIter::new( columns diff --git a/src/io/print.rs b/src/io/print.rs index c673d799355..1f5e8c1b9c3 100644 --- a/src/io/print.rs +++ b/src/io/print.rs @@ -1,14 +1,14 @@ -//! APIs to represent [`Columns`] as a formatted table. +//! APIs to represent [`Chunk`] as a formatted table. use crate::{ array::{get_display, Array}, - columns::Columns, + chunk::Chunk, }; use comfy_table::{Cell, Table}; -/// Returns a visual representation of [`Columns`] -pub fn write, N: AsRef>(batches: &[Columns], names: &[N]) -> String { +/// Returns a visual representation of [`Chunk`] +pub fn write, N: AsRef>(batches: &[Chunk], names: &[N]) -> String { let mut table = Table::new(); table.load_preset("||--+-++| ++++++"); diff --git a/src/lib.rs b/src/lib.rs index de2ed0cd1d2..608bef98120 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ pub mod array; pub mod bitmap; pub mod buffer; -pub mod columns; +pub mod chunk; pub mod error; pub mod scalar; pub mod trusted_len; diff --git a/testing/arrow-testing b/testing/arrow-testing index 83ada4ec0f2..e8ce32338f2 160000 --- a/testing/arrow-testing +++ b/testing/arrow-testing @@ -1 +1 @@ -Subproject commit 83ada4ec0f2cfe36f4168628d7f470e6199e663a +Subproject commit e8ce32338f2dfeca3a5126f7677bdee159604000 diff --git a/testing/parquet-testing b/testing/parquet-testing index d4d485956a6..8e7badc6a38 160000 --- a/testing/parquet-testing +++ b/testing/parquet-testing @@ -1 +1 @@ -Subproject commit d4d485956a643c693b5549e1a62d52ca61c170f1 +Subproject commit 8e7badc6a3817a02e06d17b5d8ab6b6dc356e890 diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 4bd43676d3b..bd22bbe153c 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use avro_rs::types::{Record, Value}; use avro_rs::{Codec, Writer}; use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema}; @@ -69,7 +69,7 @@ pub(super) fn schema() -> (AvroSchema, Schema) { (AvroSchema::parse_str(raw_schema).unwrap(), schema) } -pub(super) fn data() -> Columns> { +pub(super) fn data() -> Chunk> { let data = vec![ Some(vec![Some(1i32), None, Some(3)]), Some(vec![Some(1i32), None, Some(3)]), @@ -94,7 +94,7 @@ pub(super) fn data() -> Columns> { )), ]; - Columns::try_new(columns).unwrap() + Chunk::try_new(columns).unwrap() } pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs::Error> { @@ -149,7 +149,7 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs:: Ok(writer.into_inner().unwrap()) } -pub(super) fn read_avro(mut avro: &[u8]) -> Result<(Columns>, Schema)> { +pub(super) fn read_avro(mut avro: &[u8]) -> Result<(Chunk>, Schema)> { let file = &mut avro; let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?; diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index a597ae8b900..01c33bbd77c 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use 
arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::avro::write; @@ -19,7 +19,7 @@ fn schema() -> Schema { ]) } -fn data() -> Columns> { +fn data() -> Chunk> { let columns = vec![ Arc::new(Int64Array::from_slice([27, 47])) as Arc, Arc::new(Utf8Array::::from_slice(["foo", "bar"])) as Arc, @@ -31,13 +31,13 @@ fn data() -> Columns> { Arc::new(Utf8Array::::from([Some("foo"), None])) as Arc, ]; - Columns::try_new(columns).unwrap() + Chunk::try_new(columns).unwrap() } use super::read::read_avro; fn write_avro>( - columns: &Columns, + columns: &Chunk, schema: &Schema, compression: Option, ) -> Result> { diff --git a/tests/it/io/csv/write.rs b/tests/it/io/csv/write.rs index c59ceadf400..3b86ed2a60a 100644 --- a/tests/it/io/csv/write.rs +++ b/tests/it/io/csv/write.rs @@ -2,12 +2,12 @@ use std::io::Cursor; use std::sync::Arc; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::csv::write::*; -fn data() -> Columns> { +fn data() -> Chunk> { let c1 = Utf8Array::::from_slice(["a b", "c", "d"]); let c2 = Float64Array::from([Some(123.564532), None, Some(-556132.25)]); let c3 = UInt32Array::from_slice(&[3, 2, 1]); @@ -19,7 +19,7 @@ fn data() -> Columns> { let keys = UInt32Array::from_slice(&[2, 0, 1]); let c7 = DictionaryArray::from_data(keys, Arc::new(c1.clone())); - Columns::new(vec![ + Chunk::new(vec![ Box::new(c1) as Box, Box::new(c2), Box::new(c3), @@ -39,7 +39,7 @@ fn write_csv() -> Result<()> { write_header(&mut writer, &["c1", "c2", "c3", "c4", "c5", "c6", "c7"])?; let options = SerializeOptions::default(); - write_columns(&mut writer, &columns, &options)?; + write_chunk(&mut writer, &columns, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); @@ -67,7 +67,7 @@ fn write_csv_custom_options() -> Result<()> { time64_format: Some("%r".to_string()), ..Default::default() }; - write_columns(&mut writer, &batch, &options)?; + write_chunk(&mut writer, &batch, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); @@ -82,7 +82,7 @@ d|-556132.25|1||2019-04-18 02:45:55.555|11:46:03 PM|c Ok(()) } -fn data_array(column: usize) -> (Columns>, Vec<&'static str>) { +fn data_array(column: usize) -> (Chunk>, Vec<&'static str>) { let (array, expected) = match column { 0 => ( Arc::new(Utf8Array::::from_slice(["a b", "c", "d"])) as Arc, @@ -207,7 +207,7 @@ fn data_array(column: usize) -> (Columns>, Vec<&'static str>) { _ => todo!(), }; - (Columns::new(vec![array]), expected) + (Chunk::new(vec![array]), expected) } fn write_single(column: usize) -> Result<()> { @@ -218,7 +218,7 @@ fn write_single(column: usize) -> Result<()> { write_header(&mut writer, &["c1"])?; let options = SerializeOptions::default(); - write_columns(&mut writer, &columns, &options)?; + write_chunk(&mut writer, &columns, &options)?; // check let buffer = writer.into_inner().unwrap().into_inner(); diff --git a/tests/it/io/ipc/common.rs b/tests/it/io/ipc/common.rs index 4b73b24f5eb..0f5270d76e1 100644 --- a/tests/it/io/ipc/common.rs +++ b/tests/it/io/ipc/common.rs @@ -1,14 +1,14 @@ use std::{collections::HashMap, fs::File, io::Read, sync::Arc}; use arrow2::{ - array::Array, columns::Columns, datatypes::Schema, error::Result, + array::Array, chunk::Chunk, datatypes::Schema, error::Result, io::ipc::read::read_stream_metadata, io::ipc::read::StreamReader, io::ipc::IpcField, io::json_integration::read, 
io::json_integration::ArrowJson, }; use flate2::read::GzDecoder; -type IpcRead = (Schema, Vec, Vec>>); +type IpcRead = (Schema, Vec, Vec>>); /// Read gzipped JSON file pub fn read_gzip_json(version: &str, file_name: &str) -> Result { @@ -40,7 +40,7 @@ pub fn read_gzip_json(version: &str, file_name: &str) -> Result { let batches = arrow_json .batches .iter() - .map(|batch| read::deserialize_columns(&schema, &ipc_fields, batch, &dictionaries)) + .map(|batch| read::deserialize_chunk(&schema, &ipc_fields, batch, &dictionaries)) .collect::>>()?; Ok((schema, ipc_fields, batches)) @@ -57,7 +57,7 @@ pub fn read_arrow_stream(version: &str, file_name: &str) -> IpcRead { let metadata = read_stream_metadata(&mut file).unwrap(); let reader = StreamReader::new(file, metadata); - let schema = reader.metadata().schema.as_ref().clone(); + let schema = reader.metadata().schema.clone(); let ipc_fields = reader.metadata().ipc_schema.fields.clone(); ( diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 8156b834a59..2df0cda955b 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -2,7 +2,7 @@ use std::io::Cursor; use std::sync::Arc; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::{Field, Schema}; use arrow2::error::Result; use arrow2::io::ipc::read::{read_file_metadata, FileReader}; @@ -11,7 +11,7 @@ use arrow2::io::ipc::{write::*, IpcField}; use crate::io::ipc::common::read_gzip_json; fn write_( - batches: &[Columns>], + batches: &[Chunk>], schema: &Schema, ipc_fields: Option>, compression: Option, @@ -27,7 +27,7 @@ fn write_( } fn round_trip( - columns: Columns>, + columns: Chunk>, schema: Schema, ipc_fields: Option>, ) -> Result<()> { @@ -41,11 +41,11 @@ fn round_trip( )?; let mut reader = Cursor::new(result); let metadata = read_file_metadata(&mut reader)?; - let schema = metadata.schema().clone(); + let schema = metadata.schema.clone(); let reader = FileReader::new(reader, metadata, None); - assert_eq!(schema.as_ref(), &expected_schema); + assert_eq!(schema, expected_schema); let batches = reader.collect::>>()?; @@ -65,7 +65,7 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> { let result = write_(&batches, &schema, Some(ipc_fields), compression)?; let mut reader = Cursor::new(result); let metadata = read_file_metadata(&mut reader)?; - let schema = metadata.schema().clone(); + let schema = metadata.schema.clone(); let ipc_fields = metadata.ipc_schema.fields.clone(); let reader = FileReader::new(reader, metadata, None); @@ -74,7 +74,7 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> { let (expected_schema, expected_ipc_fields, expected_batches) = read_gzip_json(version, file_name)?; - assert_eq!(schema.as_ref(), &expected_schema); + assert_eq!(schema, expected_schema); assert_eq!(ipc_fields, expected_ipc_fields); let batches = reader.collect::>>()?; @@ -339,7 +339,7 @@ fn write_boolean() -> Result<()> { Some(true), ])) as Arc; let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); - let columns = Columns::try_new(vec![array])?; + let columns = Chunk::try_new(vec![array])?; round_trip(columns, schema, None) } @@ -349,7 +349,7 @@ fn write_sliced_utf8() -> Result<()> { use std::sync::Arc; let array = Arc::new(Utf8Array::::from_slice(["aa", "bb"]).slice(1, 1)) as Arc; let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); - let columns = Columns::try_new(vec![array])?; + let columns = 
Chunk::try_new(vec![array])?; round_trip(columns, schema, None) } @@ -367,6 +367,6 @@ fn write_sliced_list() -> Result<()> { let array: Arc = array.into_arc().slice(1, 2).into(); let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), true)]); - let columns = Columns::try_new(vec![array])?; + let columns = Chunk::try_new(vec![array])?; round_trip(columns, schema, None) } diff --git a/tests/it/io/ipc/write/stream.rs b/tests/it/io/ipc/write/stream.rs index db13c2b734a..8a20ef70b2e 100644 --- a/tests/it/io/ipc/write/stream.rs +++ b/tests/it/io/ipc/write/stream.rs @@ -2,7 +2,7 @@ use std::io::Cursor; use std::sync::Arc; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::ipc::read::read_stream_metadata; @@ -13,11 +13,7 @@ use arrow2::io::ipc::IpcField; use crate::io::ipc::common::read_arrow_stream; use crate::io::ipc::common::read_gzip_json; -fn write_( - schema: &Schema, - ipc_fields: &[IpcField], - batches: &[Columns>], -) -> Vec { +fn write_(schema: &Schema, ipc_fields: &[IpcField], batches: &[Chunk>]) -> Vec { let mut result = vec![]; let options = WriteOptions { compression: None }; @@ -46,7 +42,7 @@ fn test_file(version: &str, file_name: &str) { let (expected_schema, expected_ipc_fields, expected_batches) = read_gzip_json(version, file_name).unwrap(); - assert_eq!(schema.as_ref(), &expected_schema); + assert_eq!(schema, expected_schema); assert_eq!(ipc_fields, expected_ipc_fields); let batches = reader diff --git a/tests/it/io/ipc/write_async.rs b/tests/it/io/ipc/write_async.rs index 6f803bb4667..9f27eea8808 100644 --- a/tests/it/io/ipc/write_async.rs +++ b/tests/it/io/ipc/write_async.rs @@ -2,7 +2,7 @@ use std::io::Cursor; use std::sync::Arc; use arrow2::array::Array; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::ipc::read; @@ -16,7 +16,7 @@ use crate::io::ipc::common::read_gzip_json; async fn write_( schema: &Schema, ipc_fields: &[IpcField], - batches: &[Columns>], + batches: &[Chunk>], ) -> Result> { let mut result = AsyncCursor::new(vec![]); @@ -39,7 +39,7 @@ async fn test_file(version: &str, file_name: &str) -> Result<()> { let metadata = read::read_stream_metadata(&mut reader)?; let reader = read::StreamReader::new(reader, metadata); - let schema = reader.metadata().schema.as_ref(); + let schema = &reader.metadata().schema; let ipc_fields = reader.metadata().ipc_schema.fields.clone(); // read expected JSON output diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs index 636533ac1b7..661bfb0d3e1 100644 --- a/tests/it/io/json/mod.rs +++ b/tests/it/io/json/mod.rs @@ -5,13 +5,13 @@ use std::io::Cursor; use std::sync::Arc; use arrow2::array::*; -use arrow2::columns::Columns; +use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; use arrow2::io::json::read as json_read; use arrow2::io::json::write as json_write; -fn read_batch(data: String, fields: &[Field]) -> Result>> { +fn read_batch(data: String, fields: &[Field]) -> Result>> { let mut reader = Cursor::new(data); let mut rows = vec![String::default(); 1024]; @@ -21,7 +21,7 @@ fn read_batch(data: String, fields: &[Field]) -> Result>> } fn write_batch>( - batch: Columns, + batch: Chunk, names: Vec, format: F, ) -> Result> { @@ -47,9 +47,9 @@ fn round_trip(data: String) -> Result<()> { json_write::LineDelimited::default(), )?; - let new_columns = read_batch(String::from_utf8(buf).unwrap(), 
&fields)?; + let new_chunk = read_batch(String::from_utf8(buf).unwrap(), &fields)?; - assert_eq!(columns, new_columns); + assert_eq!(columns, new_chunk); Ok(()) } diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index bd0d2337b2c..51997c1d367 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -15,7 +15,7 @@ fn write_simple_rows() -> Result<()> { let a = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); let b = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); - let batch = Columns::try_new(vec![&a as &dyn Array, &b]).unwrap(); + let batch = Chunk::try_new(vec![&a as &dyn Array, &b]).unwrap(); let buf = write_batch( batch, @@ -37,15 +37,10 @@ fn write_simple_rows() -> Result<()> { #[test] fn write_simple_rows_array() -> Result<()> { - let schema = Schema::new(vec![ - Field::new("c1", DataType::Int32, false), - Field::new("c2", DataType::Utf8, false), - ]); - let a = Int32Array::from([Some(1), Some(2), Some(3), None, Some(5)]); let b = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c"), Some("d"), None]); - let batch = Columns::try_new(vec![&a as &dyn Array, &b]).unwrap(); + let batch = Chunk::try_new(vec![&a as &dyn Array, &b]).unwrap(); let buf = write_batch( batch, @@ -88,7 +83,7 @@ fn write_nested_struct_with_validity() -> Result<()> { ); let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); - let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); + let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); let buf = write_batch( batch, @@ -133,7 +128,7 @@ fn write_nested_structs() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("a"), Some("b"), Some("c")]); - let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); + let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); let buf = write_batch( batch, @@ -169,7 +164,7 @@ fn write_struct_with_list_field() -> Result<()> { let b = PrimitiveArray::from_slice([1, 2, 3, 4, 5]); - let batch = Columns::try_new(vec![&a as &dyn Array, &b]).unwrap(); + let batch = Chunk::try_new(vec![&a as &dyn Array, &b]).unwrap(); let buf = write_batch( batch, @@ -213,7 +208,7 @@ fn write_nested_list() -> Result<()> { let c2 = Utf8Array::::from(&vec![Some("foo"), Some("bar"), None]); - let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); + let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); let buf = write_batch( batch, @@ -274,7 +269,7 @@ fn write_list_of_struct() -> Result<()> { let c2 = Int32Array::from_slice(&[1, 2, 3]); - let batch = Columns::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); + let batch = Chunk::try_new(vec![&c1 as &dyn Array, &c2]).unwrap(); let buf = write_batch( batch, @@ -296,7 +291,7 @@ fn write_list_of_struct() -> Result<()> { fn write_escaped_utf8() -> Result<()> { let a = Utf8Array::::from(&vec![Some("a\na"), None]); - let batch = Columns::try_new(vec![&a as &dyn Array]).unwrap(); + let batch = Chunk::try_new(vec![&a as &dyn Array]).unwrap(); let buf = write_batch( batch, diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index dcc7ca368d6..4fc1a9a4781 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use arrow2::error::ArrowError; use arrow2::{ - array::*, bitmap::Bitmap, buffer::Buffer, columns::Columns, datatypes::*, error::Result, + array::*, bitmap::Bitmap, buffer::Buffer, chunk::Chunk, datatypes::*, error::Result, io::parquet::read::statistics::*, io::parquet::read::*, 
io::parquet::write::*, }; @@ -620,7 +620,7 @@ pub fn pyarrow_struct_statistics(column: usize) -> Option> { } /// Round-trip with parquet using the same integration files used for IPC integration tests. -fn integration_write(schema: &Schema, batches: &[Columns>]) -> Result> { +fn integration_write(schema: &Schema, batches: &[Chunk>]) -> Result> { let options = WriteOptions { write_statistics: true, compression: Compression::Uncompressed, @@ -667,7 +667,7 @@ fn integration_write(schema: &Schema, batches: &[Columns>]) -> Re Ok(writer.into_inner()) } -type IntegrationRead = (Arc, Vec>>); +type IntegrationRead = (Arc, Vec>>); fn integration_read(data: &[u8]) -> Result { let reader = Cursor::new(data); @@ -720,7 +720,7 @@ fn arrow_type() -> Result<()> { Field::new("a1", dt1, true), Field::new("a2", array2.data_type().clone(), true), ]); - let batch = Columns::try_new(vec![Arc::new(array) as Arc, Arc::new(array2)])?; + let batch = Chunk::try_new(vec![Arc::new(array) as Arc, Arc::new(array2)])?; let r = integration_write(&schema, &[batch.clone()])?; diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index a7733a1860a..a6da48c41b9 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -42,7 +42,7 @@ fn round_trip( let parquet_schema = to_parquet_schema(&schema)?; - let iter = vec![Columns::try_new(vec![array.clone()])]; + let iter = vec![Chunk::try_new(vec![array.clone()])]; let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, vec![encoding])?; diff --git a/tests/it/io/print.rs b/tests/it/io/print.rs index 5497e112915..68eccdcc32b 100644 --- a/tests/it/io/print.rs +++ b/tests/it/io/print.rs @@ -4,7 +4,7 @@ use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, - columns::Columns, + chunk::Chunk, datatypes::{DataType, Field, TimeUnit, UnionMode}, error::Result, io::print::*, @@ -15,7 +15,7 @@ fn write_basics() -> Result<()> { let a = Utf8Array::::from(vec![Some("a"), Some("b"), None, Some("d")]); let b = Int32Array::from(vec![Some(1), None, Some(10), Some(100)]); - let batch = Columns::try_new(vec![&a as &dyn Array, &b])?; + let batch = Chunk::try_new(vec![&a as &dyn Array, &b])?; let table = write(&[batch], &["a".to_string(), "b".to_string()]); @@ -46,7 +46,7 @@ fn write_null() -> Result<()> { .collect(); // define data (null) - let columns = Columns::try_new(arrays)?; + let columns = Chunk::try_new(arrays)?; let table = write(&[columns], &["a", "b", "c"]); @@ -74,7 +74,7 @@ fn write_dictionary() -> Result<()> { array.try_extend(vec![Some("one"), None, Some("three")])?; let array = array.into_box(); - let batch = Columns::new(vec![array]); + let batch = Chunk::new(vec![array]); let table = write(&[batch], &["d1"]); @@ -101,7 +101,7 @@ fn dictionary_validities() -> Result<()> { let values = PrimitiveArray::::from([None, Some(10)]); let array = DictionaryArray::::from_data(keys, Arc::new(values)); - let columns = Columns::new(vec![&array as &dyn Array]); + let columns = Chunk::new(vec![&array as &dyn Array]); let table = write(&[columns], &["d1"]); @@ -122,7 +122,7 @@ fn dictionary_validities() -> Result<()> { macro_rules! 
check_datetime { ($ty:ty, $datatype:expr, $value:expr, $EXPECTED_RESULT:expr) => { let array = PrimitiveArray::<$ty>::from(&[Some($value), None]).to($datatype); - let batch = Columns::new(vec![&array as &dyn Array]); + let batch = Chunk::new(vec![&array as &dyn Array]); let table = write(&[batch], &["f"]); @@ -331,7 +331,7 @@ fn write_struct() -> Result<()> { let array = StructArray::from_data(DataType::Struct(fields), values, validity); - let columns = Columns::new(vec![&array as &dyn Array]); + let columns = Chunk::new(vec![&array as &dyn Array]); let table = write(&[columns], &["a"]); @@ -367,7 +367,7 @@ fn write_union() -> Result<()> { let array = UnionArray::from_data(data_type, types, fields, None); - let batch = Columns::new(vec![&array as &dyn Array]); + let batch = Chunk::new(vec![&array as &dyn Array]); let table = write(&[batch], &["a"]);
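Taken together, the `io/print` hunks reduce to the following usage pattern after the rename. This is a minimal sketch, not code from the patch: the column names and contents are assumed, and only constructors that appear elsewhere in this diff are used.

    use std::sync::Arc;

    use arrow2::array::{Array, Int32Array, Utf8Array};
    use arrow2::chunk::Chunk;
    use arrow2::io::print;

    fn print_chunk() {
        // Two equal-length columns; `Chunk::try_new` validates that the lengths match.
        let a = Utf8Array::<i32>::from_slice(["a", "b", "c"]);
        let b = Int32Array::from_slice([1, 2, 3]);
        let chunk = Chunk::try_new(vec![
            Arc::new(a) as Arc<dyn Array>,
            Arc::new(b) as Arc<dyn Array>,
        ])
        .unwrap();

        // `io::print::write` renders one or more chunks as a formatted table.
        let table = print::write(&[chunk], &["c1", "c2"]);
        println!("{}", table);
    }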