Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
update examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 25, 2022
1 parent d9cccff commit 2805aea
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 20 deletions.
4 changes: 2 additions & 2 deletions examples/csv_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use arrow2::{
};

fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Chunk<A>]) -> Result<()> {
let writer = &mut write::WriterBuilder::new().from_path(path)?;
let mut writer = std::fs::File::create(path)?;

let options = write::SerializeOptions::default();
write::write_header(writer, &["c1"], &options)?;
write::write_header(&mut writer, &["c1"], &options)?;

columns
.iter()
Expand Down
13 changes: 6 additions & 7 deletions examples/csv_write_parallel.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;
Expand All @@ -14,8 +15,8 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
let options = write::SerializeOptions::default();

// write a header
let writer = &mut write::WriterBuilder::new().from_path(path)?;
write::write_header(writer, &["c1"])?;
let mut writer = std::fs::File::create(path)?;
write::write_header(&mut writer, &["c1"], &options)?;

// prepare a channel to send serialized records from threads
let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
Expand All @@ -28,8 +29,8 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
let options = options.clone();
let batch = batches[id].clone(); // note: this is cheap
let child = thread::spawn(move || {
let records = write::serialize(&batch, &options).unwrap();
thread_tx.send(records).unwrap();
let rows = write::serialize(&batch, &options).unwrap();
thread_tx.send(rows).unwrap();
});

children.push(child);
Expand All @@ -38,9 +39,7 @@ fn parallel_write(path: &str, batches: [Chunk<Arc<dyn Array>>; 2]) -> Result<()>
for _ in 0..2 {
// block: assumes that the order of batches matters.
let records = rx.recv().unwrap();
records
.iter()
.try_for_each(|record| writer.write_byte_record(record))?
records.iter().try_for_each(|row| writer.write_all(&row))?
}

for child in children {
Expand Down
33 changes: 23 additions & 10 deletions src/io/csv/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,37 @@ fn new_serializers<'a, A: AsRef<dyn Array>>(
.collect()
}

/// Serializes [`Chunk`] to a vector of `ByteRecord`.
/// Serializes [`Chunk`] to a vector of rows.
/// The vector is guaranteed to have `columns.len()` entries.
/// Each `ByteRecord` is guaranteed to have `columns.array().len()` fields.
/// Each `row` is guaranteed to have `columns.arrays().len()` fields.
pub fn serialize<A: AsRef<dyn Array>>(
columns: &Chunk<A>,
options: &SerializeOptions,
) -> Result<Vec<ByteRecord>> {
) -> Result<Vec<Vec<u8>>> {
let mut serializers = new_serializers(columns, options)?;

let rows = columns.len();
let mut records = vec![ByteRecord::with_capacity(0, columns.arrays().len()); rows];
records.iter_mut().for_each(|record| {
let mut rows = Vec::with_capacity(columns.len());
let mut row = vec![];

// this is where the (expensive) transposition happens: the outer loop is on rows, the inner on columns
(0..columns.len()).try_for_each(|_| {
serializers
.iter_mut()
// `unwrap` is infallible because `array.len()` equals `len` in `Chunk::len`
.for_each(|iter| record.push_field(iter.next().unwrap()));
});
Ok(records)
// `unwrap` is infallible because `array.len()` equals `Chunk::len`
.for_each(|iter| {
let field = iter.next().unwrap();
row.extend_from_slice(field);
row.push(options.delimiter);
});
// replace the trailing delimiter with a newline
let last_byte = row.len() - 1;
row[last_byte] = b'\n';
rows.push(row.clone());
row.clear();
Result::Ok(())
})?;

Ok(rows)
}

/// Writes [`Chunk`] to `writer` according to the serialization options `options`.
Expand Down
2 changes: 1 addition & 1 deletion src/io/csv/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct SerializeOptions {
pub timestamp_format: Option<String>,
/// used as separator/delimiter
pub delimiter: u8,
/// used as separator/delimiter
/// quoting character
pub quote: u8,
}

Expand Down

0 comments on commit 2805aea

Please sign in to comment.