From f64209d6a2c1e2e32d4aded4739b5c16276ef2f1 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 8 Jan 2022 05:32:28 +0000 Subject: [PATCH] Added CI for IO IPC --- .github/workflows/test.yml | 21 +++++++++++++++++++++ Cargo.toml | 2 +- integration-testing/Cargo.toml | 2 +- src/io/ipc/compression.rs | 2 ++ src/io/ipc/read/reader.rs | 7 ++++--- tests/it/io/ipc/read/file.rs | 2 ++ tests/it/io/ipc/write/file.rs | 15 +++++---------- 7 files changed, 36 insertions(+), 15 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 160575c93f0..dcff300eeba 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -88,6 +88,27 @@ jobs: # --skip io: miri can't handle opening of files, so we skip those run: cargo miri test --features full -- --skip io::parquet --skip io::ipc + miri-checks-io: + name: MIRI on IO IPC + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: true # needed to test IPC, which are located in a submodule + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly-2021-12-10 + override: true + - uses: Swatinem/rust-cache@v1 + with: + key: key1 + - name: Install Miri + run: | + rustup component add miri + cargo miri setup + - name: Run + run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_json_integration io::ipc::write::write_sliced_list + coverage: name: Coverage runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 5ec6516fcc1..b69aef9c322 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,7 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "5.0", optional = true, default-features = false } -arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format", branch = "planus", optional = true, features = ["ipc"] } +arrow-format = { version = "0.4", optional = true, features = ["ipc"] } hex = { version = "^0.4", optional = true } diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 3aa800146b5..d96279f5a73 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format", branch = "planus", features = ["full"] } +arrow-format = { version = "0.4", features = ["full"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/src/io/ipc/compression.rs b/src/io/ipc/compression.rs index 11311a5a85b..03b06868ae0 100644 --- a/src/io/ipc/compression.rs +++ b/src/io/ipc/compression.rs @@ -67,6 +67,7 @@ mod tests { #[cfg(feature = "io_ipc_compression")] #[test] + #[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support fn round_trip_zstd() { let data: Vec = (0..200u8).map(|x| x % 10).collect(); let mut buffer = vec![]; @@ -79,6 +80,7 @@ mod tests { #[cfg(feature = "io_ipc_compression")] #[test] + #[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support fn round_trip_lz4() { let data: Vec = (0..200u8).map(|x| x % 10).collect(); let mut buffer = vec![]; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 2fdf9df6c57..e109ca9d202 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; @@ -11,7 +12,7 @@ use super::super::{ARROW_MAGIC, CONTINUATION_MARKER}; use super::common::*; use super::schema::fb_to_schema; use super::Dictionaries; -use arrow_format::ipc::planus::{ReadAsRoot, ToOwned, Vector}; +use arrow_format::ipc::planus::{ReadAsRoot, Vector}; #[derive(Debug, Clone)] pub struct FileMetadata { @@ -64,7 +65,7 @@ fn read_dictionaries( reader: &mut R, fields: &[Field], ipc_schema: &IpcSchema, - blocks: Vector, + blocks: Vector, ) -> Result { let mut dictionaries = Default::default(); let mut data = vec![]; @@ -158,7 +159,7 @@ pub fn read_file_metadata(reader: &mut R) -> Result>>()?, dictionaries, }) diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 31c846bad45..5afbc9b1771 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -146,11 +146,13 @@ fn read_generated_017_union() -> Result<()> { } #[test] +#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support fn read_generated_200_compression_lz4() -> Result<()> { test_file("2.0.0-compression", "generated_lz4") } #[test] +#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support fn read_generated_200_compression_zstd() -> Result<()> { test_file("2.0.0-compression", "generated_zstd") } diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index cccfe26162f..d466c8291bc 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -30,15 +30,11 @@ fn round_trip( columns: Chunk>, schema: Schema, ipc_fields: Option>, + compression: Option, ) -> Result<()> { let (expected_schema, expected_batches) = (schema.clone(), vec![columns]); - let result = write_( - &expected_batches, - &schema, - ipc_fields, - Some(Compression::ZSTD), - )?; + let result = write_(&expected_batches, &schema, ipc_fields, compression)?; let mut reader = Cursor::new(result); let metadata = read_file_metadata(&mut reader)?; let schema = metadata.schema.clone(); @@ -340,7 +336,7 @@ fn write_boolean() -> Result<()> { ])) as Arc; let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); let columns = Chunk::try_new(vec![array])?; - round_trip(columns, schema, None) + round_trip(columns, schema, None, Some(Compression::ZSTD)) } #[test] @@ -350,11 +346,10 @@ fn write_sliced_utf8() -> Result<()> { let array = Arc::new(Utf8Array::::from_slice(["aa", "bb"]).slice(1, 1)) as Arc; let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); let columns = Chunk::try_new(vec![array])?; - round_trip(columns, schema, None) + round_trip(columns, schema, None, Some(Compression::ZSTD)) } #[test] -#[cfg_attr(miri, ignore)] // compression uses FFI, which miri does not support fn write_sliced_list() -> Result<()> { let data = vec![ Some(vec![Some(1i32), Some(2), Some(3)]), @@ -368,5 +363,5 @@ fn write_sliced_list() -> Result<()> { let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]); let columns = Chunk::try_new(vec![array])?; - round_trip(columns, schema, None) + round_trip(columns, schema, None, None) }