Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Migrated to planus
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 8, 2022
1 parent 299df30 commit 6cb96da
Show file tree
Hide file tree
Showing 32 changed files with 773 additions and 1,259 deletions.
4 changes: 0 additions & 4 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,4 @@ ignore = [
# Therefore, this advisory does not affect us.
"RUSTSEC-2020-0071",
"RUSTSEC-2020-0159", # same as previous

# this cannot be addressed, only mitigated.
# See [.github/workflows/security.yml] for details on how we mitigate this.
"RUSTSEC-2021-0122",
]
26 changes: 0 additions & 26 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,3 @@ jobs:
- uses: actions-rs/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}

# mitigation for RUSTSEC-2021-0122
# flatbuffers' usage of `unsafe` is problematic and a risk.
# This performs a round-trip over IPC (that uses flatbuffers) for some arrow types
# using miri, which hits much of `flatbuffers` usage in this crate.
miri-checks:
name: RUSTSEC-2021-0122 mitigation
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true # needed to test IPC, which are located in a submodule
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-12-10
override: true
- uses: Swatinem/rust-cache@v1
with:
key: key1
- name: Install Miri
run: |
rustup component add miri
cargo miri setup
- name: Run
run: MIRIFLAGS="-Zmiri-disable-stacked-borrows -Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_ipc_compression,io_json_integration io::ipc::write::file::write_100_nested
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "5.0", optional = true, default-features = false }

arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] }
arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format", branch = "planus", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.3.0", features = ["full"] }
arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format", branch = "planus", features = ["full"] }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
Expand Down
34 changes: 15 additions & 19 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ use arrow2::{
ipc::IpcField,
},
};
use arrow_format::flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use arrow_format::{
flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
},
ipc::planus::ReadAsRoot,
};
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -267,25 +269,19 @@ async fn receive_batch_flight_data(
) -> Option<FlightData> {
let mut data = resp.next().await?.ok()?;
let mut message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing first message");
ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message");

while message.header_type() == MessageHeader::DictionaryBatch {
while let ipc::MessageHeaderRef::DictionaryBatch(batch) = message
.header()
.expect("Header to be valid flatbuffers")
.expect("Header to be present")
{
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(
message
.header_as_dictionary_batch()
.expect("Error parsing dictionary"),
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
)
.expect("Error reading dictionary");
read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0)
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message");
}

Some(data)
Expand Down
44 changes: 22 additions & 22 deletions integration-testing/src/flight_server_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use arrow2::io::ipc::IpcSchema;
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;
use arrow_format::ipc::planus::ReadAsRoot;
use arrow_format::ipc::MessageHeaderRef;

use arrow2::{datatypes::*, io::flight::serialize_schema_to_info, io::ipc};

Expand Down Expand Up @@ -276,25 +276,21 @@ async fn send_app_metadata(
}

async fn record_batch_from_message(
message: Message<'_>,
batch: arrow_format::ipc::RecordBatchRef<'_>,
data_body: &[u8],
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<Chunk<Arc<dyn Array>>, Status> {
let ipc_batch = message
.header_as_record_batch()
.ok_or_else(|| Status::internal("Could not parse message header as record batch"))?;

let mut reader = std::io::Cursor::new(data_body);

let arrow_batch_result = ipc::read::read_record_batch(
ipc_batch,
batch,
fields,
ipc_schema,
None,
dictionaries,
ArrowSchema::MetadataVersion::V5,
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
0,
);
Expand All @@ -303,20 +299,16 @@ async fn record_batch_from_message(
}

async fn dictionary_from_message(
message: Message<'_>,
dict_batch: arrow_format::ipc::DictionaryBatchRef<'_>,
data_body: &[u8],
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<(), Status> {
let ipc_batch = message
.header_as_dictionary_batch()
.ok_or_else(|| Status::internal("Could not parse message header as dictionary batch"))?;

let mut reader = std::io::Cursor::new(data_body);

let dictionary_batch_result =
ipc::read::read_dictionary(ipc_batch, fields, ipc_schema, dictionaries, &mut reader, 0);
ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0);
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
}
Expand All @@ -335,20 +327,28 @@ async fn save_uploaded_chunks(
let mut dictionaries = Default::default();

while let Some(Ok(data)) = input_stream.next().await {
let message = root_as_message(&data.data_header[..])
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?;
let header = message
.header()
.map_err(|x| Status::internal(x.to_string()))?
.ok_or_else(|| {
Status::internal(
"Unable to convert flight data header to a record batch".to_string(),
)
})?;

match message.header_type() {
MessageHeader::Schema => {
match header {
MessageHeaderRef::Schema(_) => {
return Err(Status::internal(
"Not expecting a schema when messages are read",
))
}
MessageHeader::RecordBatch => {
MessageHeaderRef::RecordBatch(batch) => {
send_app_metadata(&mut response_tx, &data.app_metadata).await?;

let batch = record_batch_from_message(
message,
batch,
&data.data_body,
&schema.fields,
&ipc_schema,
Expand All @@ -358,9 +358,9 @@ async fn save_uploaded_chunks(

chunks.push(batch);
}
MessageHeader::DictionaryBatch => {
MessageHeaderRef::DictionaryBatch(dict_batch) => {
dictionary_from_message(
message,
dict_batch,
&data.data_body,
&schema.fields,
&ipc_schema,
Expand Down
60 changes: 25 additions & 35 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use arrow_format::flight::data::{FlightData, SchemaResult};
use arrow_format::ipc;
use arrow_format::ipc::planus::ReadAsRoot;

use crate::{
array::Array,
Expand Down Expand Up @@ -84,19 +85,7 @@ fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedDa
/// Deserialize an IPC message into [`Schema`], [`IpcSchema`].
/// Use to deserialize [`FlightData::data_header`] and [`SchemaResult::schema`].
pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
if let Ok(ipc) = ipc::Message::root_as_message(bytes) {
if let Some(schemas) = ipc.header_as_schema().map(read::fb_to_schema) {
schemas
} else {
Err(ArrowError::OutOfSpec(
"Unable to get head as schema".to_string(),
))
}
} else {
Err(ArrowError::OutOfSpec(
"Unable to get root as message".to_string(),
))
}
read::deserialize_schema(bytes)
}

/// Deserializes [`FlightData`] to [`Chunk`].
Expand All @@ -107,29 +96,30 @@ pub fn deserialize_batch(
dictionaries: &read::Dictionaries,
) -> Result<Chunk<Arc<dyn Array>>> {
// check that the data_header is a record batch message
let message = ipc::Message::root_as_message(&data.data_header[..]).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message =
arrow_format::ipc::MessageRef::read_as_root(&data.data_header).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;

let mut reader = std::io::Cursor::new(&data.data_body);

message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::OutOfSpec(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.map(|batch| {
read::read_record_batch(
batch,
fields,
ipc_schema,
None,
dictionaries,
ipc::Schema::MetadataVersion::V5,
&mut reader,
0,
)
})?
let version = message.version()?;

match message.header()?.ok_or_else(|| {
ArrowError::oos("Unable to convert flight data header to a record batch".to_string())
})? {
ipc::MessageHeaderRef::RecordBatch(batch) => read::read_record_batch(
batch,
fields,
ipc_schema,
None,
dictionaries,
version,
&mut reader,
0,
),
_ => Err(ArrowError::nyi(
"flight currently only supports reading RecordBatch messages",
)),
}
}
8 changes: 8 additions & 0 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).
use crate::error::ArrowError;

mod compression;
mod endianess;

Expand All @@ -100,3 +102,9 @@ pub struct IpcSchema {
pub fields: Vec<IpcField>,
pub is_little_endian: bool,
}

impl From<arrow_format::ipc::planus::Error> for ArrowError {
fn from(error: arrow_format::ipc::planus::Error) -> Self {
ArrowError::OutOfSpec(error.to_string())
}
}
10 changes: 4 additions & 6 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use arrow_format::ipc;

use crate::array::{BinaryArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};

pub fn read_binary<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
buffers: &mut VecDeque<IpcBuffer>,
reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
compression: Option<Compression>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
Expand Down Expand Up @@ -64,7 +62,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(

pub fn skip_binary(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
Expand Down
10 changes: 4 additions & 6 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use arrow_format::ipc;

use crate::array::BooleanArray;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};

use super::super::deserialize::Node;
use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};

pub fn read_boolean<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
buffers: &mut VecDeque<IpcBuffer>,
reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
compression: Option<Compression>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(format!(
Expand Down Expand Up @@ -49,7 +47,7 @@ pub fn read_boolean<R: Read + Seek>(

pub fn skip_boolean(
field_nodes: &mut VecDeque<Node>,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
buffers: &mut VecDeque<IpcBuffer>,
) -> Result<()> {
let _ = field_nodes.pop_front().ok_or_else(|| {
ArrowError::oos(
Expand Down
Loading

0 comments on commit 6cb96da

Please sign in to comment.