From 83d828c37d587d6c9f4252ae26e34d1ac78f1d94 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Fri, 5 Nov 2021 17:37:15 +0100 Subject: [PATCH] Simplified IPC code (#576) --- examples/extension.rs | 3 +- examples/ipc_file_write.rs | 3 +- .../src/bin/arrow-file-to-stream.rs | 5 +- .../src/bin/arrow-json-integration-test.rs | 5 +- .../src/bin/arrow-stream-to-file.rs | 5 +- .../integration_test.rs | 7 +- .../auth_basic_proto.rs | 3 - .../integration_test.rs | 9 +- src/io/flight/mod.rs | 26 +-- src/io/ipc/mod.rs | 5 +- src/io/ipc/write/common.rs | 156 +++--------------- src/io/ipc/write/mod.rs | 3 +- src/io/ipc/write/schema.rs | 5 +- src/io/ipc/write/stream.rs | 26 +-- src/io/ipc/write/writer.rs | 33 ++-- src/io/parquet/write/schema.rs | 4 +- tests/it/io/ipc/write/file.rs | 11 +- tests/it/io/ipc/write/stream.rs | 6 +- 18 files changed, 94 insertions(+), 221 deletions(-) diff --git a/examples/extension.rs b/examples/extension.rs index 25807924bc3..53b0ed2ad55 100644 --- a/examples/extension.rs +++ b/examples/extension.rs @@ -37,7 +37,8 @@ fn main() -> Result<()> { fn write_ipc(writer: W, array: impl Array + 'static) -> Result { let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]); - let mut writer = write::FileWriter::try_new(writer, &schema)?; + let options = write::WriteOptions { compression: None }; + let mut writer = write::FileWriter::try_new(writer, &schema, options)?; let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?; diff --git a/examples/ipc_file_write.rs b/examples/ipc_file_write.rs index 232258f2ab5..5ae5b26ba84 100644 --- a/examples/ipc_file_write.rs +++ b/examples/ipc_file_write.rs @@ -10,7 +10,8 @@ use arrow2::record_batch::RecordBatch; fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> { let file = File::create(path)?; - let mut writer = write::FileWriter::try_new(file, schema)?; + let options = write::WriteOptions { compression: None }; + let mut writer = write::FileWriter::try_new(file, schema, options)?; for batch in batches { writer.write(batch)? diff --git a/integration-testing/src/bin/arrow-file-to-stream.rs b/integration-testing/src/bin/arrow-file-to-stream.rs index f060bd36820..b3b024abb1d 100644 --- a/integration-testing/src/bin/arrow-file-to-stream.rs +++ b/integration-testing/src/bin/arrow-file-to-stream.rs @@ -20,7 +20,7 @@ use std::fs::File; use arrow2::error::Result; use arrow2::io::ipc::read; -use arrow2::io::ipc::write::StreamWriter; +use arrow2::io::ipc::write; fn main() -> Result<()> { let args: Vec = env::args().collect(); @@ -30,7 +30,8 @@ fn main() -> Result<()> { let mut reader = read::FileReader::new(f, metadata, None); let schema = reader.schema(); - let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?; + let options = write::WriteOptions { compression: None }; + let mut writer = write::StreamWriter::try_new(std::io::stdout(), schema, options)?; reader.try_for_each(|batch| { let batch = batch?; diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs b/integration-testing/src/bin/arrow-json-integration-test.rs index 3dee11da68c..becb663f5ce 100644 --- a/integration-testing/src/bin/arrow-json-integration-test.rs +++ b/integration-testing/src/bin/arrow-json-integration-test.rs @@ -20,7 +20,7 @@ use std::fs::File; use clap::{App, Arg}; use arrow2::io::ipc::read; -use arrow2::io::ipc::write::FileWriter; +use arrow2::io::ipc::write; use arrow2::{ error::{ArrowError, Result}, io::json_integration::*, @@ -81,7 +81,8 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()> let json_file = read_json_file(json_name)?; let arrow_file = File::create(arrow_name)?; - let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?; + let options = write::WriteOptions { compression: None }; + let mut writer = write::FileWriter::try_new(arrow_file, &json_file.schema, options)?; for b in json_file.batches { writer.write(&b)?; diff --git a/integration-testing/src/bin/arrow-stream-to-file.rs b/integration-testing/src/bin/arrow-stream-to-file.rs index f40d8add7eb..fe41fe8e45c 100644 --- a/integration-testing/src/bin/arrow-stream-to-file.rs +++ b/integration-testing/src/bin/arrow-stream-to-file.rs @@ -19,7 +19,7 @@ use std::io; use arrow2::error::Result; use arrow2::io::ipc::read; -use arrow2::io::ipc::write::FileWriter; +use arrow2::io::ipc::write; fn main() -> Result<()> { let mut reader = io::stdin(); @@ -29,7 +29,8 @@ fn main() -> Result<()> { let writer = io::stdout(); - let mut writer = FileWriter::try_new(writer, schema)?; + let options = write::WriteOptions { compression: None }; + let mut writer = write::FileWriter::try_new(writer, schema, options)?; arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?; writer.finish()?; diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 7c67b34096e..27a06b307c7 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -78,8 +78,9 @@ async fn upload_data( ) -> Result { let (mut upload_tx, upload_rx) = mpsc::channel(10); - let options = write::IpcWriteOptions::default(); - let mut schema = flight::serialize_schema(&schema, &options); + let options = write::WriteOptions { compression: None }; + + let mut schema = flight::serialize_schema(&schema); schema.flight_descriptor = Some(descriptor.clone()); upload_tx.send(schema).await?; @@ -129,7 +130,7 @@ async fn send_batch( upload_tx: &mut mpsc::Sender, metadata: &[u8], batch: &RecordBatch, - options: &write::IpcWriteOptions, + options: &write::WriteOptions, ) -> Result { let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options); diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index baa181e4efc..fc5c936a3a3 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use arrow_format::flight::data::*; use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; -use tokio::sync::Mutex; use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming}; type TonicStream = Pin + Send + Sync + 'static>>; @@ -37,7 +36,6 @@ pub async fn scenario_setup(port: &str) -> Result { let service = AuthBasicProtoScenarioImpl { username: AUTH_USERNAME.into(), password: AUTH_PASSWORD.into(), - peer_identity: Arc::new(Mutex::new(None)), }; let addr = super::listen_on(port).await?; let svc = FlightServiceServer::new(service); @@ -54,7 +52,6 @@ pub async fn scenario_setup(port: &str) -> Result { pub struct AuthBasicProtoScenarioImpl { username: Arc, password: Arc, - peer_identity: Arc>>, } impl AuthBasicProtoScenarioImpl { diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 4d3349bff44..c2f9d967be0 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -21,8 +21,8 @@ use std::pin::Pin; use std::sync::Arc; use arrow2::io::flight::{serialize_batch, serialize_schema}; -use arrow_format::flight::data::*; use arrow_format::flight::data::flight_descriptor::*; +use arrow_format::flight::data::*; use arrow_format::flight::service::flight_service_server::*; use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; use arrow_format::ipc::Schema as ArrowSchema; @@ -108,9 +108,9 @@ impl FlightService for FlightServiceImpl { .get(&key) .ok_or_else(|| Status::not_found(format!("Could not find flight. {}", key)))?; - let options = ipc::write::IpcWriteOptions::default(); + let options = ipc::write::WriteOptions { compression: None }; - let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options))); + let schema = std::iter::once(Ok(serialize_schema(&flight.schema))); let batches = flight .chunks @@ -171,8 +171,7 @@ impl FlightService for FlightServiceImpl { let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum(); - let options = ipc::write::IpcWriteOptions::default(); - let schema = serialize_schema_to_info(&flight.schema, &options).expect( + let schema = serialize_schema_to_info(&flight.schema).expect( "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", ); diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index ab4b0eb9283..1b772c0512b 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -11,7 +11,7 @@ use crate::{ io::ipc::fb_to_schema, io::ipc::read::read_record_batch, io::ipc::write, - io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions}, + io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions}, record_batch::RecordBatch, }; @@ -19,7 +19,7 @@ use crate::{ /// and a [`FlightData`] representing the batch. pub fn serialize_batch( batch: &RecordBatch, - options: &IpcWriteOptions, + options: &WriteOptions, ) -> (Vec, FlightData) { let mut dictionary_tracker = DictionaryTracker::new(false); @@ -44,15 +44,15 @@ impl From for FlightData { } /// Serializes a [`Schema`] to [`SchemaResult`]. -pub fn serialize_schema_to_result(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { +pub fn serialize_schema_to_result(schema: &Schema) -> SchemaResult { SchemaResult { - schema: schema_as_flatbuffer(schema, options), + schema: schema_as_flatbuffer(schema), } } /// Serializes a [`Schema`] to [`FlightData`]. -pub fn serialize_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { - let data_header = schema_as_flatbuffer(schema, options); +pub fn serialize_schema(schema: &Schema) -> FlightData { + let data_header = schema_as_flatbuffer(schema); FlightData { data_header, ..Default::default() @@ -60,22 +60,22 @@ pub fn serialize_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightDat } /// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::FlightInfo`]. -pub fn serialize_schema_to_info(schema: &Schema, options: &IpcWriteOptions) -> Result> { - let encoded_data = schema_as_encoded_data(schema, options); +pub fn serialize_schema_to_info(schema: &Schema) -> Result> { + let encoded_data = schema_as_encoded_data(schema); let mut schema = vec![]; - write::common::write_message(&mut schema, encoded_data, options)?; + write::common::write_message(&mut schema, encoded_data)?; Ok(schema) } -fn schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> Vec { - let encoded_data = schema_as_encoded_data(schema, options); +fn schema_as_flatbuffer(schema: &Schema) -> Vec { + let encoded_data = schema_as_encoded_data(schema); encoded_data.ipc_message } -fn schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { +fn schema_as_encoded_data(schema: &Schema) -> EncodedData { EncodedData { - ipc_message: write::schema_to_bytes(arrow_schema, *options.metadata_version()), + ipc_message: write::schema_to_bytes(schema), arrow_data: vec![], } } diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index df0524baf72..6450763cde4 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -30,7 +30,7 @@ //! # Examples //! Read and write to a file: //! ``` -//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::FileWriter}}; +//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::{FileWriter, WriteOptions}}}; //! # use std::fs::File; //! # use std::sync::Arc; //! # use arrow2::datatypes::{Field, Schema, DataType}; @@ -43,7 +43,8 @@ //! let x_coord = Field::new("x", DataType::Int32, false); //! let y_coord = Field::new("y", DataType::Int32, false); //! let schema = Schema::new(vec![x_coord, y_coord]); -//! let mut writer = FileWriter::try_new(file, &schema)?; +//! let options = WriteOptions {compression: None}; +//! let mut writer = FileWriter::try_new(file, &schema, options)?; //! //! // Setup the data //! let x_data = Int32Array::from_slice([-1i32, 1]); diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index 699a85ae2ad..7f92c140b72 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -41,90 +41,18 @@ pub enum Compression { ZSTD, } -/// IPC write options used to control the behaviour of the writer +/// Options declaring the behaviour of writing to IPC #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct IpcWriteOptions { - /// Write padding after memory buffers to this multiple of bytes. - /// Generally 8 or 64, defaults to 8 - alignment: usize, - /// The legacy format is for releases before 0.15.0, and uses metadata V4 - write_legacy_ipc_format: bool, - /// The metadata version to write. The Rust IPC writer supports V4+ - /// - /// *Default versions per crate* - /// - /// When creating the default IpcWriteOptions, the following metadata versions are used: - /// - /// version 2.0.0: V4, with legacy format enabled - /// version 4.0.0: V5 - metadata_version: ipc::Schema::MetadataVersion, - /// Whether the buffers should be compressed - compression: Option, -} - -impl IpcWriteOptions { - /// Try create IpcWriteOptions, checking for incompatible settings - pub fn try_new( - alignment: usize, - write_legacy_ipc_format: bool, - metadata_version: ipc::Schema::MetadataVersion, - compression: Option, - ) -> Result { - if alignment == 0 || alignment % 8 != 0 { - return Err(ArrowError::InvalidArgumentError( - "Alignment should be greater than 0 and be a multiple of 8".to_string(), - )); - } - match metadata_version { - ipc::Schema::MetadataVersion::V1 - | ipc::Schema::MetadataVersion::V2 - | ipc::Schema::MetadataVersion::V3 => Err(ArrowError::InvalidArgumentError( - "Writing IPC metadata version 3 and lower not supported".to_string(), - )), - ipc::Schema::MetadataVersion::V4 => Ok(Self { - alignment, - write_legacy_ipc_format, - metadata_version, - compression, - }), - ipc::Schema::MetadataVersion::V5 => { - if write_legacy_ipc_format { - Err(ArrowError::InvalidArgumentError( - "Legacy IPC format only supported on metadata version 4".to_string(), - )) - } else { - Ok(Self { - alignment, - write_legacy_ipc_format, - metadata_version, - compression, - }) - } - } - z => panic!("Unsupported ipc::Schema::MetadataVersion {:?}", z), - } - } - - pub fn metadata_version(&self) -> &ipc::Schema::MetadataVersion { - &self.metadata_version - } -} - -impl Default for IpcWriteOptions { - fn default() -> Self { - Self { - alignment: 8, - write_legacy_ipc_format: false, - metadata_version: ipc::Schema::MetadataVersion::V5, - compression: None, - } - } +pub struct WriteOptions { + /// Whether the buffers should be compressed and which codec to use. + /// Note: to use compression the crate must be compiled with feature `io_ipc_compression`. + pub compression: Option, } pub fn encoded_batch( batch: &RecordBatch, dictionary_tracker: &mut DictionaryTracker, - write_options: &IpcWriteOptions, + options: &WriteOptions, ) -> Result<(Vec, EncodedData)> { // TODO: handle nested dictionaries let schema = batch.schema(); @@ -144,21 +72,21 @@ pub fn encoded_batch( encoded_dictionaries.push(dictionary_batch_to_bytes( dict_id, column.as_ref(), - write_options, + options, is_native_little_endian(), )); } } } - let encoded_message = record_batch_to_bytes(batch, write_options); + let encoded_message = record_batch_to_bytes(batch, options); Ok((encoded_dictionaries, encoded_message)) } /// Write a `RecordBatch` into two sets of bytes, one for the header (ipc::Schema::Message) and the /// other for the batch's data -fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) -> EncodedData { +fn record_batch_to_bytes(batch: &RecordBatch, options: &WriteOptions) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -173,7 +101,7 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) - &mut nodes, &mut offset, is_native_little_endian(), - write_options.compression, + options.compression, ) } @@ -181,7 +109,7 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) - let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); - let compression = if let Some(compression) = write_options.compression { + let compression = if let Some(compression) = options.compression { let compression = match compression { Compression::LZ4 => CompressionType::LZ4_FRAME, Compression::ZSTD => CompressionType::ZSTD, @@ -206,7 +134,7 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) - }; // create an ipc::Schema::Message let mut message = ipc::Message::MessageBuilder::new(&mut fbb); - message.add_version(write_options.metadata_version); + message.add_version(ipc::Schema::MetadataVersion::V5); message.add_header_type(ipc::Message::MessageHeader::RecordBatch); message.add_bodyLength(arrow_data.len() as i64); message.add_header(root); @@ -225,7 +153,7 @@ fn record_batch_to_bytes(batch: &RecordBatch, write_options: &IpcWriteOptions) - fn dictionary_batch_to_bytes( dict_id: i64, array: &dyn Array, - write_options: &IpcWriteOptions, + options: &WriteOptions, is_little_endian: bool, ) -> EncodedData { let mut fbb = FlatBufferBuilder::new(); @@ -241,7 +169,7 @@ fn dictionary_batch_to_bytes( &mut nodes, &mut 0, is_little_endian, - write_options.compression, + options.compression, false, ); @@ -249,7 +177,7 @@ fn dictionary_batch_to_bytes( let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); - let compression = if let Some(compression) = write_options.compression { + let compression = if let Some(compression) = options.compression { let compression = match compression { Compression::LZ4 => CompressionType::LZ4_FRAME, Compression::ZSTD => CompressionType::ZSTD, @@ -281,7 +209,7 @@ fn dictionary_batch_to_bytes( let root = { let mut message_builder = ipc::Message::MessageBuilder::new(&mut fbb); - message_builder.add_version(write_options.metadata_version); + message_builder.add_version(ipc::Schema::MetadataVersion::V5); message_builder.add_header_type(ipc::Message::MessageHeader::DictionaryBatch); message_builder.add_bodyLength(arrow_data.len() as i64); message_builder.add_header(root); @@ -365,28 +293,20 @@ pub struct EncodedData { } /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written -pub fn write_message( - writer: &mut W, - encoded: EncodedData, - write_options: &IpcWriteOptions, -) -> Result<(usize, usize)> { +pub fn write_message(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> { let arrow_data_len = encoded.arrow_data.len(); if arrow_data_len % 8 != 0 { return Err(ArrowError::Ipc("Arrow data not aligned".to_string())); } - let a = write_options.alignment - 1; + let a = 8 - 1; let buffer = encoded.ipc_message; let flatbuf_size = buffer.len(); - let prefix_size = if write_options.write_legacy_ipc_format { - 4 - } else { - 8 - }; + let prefix_size = 8; let aligned_size = (flatbuf_size + prefix_size + a) & !a; let padding_bytes = aligned_size - flatbuf_size - prefix_size; - write_continuation(writer, write_options, (aligned_size - prefix_size) as i32)?; + write_continuation(writer, (aligned_size - prefix_size) as i32)?; // write the flatbuf if flatbuf_size > 0 { @@ -422,39 +342,11 @@ fn write_body_buffers(mut writer: W, data: &[u8]) -> Result { /// Write a record batch to the writer, writing the message size before the message /// if the record batch is being written to a stream -pub fn write_continuation( - writer: &mut W, - write_options: &IpcWriteOptions, - total_len: i32, -) -> Result { - let mut written = 8; - - // the version of the writer determines whether continuation markers should be added - match write_options.metadata_version { - ipc::Schema::MetadataVersion::V1 - | ipc::Schema::MetadataVersion::V2 - | ipc::Schema::MetadataVersion::V3 => { - unreachable!("Options with the metadata version cannot be created") - } - ipc::Schema::MetadataVersion::V4 => { - if !write_options.write_legacy_ipc_format { - // v0.15.0 format - writer.write_all(&CONTINUATION_MARKER)?; - written = 4; - } - writer.write_all(&total_len.to_le_bytes()[..])?; - } - ipc::Schema::MetadataVersion::V5 => { - // write continuation marker and message length - writer.write_all(&CONTINUATION_MARKER)?; - writer.write_all(&total_len.to_le_bytes()[..])?; - } - z => panic!("Unsupported ipc::Schema::MetadataVersion {:?}", z), - }; - +pub fn write_continuation(writer: &mut W, total_len: i32) -> Result { + writer.write_all(&CONTINUATION_MARKER)?; + writer.write_all(&total_len.to_le_bytes()[..])?; writer.flush()?; - - Ok(written) + Ok(8) } /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs index 4d5627959cd..ba823de9100 100644 --- a/src/io/ipc/write/mod.rs +++ b/src/io/ipc/write/mod.rs @@ -5,8 +5,7 @@ mod serialize; mod stream; mod writer; -pub use arrow_format::ipc::Schema::MetadataVersion; -pub use common::{Compression, IpcWriteOptions}; +pub use common::{Compression, WriteOptions}; pub use schema::schema_to_bytes; pub use serialize::{write, write_dictionary}; pub use stream::StreamWriter; diff --git a/src/io/ipc/write/schema.rs b/src/io/ipc/write/schema.rs index c56e6136fbe..b5400ef67e7 100644 --- a/src/io/ipc/write/schema.rs +++ b/src/io/ipc/write/schema.rs @@ -4,10 +4,9 @@ use arrow_format::ipc::flatbuffers::FlatBufferBuilder; use crate::datatypes::*; use super::super::convert; -use super::MetadataVersion; /// Converts -pub fn schema_to_bytes(schema: &Schema, version: MetadataVersion) -> Vec { +pub fn schema_to_bytes(schema: &Schema) -> Vec { let mut fbb = FlatBufferBuilder::new(); let schema = { let fb = convert::schema_to_fb_offset(&mut fbb, schema); @@ -15,7 +14,7 @@ pub fn schema_to_bytes(schema: &Schema, version: MetadataVersion) -> Vec { }; let mut message = ipc::Message::MessageBuilder::new(&mut fbb); - message.add_version(version); + message.add_version(ipc::Schema::MetadataVersion::V5); message.add_header_type(ipc::Message::MessageHeader::Schema); message.add_bodyLength(0); message.add_header(schema); diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 83c83ee54d5..cfd51ef312b 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -23,8 +23,7 @@ use std::io::Write; use super::common::{ - encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData, - IpcWriteOptions, + encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData, WriteOptions, }; use super::schema_to_bytes; @@ -42,7 +41,7 @@ pub struct StreamWriter { /// The object to write to writer: W, /// IPC write options - write_options: IpcWriteOptions, + write_options: WriteOptions, /// Whether the writer footer has been written, and the writer is finished finished: bool, /// Keeps track of dictionaries that have been written @@ -51,22 +50,13 @@ pub struct StreamWriter { impl StreamWriter { /// Try create a new writer, with the schema written as part of the header - pub fn try_new(writer: W, schema: &Schema) -> Result { - let write_options = IpcWriteOptions::default(); - Self::try_new_with_options(writer, schema, write_options) - } - - pub fn try_new_with_options( - mut writer: W, - schema: &Schema, - write_options: IpcWriteOptions, - ) -> Result { + pub fn try_new(mut writer: W, schema: &Schema, write_options: WriteOptions) -> Result { // write the schema, set the written bytes to the schema let encoded_message = EncodedData { - ipc_message: schema_to_bytes(schema, *write_options.metadata_version()), + ipc_message: schema_to_bytes(schema), arrow_data: vec![], }; - write_message(&mut writer, encoded_message, &write_options)?; + write_message(&mut writer, encoded_message)?; Ok(Self { writer, write_options, @@ -88,16 +78,16 @@ impl StreamWriter { .expect("StreamWriter is configured to not error on dictionary replacement"); for encoded_dictionary in encoded_dictionaries { - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + write_message(&mut self.writer, encoded_dictionary)?; } - write_message(&mut self.writer, encoded_message, &self.write_options)?; + write_message(&mut self.writer, encoded_message)?; Ok(()) } /// Write continuation bytes, and mark the stream as done pub fn finish(&mut self) -> Result<()> { - write_continuation(&mut self.writer, &self.write_options, 0)?; + write_continuation(&mut self.writer, 0)?; self.finished = true; diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 8c604e9cab8..88448df7509 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -30,7 +30,7 @@ use super::{ super::convert, common::{ encoded_batch, write_continuation, write_message, DictionaryTracker, EncodedData, - IpcWriteOptions, + WriteOptions, }, schema_to_bytes, }; @@ -44,7 +44,7 @@ pub struct FileWriter { /// The object to write to writer: W, /// IPC write options - write_options: IpcWriteOptions, + options: WriteOptions, /// A reference to the schema, used in validating record batches schema: Schema, /// The number of bytes between each block of bytes, as an offset for random access @@ -61,30 +61,20 @@ pub struct FileWriter { impl FileWriter { /// Try create a new writer, with the schema written as part of the header - pub fn try_new(writer: W, schema: &Schema) -> Result { - let write_options = IpcWriteOptions::default(); - Self::try_new_with_options(writer, schema, write_options) - } - - /// Try create a new writer with IpcWriteOptions - pub fn try_new_with_options( - mut writer: W, - schema: &Schema, - write_options: IpcWriteOptions, - ) -> Result { + pub fn try_new(mut writer: W, schema: &Schema, options: WriteOptions) -> Result { // write magic to header writer.write_all(&ARROW_MAGIC[..])?; // create an 8-byte boundary after the header writer.write_all(&[0, 0])?; // write the schema, set the written bytes to the schema let encoded_message = EncodedData { - ipc_message: schema_to_bytes(schema, *write_options.metadata_version()), + ipc_message: schema_to_bytes(schema), arrow_data: vec![], }; - let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; + let (meta, data) = write_message(&mut writer, encoded_message)?; Ok(Self { writer, - write_options, + options, schema: schema.clone(), block_offsets: meta + data + 8, dictionary_blocks: vec![], @@ -107,18 +97,17 @@ impl FileWriter { } let (encoded_dictionaries, encoded_message) = - encoded_batch(batch, &mut self.dictionary_tracker, &self.write_options)?; + encoded_batch(batch, &mut self.dictionary_tracker, &self.options)?; for encoded_dictionary in encoded_dictionaries { - let (meta, data) = - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + let (meta, data) = write_message(&mut self.writer, encoded_dictionary)?; let block = ipc::File::Block::new(self.block_offsets as i64, meta as i32, data as i64); self.dictionary_blocks.push(block); self.block_offsets += meta + data; } - let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?; + let (meta, data) = write_message(&mut self.writer, encoded_message)?; // add a record block for the footer let block = ipc::File::Block::new( self.block_offsets as i64, @@ -133,7 +122,7 @@ impl FileWriter { /// Write footer and closing tag, then mark the writer as done pub fn finish(&mut self) -> Result<()> { // write EOS - write_continuation(&mut self.writer, &self.write_options, 0)?; + write_continuation(&mut self.writer, 0)?; let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); @@ -142,7 +131,7 @@ impl FileWriter { let root = { let mut footer_builder = ipc::File::FooterBuilder::new(&mut fbb); - footer_builder.add_version(*self.write_options.metadata_version()); + footer_builder.add_version(ipc::Schema::MetadataVersion::V5); footer_builder.add_schema(schema); footer_builder.add_dictionaries(dictionaries); footer_builder.add_recordBatches(record_batches); diff --git a/src/io/parquet/write/schema.rs b/src/io/parquet/write/schema.rs index 595cf6b63c2..88fed6505f3 100644 --- a/src/io/parquet/write/schema.rs +++ b/src/io/parquet/write/schema.rs @@ -12,14 +12,14 @@ use parquet2::{ use crate::{ datatypes::{DataType, Field, Schema, TimeUnit}, error::{ArrowError, Result}, - io::ipc::write::{schema_to_bytes, MetadataVersion}, + io::ipc::write::schema_to_bytes, io::parquet::write::decimal_length_from_precision, }; use super::super::ARROW_SCHEMA_META_KEY; pub fn schema_to_metadata_key(schema: &Schema) -> KeyValue { - let serialized_schema = schema_to_bytes(schema, MetadataVersion::V5); + let serialized_schema = schema_to_bytes(schema); // manually prepending the length to the schema as arrow uses the legacy IPC format // TODO: change after addressing ARROW-9777 diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs index 132768b1f7c..4b3929c2c96 100644 --- a/tests/it/io/ipc/write/file.rs +++ b/tests/it/io/ipc/write/file.rs @@ -13,9 +13,10 @@ fn round_trip(batch: RecordBatch) -> Result<()> { // write IPC version 5 let written_result = { - let options = - IpcWriteOptions::try_new(8, false, MetadataVersion::V5, Some(Compression::ZSTD))?; - let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?; + let options = WriteOptions { + compression: Some(Compression::ZSTD), + }; + let mut writer = FileWriter::try_new(result, batch.schema(), options)?; writer.write(&batch)?; writer.finish()?; writer.into_inner() @@ -50,8 +51,8 @@ fn test_file(version: &str, file_name: &str, compressed: bool) -> Result<()> { // write IPC version 5 let written_result = { - let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5, compression)?; - let mut writer = FileWriter::try_new_with_options(result, &schema, options)?; + let options = WriteOptions { compression }; + let mut writer = FileWriter::try_new(result, &schema, options)?; for batch in batches { writer.write(&batch)?; } diff --git a/tests/it/io/ipc/write/stream.rs b/tests/it/io/ipc/write/stream.rs index 9da5587e692..54693a22fdb 100644 --- a/tests/it/io/ipc/write/stream.rs +++ b/tests/it/io/ipc/write/stream.rs @@ -3,7 +3,7 @@ use std::io::Cursor; use arrow2::error::Result; use arrow2::io::ipc::read::read_stream_metadata; use arrow2::io::ipc::read::StreamReader; -use arrow2::io::ipc::write::{IpcWriteOptions, MetadataVersion, StreamWriter}; +use arrow2::io::ipc::write::{StreamWriter, WriteOptions}; use crate::io::ipc::common::read_arrow_stream; use crate::io::ipc::common::read_gzip_json; @@ -15,8 +15,8 @@ fn test_file(version: &str, file_name: &str) { // write IPC version 5 { - let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5, None).unwrap(); - let mut writer = StreamWriter::try_new_with_options(&mut result, &schema, options).unwrap(); + let options = WriteOptions { compression: None }; + let mut writer = StreamWriter::try_new(&mut result, &schema, options).unwrap(); for batch in batches { writer.write(&batch).unwrap(); }