Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added test.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Nov 5, 2021
1 parent 03c20e0 commit ca8caa5
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 1 deletion.
3 changes: 2 additions & 1 deletion src/io/ipc/write/stream_async.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! `async` writing of arrow streams
use futures::AsyncWrite;

use super::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions};
pub use super::common::WriteOptions;
use super::common::{encoded_batch, DictionaryTracker, EncodedData};
use super::common_async::{write_continuation, write_message};
use super::schema_to_bytes;

Expand Down
3 changes: 3 additions & 0 deletions tests/it/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ mod read;
mod write;

pub use common::read_gzip_json;

#[cfg(feature = "io_ipc_write_async")]
mod write_async;
51 changes: 51 additions & 0 deletions tests/it/io/ipc/write_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use std::io::Cursor;

use arrow2::error::Result;
use arrow2::io::ipc::read::read_stream_metadata;
use arrow2::io::ipc::read::StreamReader;
use arrow2::io::ipc::write::stream_async::{StreamWriter, WriteOptions};
use futures::io::Cursor as AsyncCursor;

use crate::io::ipc::common::read_arrow_stream;
use crate::io::ipc::common::read_gzip_json;

async fn test_file(version: &str, file_name: &str) -> Result<()> {
let (schema, batches) = read_arrow_stream(version, file_name);

let mut result = AsyncCursor::new(Vec::<u8>::new());

// write IPC version 5
{
let options = WriteOptions { compression: None };
let mut writer = StreamWriter::new(&mut result, options);
writer.start(&schema).await?;
for batch in batches {
writer.write(&batch).await?;
}
writer.finish().await?;
}
let result = result.into_inner();

let mut reader = Cursor::new(result);
let metadata = read_stream_metadata(&mut reader)?;
let reader = StreamReader::new(reader, metadata);

let schema = reader.schema().clone();

// read expected JSON output
let (expected_schema, expected_batches) = read_gzip_json(version, file_name).unwrap();

assert_eq!(schema.as_ref(), &expected_schema);

let batches = reader
.map(|x| x.map(|x| x.unwrap()))
.collect::<Result<Vec<_>>>()?;

assert_eq!(batches, expected_batches);
Ok(())
}

#[tokio::test]
async fn write_async() -> Result<()> {
test_file("1.0.0-littleendian", "generated_primitive").await
}

0 comments on commit ca8caa5

Please sign in to comment.