This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Renamed Columns -> Chunk
jorgecarleitao committed Dec 31, 2021
1 parent ebfee31 commit 8cffcb1
Showing 73 changed files with 331 additions and 335 deletions.
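For downstream code, the rename is mechanical: the `columns::Columns` module and type become `chunk::Chunk`, and derived names follow (`deserialize_columns` -> `deserialize_chunk`, `filter_columns` -> `filter_chunk`, `write_columns` -> `write_chunk`). A minimal before/after sketch, assuming only the constructors and accessors visible in this diff:

    use std::sync::Arc;

    use arrow2::array::{Array, Int32Array};
    use arrow2::chunk::Chunk; // was: use arrow2::columns::Columns;
    use arrow2::error::Result;

    fn main() -> Result<()> {
        let array = Int32Array::from_slice(&[1, 2, 3]);

        // was: Columns::try_new(vec![Arc::new(array) as Arc<dyn Array>])?
        let chunk = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

        // Accessors seen elsewhere in this diff are unchanged by the rename.
        assert_eq!(chunk.arrays().len(), 1); // one column
        assert_eq!(chunk.len(), 3); // three rows
        Ok(())
    }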
8 changes: 4 additions & 4 deletions arrow-parquet-integration-testing/src/main.rs
@@ -5,7 +5,7 @@ use std::{collections::HashMap, io::Read};
 use arrow2::array::Array;
 use arrow2::io::ipc::IpcField;
 use arrow2::{
-    columns::Columns,
+    chunk::Chunk,
     datatypes::{DataType, Schema},
     error::Result,
     io::{
@@ -25,7 +25,7 @@ use flate2::read::GzDecoder;
 pub fn read_gzip_json(
     version: &str,
     file_name: &str,
-) -> Result<(Schema, Vec<IpcField>, Vec<Columns<Arc<dyn Array>>>)> {
+) -> Result<(Schema, Vec<IpcField>, Vec<Chunk<Arc<dyn Array>>>)> {
     let path = format!(
         "../testing/arrow-testing/data/arrow-ipc-stream/integration/{}/{}.json.gz",
         version, file_name
@@ -53,7 +53,7 @@ pub fn read_gzip_json(
     let batches = arrow_json
         .batches
         .iter()
-        .map(|batch| read::deserialize_columns(&schema, &ipc_fields, batch, &dictionaries))
+        .map(|batch| read::deserialize_chunk(&schema, &ipc_fields, batch, &dictionaries))
         .collect::<Result<Vec<_>>>()?;

     Ok((schema, ipc_fields, batches))
@@ -148,7 +148,7 @@ fn main() -> Result<()> {
                 }
             })
             .collect();
-            Columns::try_new(columns).unwrap()
+            Chunk::try_new(columns).unwrap()
         })
         .collect::<Vec<_>>()
     } else {
6 changes: 3 additions & 3 deletions benches/filter_kernels.rs
@@ -19,8 +19,8 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
-use arrow2::columns::Columns;
-use arrow2::compute::filter::{build_filter, filter, filter_columns, Filter};
+use arrow2::chunk::Chunk;
+use arrow2::compute::filter::{build_filter, filter, filter_chunk, Filter};
 use arrow2::datatypes::{DataType, Field, Schema};
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};
@@ -125,7 +125,7 @@ fn add_benchmark(c: &mut Criterion) {

     let data_array = create_primitive_array::<f32>(size, 0.0);

-    let columns = Columns::try_new(vec![Arc::new(data_array)]).unwrap();
+    let columns = Chunk::try_new(vec![Arc::new(data_array)]).unwrap();

     c.bench_function("filter single record batch", |b| {
         b.iter(|| filter_record_batch(&columns, &filter_array))
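The `filter_columns` -> `filter_chunk` rename above suggests the corresponding usage. A hedged sketch — the exact signature of `filter_chunk` is not shown in this diff and is assumed to take a `Chunk` plus a `BooleanArray` mask and return a filtered `Chunk`:

    use std::sync::Arc;

    use arrow2::array::{Array, BooleanArray, Int32Array};
    use arrow2::chunk::Chunk;
    use arrow2::compute::filter::filter_chunk;
    use arrow2::error::Result;

    fn main() -> Result<()> {
        let values = Int32Array::from_slice(&[1, 2, 3, 4]);
        let chunk = Chunk::try_new(vec![Arc::new(values) as Arc<dyn Array>])?;

        // keep rows 0 and 2
        let mask = BooleanArray::from_slice(&[true, false, true, false]);

        // assumed signature: filter_chunk(&Chunk<_>, &BooleanArray) -> Result<Chunk<_>>
        let filtered = filter_chunk(&chunk, &mask)?;
        assert_eq!(filtered.len(), 2);
        Ok(())
    }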
4 changes: 2 additions & 2 deletions benches/read_parquet.rs
@@ -37,8 +37,8 @@ fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {

     let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;

-    for maybe_columns in reader {
-        let columns = maybe_columns?;
+    for maybe_chunk in reader {
+        let columns = maybe_chunk?;
         assert_eq!(columns.len(), size);
     }
     Ok(())
14 changes: 7 additions & 7 deletions benches/write_csv.rs
@@ -3,12 +3,12 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::csv::write;
 use arrow2::util::bench_util::*;

-fn write_batch(columns: &Columns) -> Result<()> {
+fn write_batch(columns: &Chunk) -> Result<()> {
     let writer = &mut write::WriterBuilder::new().from_writer(vec![]);

     assert_eq!(columns.arrays().len(), 1);
@@ -18,30 +18,30 @@ fn write_batch(columns: &Columns) -> Result<()> {
     write::write_batch(writer, batch, &options)
 }

-fn make_columns(array: impl Array + 'static) -> Columns<Arc<dyn Array>> {
-    Columns::new(vec![Arc::new(array)])
+fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
+    Chunk::new(vec![Arc::new(array)])
 }

 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);

         let array = create_primitive_array::<i32>(size, 0.1);
-        let batch = make_columns(array);
+        let batch = make_chunk(array);

         c.bench_function(&format!("csv write i32 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&batch))
         });

         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let batch = make_columns(array);
+        let batch = make_chunk(array);

         c.bench_function(&format!("csv write utf8 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&batch))
         });

         let array = create_primitive_array::<f64>(size, 0.1);
-        let batch = make_columns(array);
+        let batch = make_chunk(array);

         c.bench_function(&format!("csv write f64 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&batch))
4 changes: 2 additions & 2 deletions benches/write_ipc.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::datatypes::{Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::ipc::write::*;
@@ -13,7 +13,7 @@ use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, cre
 fn write(array: &dyn Array) -> Result<()> {
     let field = Field::new("c1", array.data_type().clone(), true);
     let schema = Schema::new(vec![field]);
-    let columns = Columns::try_new(vec![clone(array).into()])?;
+    let columns = Chunk::try_new(vec![clone(array).into()])?;

     let writer = Cursor::new(vec![]);
     let mut writer = FileWriter::try_new(writer, &schema, None, Default::default())?;
14 changes: 7 additions & 7 deletions benches/write_json.rs
@@ -3,12 +3,12 @@ use std::sync::Arc;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::*;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::json::write;
 use arrow2::util::bench_util::*;

-fn write_batch(columns: &Columns<Arc<dyn Array>>) -> Result<()> {
+fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
     let mut writer = vec![];
     let format = write::JsonArray::default();

@@ -23,30 +23,30 @@ fn write_batch(columns: &Columns<Arc<dyn Array>>) -> Result<()> {
     Ok(())
 }

-fn make_columns(array: impl Array + 'static) -> Columns<Arc<dyn Array>> {
-    Columns::new(vec![Arc::new(array) as Arc<dyn Array>])
+fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
+    Chunk::new(vec![Arc::new(array) as Arc<dyn Array>])
 }

 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);

         let array = create_primitive_array::<i32>(size, 0.1);
-        let columns = make_columns(array);
+        let columns = make_chunk(array);

         c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&columns))
         });

         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let columns = make_columns(array);
+        let columns = make_chunk(array);

         c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&columns))
         });

         let array = create_primitive_array::<f64>(size, 0.1);
-        let columns = make_columns(array);
+        let columns = make_chunk(array);

         c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
             b.iter(|| write_batch(&columns))
4 changes: 2 additions & 2 deletions benches/write_parquet.rs
@@ -3,13 +3,13 @@ use std::io::Cursor;
 use criterion::{criterion_group, criterion_main, Criterion};

 use arrow2::array::{clone, Array};
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::parquet::write::*;
 use arrow2::util::bench_util::{create_boolean_array, create_primitive_array, create_string_array};

 fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
-    let columns = Columns::new(vec![clone(array).into()]);
+    let columns = Chunk::new(vec![clone(array).into()]);
     let schema = batch.schema().clone();

     let options = WriteOptions {
4 changes: 2 additions & 2 deletions examples/avro_read.rs
@@ -22,8 +22,8 @@ fn main() -> Result<()> {
         schema.fields,
     );

-    for maybe_columns in reader {
-        let columns = maybe_columns?;
+    for maybe_chunk in reader {
+        let columns = maybe_chunk?;
         assert!(!columns.is_empty());
     }
     Ok(())
4 changes: 2 additions & 2 deletions examples/csv_read.rs
@@ -1,11 +1,11 @@
 use std::sync::Arc;

 use arrow2::array::Array;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::csv::read;

-fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Columns<Arc<dyn Array>>> {
+fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Arc<dyn Array>>> {
     // Create a CSV reader. This is typically created on the thread that reads the file and
     // thus owns the read head.
     let mut reader = read::ReaderBuilder::new().from_path(path)?;
4 changes: 2 additions & 2 deletions examples/csv_read_parallel.rs
@@ -5,10 +5,10 @@ use std::thread;
 use std::time::SystemTime;

 use arrow2::array::Array;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::{error::Result, io::csv::read};

-fn parallel_read(path: &str) -> Result<Vec<Columns<Arc<dyn Array>>>> {
+fn parallel_read(path: &str) -> Result<Vec<Chunk<Arc<dyn Array>>>> {
     let batch_size = 100;
     let has_header = true;
     let projection = None;
8 changes: 4 additions & 4 deletions examples/csv_write.rs
@@ -1,19 +1,19 @@
 use arrow2::{
     array::{Array, Int32Array},
-    columns::Columns,
+    chunk::Chunk,
     error::Result,
     io::csv::write,
 };

-fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Columns<A>]) -> Result<()> {
+fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Chunk<A>]) -> Result<()> {
     let writer = &mut write::WriterBuilder::new().from_path(path)?;

     write::write_header(writer, &["c1"])?;

     let options = write::SerializeOptions::default();
     columns
         .iter()
-        .try_for_each(|batch| write::write_columns(writer, batch, &options))
+        .try_for_each(|batch| write::write_chunk(writer, batch, &options))
 }

 fn main() -> Result<()> {
@@ -26,7 +26,7 @@ fn main() -> Result<()> {
         Some(5),
         Some(6),
     ]);
-    let batch = Columns::try_new(vec![&array as &dyn Array])?;
+    let batch = Chunk::try_new(vec![&array as &dyn Array])?;

     write_batch("example.csv", &[batch])
 }
6 changes: 3 additions & 3 deletions examples/csv_write_parallel.rs
@@ -5,12 +5,12 @@ use std::thread;

 use arrow2::{
     array::{Array, Int32Array},
-    columns::Columns,
+    chunk::Chunk,
     error::Result,
     io::csv::write,
 };

-fn parallel_write(path: &str, batches: [Columns<Arc<dyn Array>>; 2]) -> Result<()> {
+fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()> {
     let options = write::SerializeOptions::default();

     // write a header
@@ -60,7 +60,7 @@ fn main() -> Result<()> {
         Some(5),
         Some(6),
     ]);
-    let columns = Columns::new(vec![Arc::new(array) as Arc<dyn Array>]);
+    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);

     parallel_write("example.csv", [columns.clone(), columns])
 }
6 changes: 3 additions & 3 deletions examples/extension.rs
@@ -2,7 +2,7 @@ use std::io::{Cursor, Seek, Write};
 use std::sync::Arc;

 use arrow2::array::*;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::datatypes::*;
 use arrow2::error::Result;
 use arrow2::io::ipc::read;
@@ -40,14 +40,14 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
     let options = write::WriteOptions { compression: None };
     let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;

-    let batch = Columns::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;
+    let batch = Chunk::try_new(vec![Arc::new(array) as Arc<dyn Array>])?;

     writer.write(&batch, None)?;

     Ok(writer.into_inner())
 }

-fn read_ipc(buf: &[u8]) -> Result<Columns<Arc<dyn Array>>> {
+fn read_ipc(buf: &[u8]) -> Result<Chunk<Arc<dyn Array>>> {
     let mut cursor = Cursor::new(buf);
     let metadata = read::read_file_metadata(&mut cursor)?;
     let mut reader = read::FileReader::new(cursor, metadata, None);
6 changes: 3 additions & 3 deletions examples/ipc_file_read.rs
@@ -2,19 +2,19 @@ use std::fs::File;
 use std::sync::Arc;

 use arrow2::array::Array;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::datatypes::Schema;
 use arrow2::error::Result;
 use arrow2::io::ipc::read::{read_file_metadata, FileReader};
 use arrow2::io::print;

-fn read_batches(path: &str) -> Result<(Schema, Vec<Columns<Arc<dyn Array>>>)> {
+fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Arc<dyn Array>>>)> {
     let mut file = File::open(path)?;

     // read the files' metadata. At this point, we can distribute the read whatever we like.
     let metadata = read_file_metadata(&mut file)?;

-    let schema = metadata.schema().as_ref().clone();
+    let schema = metadata.schema.clone();

     // Simplest way: use the reader, an iterator over batches.
     let reader = FileReader::new(file, metadata, None);
6 changes: 3 additions & 3 deletions examples/ipc_file_write.rs
@@ -2,12 +2,12 @@ use std::fs::File;
 use std::sync::Arc;

 use arrow2::array::{Array, Int32Array, Utf8Array};
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::datatypes::{DataType, Field, Schema};
 use arrow2::error::Result;
 use arrow2::io::ipc::write;

-fn write_batches(path: &str, schema: &Schema, columns: &[Columns<Arc<dyn Array>>]) -> Result<()> {
+fn write_batches(path: &str, schema: &Schema, columns: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
     let file = File::create(path)?;

     let options = write::WriteOptions { compression: None };
@@ -34,7 +34,7 @@ fn main() -> Result<()> {
     let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
     let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

-    let batch = Columns::try_new(vec![Arc::new(a) as Arc<dyn Array>, Arc::new(b)])?;
+    let batch = Chunk::try_new(vec![Arc::new(a) as Arc<dyn Array>, Arc::new(b)])?;

     // write it
     write_batches(file_path, &schema, &[batch])?;
6 changes: 3 additions & 3 deletions examples/json_read.rs
@@ -3,11 +3,11 @@ use std::io::BufReader;
 use std::sync::Arc;

 use arrow2::array::Array;
-use arrow2::columns::Columns;
+use arrow2::chunk::Chunk;
 use arrow2::error::Result;
 use arrow2::io::json::read;

-fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Columns<Arc<dyn Array>>> {
+fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
     // Example of reading a JSON file.
     let mut reader = BufReader::new(File::open(path)?);

@@ -31,7 +31,7 @@ fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Columns<Arc<dy
     let read = read::read_rows(&mut reader, &mut rows)?;
     let rows = &rows[..read];

-    // deserialize `rows` into `Columns`. This is CPU-intensive, has no IO,
+    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
     // and can be performed on a different thread pool via a channel.
     read::deserialize(rows, &fields)
 }
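The comment in this last hunk points at a pattern the example itself does not show: reading rows on an IO thread and deserializing them into a `Chunk` on a CPU thread, connected by a channel. A sketch under stated assumptions — the signatures of `read_rows` and `deserialize` are inferred from the calls above, and the buffer of `String` rows is an assumption:

    use std::fs::File;
    use std::io::BufReader;
    use std::sync::mpsc;
    use std::thread;

    use arrow2::datatypes::Field;
    use arrow2::error::Result;
    use arrow2::io::json::read;

    fn read_in_two_threads(path: &str, fields: Vec<Field>) -> Result<()> {
        let (tx, rx) = mpsc::channel::<Vec<String>>();

        // CPU-bound worker: deserializes each batch of rows into a `Chunk`.
        let worker = thread::spawn(move || -> Result<()> {
            for rows in rx {
                let chunk = read::deserialize(&rows, &fields)?;
                assert!(!chunk.is_empty());
            }
            Ok(())
        });

        // IO-bound loop: fill a fixed buffer of rows, ship owned copies.
        let mut reader = BufReader::new(File::open(path)?);
        let mut rows = vec![String::default(); 1024];
        loop {
            let read = read::read_rows(&mut reader, &mut rows)?;
            if read == 0 {
                break;
            }
            tx.send(rows[..read].to_vec()).unwrap();
        }
        drop(tx); // close the channel so the worker's loop ends

        worker.join().unwrap()
    }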