Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in serializing batch to flight (#1093)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 23, 2022
1 parent 5569595 commit 2f1a223
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async fn send_batch(
fields: &[IpcField],
options: &write::WriteOptions,
) -> Result {
let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, fields, options);
let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, fields, options)?;

upload_tx
.send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl FlightService for FlightServiceImpl {
.enumerate()
.flat_map(|(counter, batch)| {
let (dictionary_flight_data, mut batch_flight_data) =
serialize_batch(batch, &flight.ipc_schema.fields, &options);
serialize_batch(batch, &flight.ipc_schema.fields, &options).unwrap();

// Only the record batch's FlightData gets app_metadata
let metadata = counter.to_string().into_bytes();
Expand Down
12 changes: 9 additions & 3 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@ use crate::{
io::ipc::write::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions},
};

use super::ipc::write::default_ipc_fields;
pub use super::ipc::write::default_ipc_fields;
use super::ipc::{IpcField, IpcSchema};

/// Serializes [`Chunk`] to a vector of [`FlightData`] representing the serialized dictionaries
/// and a [`FlightData`] representing the batch.
/// # Errors
/// This function errors iff `fields` is not consistent with `columns`
pub fn serialize_batch(
columns: &Chunk<Box<dyn Array>>,
fields: &[IpcField],
options: &WriteOptions,
) -> (Vec<FlightData>, FlightData) {
) -> Result<(Vec<FlightData>, FlightData)> {
if fields.len() != columns.arrays().len() {
return Err(Error::InvalidArgumentError("The argument `fields` must be consistent with the columns' schema. Use e.g. &arrow2::io::flight::default_ipc_fields(&schema.fields)".to_string()));
}

let mut dictionary_tracker = DictionaryTracker {
dictionaries: Default::default(),
cannot_replace: false,
Expand All @@ -36,7 +42,7 @@ pub fn serialize_batch(
let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
let flight_batch = encoded_batch.into();

(flight_dictionaries, flight_batch)
Ok((flight_dictionaries, flight_batch))
}

impl From<EncodedData> for FlightData {
Expand Down

0 comments on commit 2f1a223

Please sign in to comment.