Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Simplified IPC code (#576)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Nov 5, 2021
1 parent 0dda942 commit 83d828c
Show file tree
Hide file tree
Showing 18 changed files with 94 additions and 221 deletions.
3 changes: 2 additions & 1 deletion examples/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ fn main() -> Result<()> {
fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<W> {
let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);

let mut writer = write::FileWriter::try_new(writer, &schema)?;
let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, &schema, options)?;

let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

Expand Down
3 changes: 2 additions & 1 deletion examples/ipc_file_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use arrow2::record_batch::RecordBatch;
fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> {
let file = File::create(path)?;

let mut writer = write::FileWriter::try_new(file, schema)?;
let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(file, schema, options)?;

for batch in batches {
writer.write(batch)?
Expand Down
5 changes: 3 additions & 2 deletions integration-testing/src/bin/arrow-file-to-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write::StreamWriter;
use arrow2::io::ipc::write;

fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();
Expand All @@ -30,7 +30,8 @@ fn main() -> Result<()> {
let mut reader = read::FileReader::new(f, metadata, None);
let schema = reader.schema();

let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?;
let options = write::WriteOptions { compression: None };
let mut writer = write::StreamWriter::try_new(std::io::stdout(), schema, options)?;

reader.try_for_each(|batch| {
let batch = batch?;
Expand Down
5 changes: 3 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::fs::File;
use clap::{App, Arg};

use arrow2::io::ipc::read;
use arrow2::io::ipc::write::FileWriter;
use arrow2::io::ipc::write;
use arrow2::{
error::{ArrowError, Result},
io::json_integration::*,
Expand Down Expand Up @@ -81,7 +81,8 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>
let json_file = read_json_file(json_name)?;

let arrow_file = File::create(arrow_name)?;
let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;
let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(arrow_file, &json_file.schema, options)?;

for b in json_file.batches {
writer.write(&b)?;
Expand Down
5 changes: 3 additions & 2 deletions integration-testing/src/bin/arrow-stream-to-file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::io;

use arrow2::error::Result;
use arrow2::io::ipc::read;
use arrow2::io::ipc::write::FileWriter;
use arrow2::io::ipc::write;

fn main() -> Result<()> {
let mut reader = io::stdin();
Expand All @@ -29,7 +29,8 @@ fn main() -> Result<()> {

let writer = io::stdout();

let mut writer = FileWriter::try_new(writer, schema)?;
let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::try_new(writer, schema, options)?;

arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
writer.finish()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ async fn upload_data(
) -> Result {
let (mut upload_tx, upload_rx) = mpsc::channel(10);

let options = write::IpcWriteOptions::default();
let mut schema = flight::serialize_schema(&schema, &options);
let options = write::WriteOptions { compression: None };

let mut schema = flight::serialize_schema(&schema);
schema.flight_descriptor = Some(descriptor.clone());
upload_tx.send(schema).await?;

Expand Down Expand Up @@ -129,7 +130,7 @@ async fn send_batch(
upload_tx: &mut mpsc::Sender<FlightData>,
metadata: &[u8],
batch: &RecordBatch,
options: &write::IpcWriteOptions,
options: &write::WriteOptions,
) -> Result {
let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::sync::Arc;
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
use tokio::sync::Mutex;
use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming};

type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;
Expand All @@ -37,7 +36,6 @@ pub async fn scenario_setup(port: &str) -> Result {
let service = AuthBasicProtoScenarioImpl {
username: AUTH_USERNAME.into(),
password: AUTH_PASSWORD.into(),
peer_identity: Arc::new(Mutex::new(None)),
};
let addr = super::listen_on(port).await?;
let svc = FlightServiceServer::new(service);
Expand All @@ -54,7 +52,6 @@ pub async fn scenario_setup(port: &str) -> Result {
pub struct AuthBasicProtoScenarioImpl {
username: Arc<str>,
password: Arc<str>,
peer_identity: Arc<Mutex<Option<String>>>,
}

impl AuthBasicProtoScenarioImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::pin::Pin;
use std::sync::Arc;

use arrow2::io::flight::{serialize_batch, serialize_schema};
use arrow_format::flight::data::*;
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;
Expand Down Expand Up @@ -108,9 +108,9 @@ impl FlightService for FlightServiceImpl {
.get(&key)
.ok_or_else(|| Status::not_found(format!("Could not find flight. {}", key)))?;

let options = ipc::write::IpcWriteOptions::default();
let options = ipc::write::WriteOptions { compression: None };

let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options)));
let schema = std::iter::once(Ok(serialize_schema(&flight.schema)));

let batches = flight
.chunks
Expand Down Expand Up @@ -171,8 +171,7 @@ impl FlightService for FlightServiceImpl {

let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();

let options = ipc::write::IpcWriteOptions::default();
let schema = serialize_schema_to_info(&flight.schema, &options).expect(
let schema = serialize_schema_to_info(&flight.schema).expect(
"Could not generate schema bytes from schema stored by a DoPut; \
this should be impossible",
);
Expand Down
26 changes: 13 additions & 13 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use crate::{
io::ipc::fb_to_schema,
io::ipc::read::read_record_batch,
io::ipc::write,
io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions},
io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions},
record_batch::RecordBatch,
};

/// Serializes a [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries
/// and a [`FlightData`] representing the batch.
pub fn serialize_batch(
batch: &RecordBatch,
options: &IpcWriteOptions,
options: &WriteOptions,
) -> (Vec<FlightData>, FlightData) {
let mut dictionary_tracker = DictionaryTracker::new(false);

Expand All @@ -44,38 +44,38 @@ impl From<EncodedData> for FlightData {
}

/// Serializes a [`Schema`] to [`SchemaResult`].
pub fn serialize_schema_to_result(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult {
pub fn serialize_schema_to_result(schema: &Schema) -> SchemaResult {
SchemaResult {
schema: schema_as_flatbuffer(schema, options),
schema: schema_as_flatbuffer(schema),
}
}

/// Serializes a [`Schema`] to [`FlightData`].
pub fn serialize_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData {
let data_header = schema_as_flatbuffer(schema, options);
pub fn serialize_schema(schema: &Schema) -> FlightData {
let data_header = schema_as_flatbuffer(schema);
FlightData {
data_header,
..Default::default()
}
}

/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::FlightInfo`].
pub fn serialize_schema_to_info(schema: &Schema, options: &IpcWriteOptions) -> Result<Vec<u8>> {
let encoded_data = schema_as_encoded_data(schema, options);
pub fn serialize_schema_to_info(schema: &Schema) -> Result<Vec<u8>> {
let encoded_data = schema_as_encoded_data(schema);

let mut schema = vec![];
write::common::write_message(&mut schema, encoded_data, options)?;
write::common::write_message(&mut schema, encoded_data)?;
Ok(schema)
}

fn schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> Vec<u8> {
let encoded_data = schema_as_encoded_data(schema, options);
fn schema_as_flatbuffer(schema: &Schema) -> Vec<u8> {
let encoded_data = schema_as_encoded_data(schema);
encoded_data.ipc_message
}

fn schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
fn schema_as_encoded_data(schema: &Schema) -> EncodedData {
EncodedData {
ipc_message: write::schema_to_bytes(arrow_schema, *options.metadata_version()),
ipc_message: write::schema_to_bytes(schema),
arrow_data: vec![],
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
//! # Examples
//! Read and write to a file:
//! ```
//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::FileWriter}};
//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::{FileWriter, WriteOptions}}};
//! # use std::fs::File;
//! # use std::sync::Arc;
//! # use arrow2::datatypes::{Field, Schema, DataType};
Expand All @@ -43,7 +43,8 @@
//! let x_coord = Field::new("x", DataType::Int32, false);
//! let y_coord = Field::new("y", DataType::Int32, false);
//! let schema = Schema::new(vec![x_coord, y_coord]);
//! let mut writer = FileWriter::try_new(file, &schema)?;
//! let options = WriteOptions {compression: None};
//! let mut writer = FileWriter::try_new(file, &schema, options)?;
//!
//! // Setup the data
//! let x_data = Int32Array::from_slice([-1i32, 1]);
Expand Down
Loading

0 comments on commit 83d828c

Please sign in to comment.