Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

arrow-ipc: Default to not preserving dict IDs #6788

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow::{
};
use arrow_flight::{
flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient,
utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, SchemaAsIpc, Ticket,
utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, IpcMessage, Location, Ticket,
};
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};
Expand Down Expand Up @@ -72,7 +72,19 @@ async fn upload_data(
let (mut upload_tx, upload_rx) = mpsc::channel(10);

let options = arrow::ipc::writer::IpcWriteOptions::default();
let mut schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into();
let mut dict_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let data_gen = writer::IpcDataGenerator::default();
let data = IpcMessage(
data_gen
.schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options)
.ipc_message
.into(),
);
let mut schema_flight_data = FlightData {
data_header: data.0,
..Default::default()
};
// arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options);
schema_flight_data.flight_descriptor = Some(descriptor.clone());
upload_tx.send(schema_flight_data).await?;
Expand All @@ -82,7 +94,14 @@ async fn upload_data(
if let Some((counter, first_batch)) = original_data_iter.next() {
let metadata = counter.to_string().into_bytes();
// Preload the first batch into the channel before starting the request
send_batch(&mut upload_tx, &metadata, first_batch, &options).await?;
send_batch(
&mut upload_tx,
&metadata,
first_batch,
&options,
&mut dict_tracker,
)
.await?;

let outer = client.do_put(Request::new(upload_rx)).await?;
let mut inner = outer.into_inner();
Expand All @@ -97,7 +116,14 @@ async fn upload_data(
// Stream the rest of the batches
for (counter, batch) in original_data_iter {
let metadata = counter.to_string().into_bytes();
send_batch(&mut upload_tx, &metadata, batch, &options).await?;
send_batch(
&mut upload_tx,
&metadata,
batch,
&options,
&mut dict_tracker,
)
.await?;

let r = inner
.next()
Expand All @@ -124,12 +150,12 @@ async fn send_batch(
metadata: &[u8],
batch: &RecordBatch,
options: &writer::IpcWriteOptions,
dictionary_tracker: &mut writer::DictionaryTracker,
) -> Result {
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new_with_preserve_dict_id(false, true);

let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, options)
.encoded_batch(batch, dictionary_tracker, options)
.expect("DictionaryTracker configured above to not error on replacement");

let dictionary_flight_data: Vec<FlightData> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,31 @@ impl FlightService for FlightServiceImpl {
.ok_or_else(|| Status::not_found(format!("Could not find flight. {key}")))?;

let options = arrow::ipc::writer::IpcWriteOptions::default();
let mut dictionary_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let data_gen = writer::IpcDataGenerator::default();
let data = IpcMessage(
data_gen
.schema_to_bytes_with_dictionary_tracker(
&flight.schema,
&mut dictionary_tracker,
&options,
)
.ipc_message
.into(),
);
let schema_flight_data = FlightData {
data_header: data.0,
..Default::default()
};

let schema = std::iter::once(Ok(SchemaAsIpc::new(&flight.schema, &options).into()));
let schema = std::iter::once(Ok(schema_flight_data));

let batches = flight
.chunks
.iter()
.enumerate()
.flat_map(|(counter, batch)| {
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, true);

let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, &options)
.expect("DictionaryTracker configured above to not error on replacement");
Expand Down
10 changes: 5 additions & 5 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct IpcWriteOptions {
/// Flag indicating whether the writer should preserve the dictionary IDs defined in the
/// schema or generate unique dictionary IDs internally during encoding.
///
/// Defaults to `true`
/// Defaults to `false`
preserve_dict_id: bool,
}

Expand Down Expand Up @@ -112,7 +112,7 @@ impl IpcWriteOptions {
write_legacy_ipc_format,
metadata_version,
batch_compression_type: None,
preserve_dict_id: true,
preserve_dict_id: false,
}),
crate::MetadataVersion::V5 => {
if write_legacy_ipc_format {
Expand All @@ -125,7 +125,7 @@ impl IpcWriteOptions {
write_legacy_ipc_format,
metadata_version,
batch_compression_type: None,
preserve_dict_id: true,
preserve_dict_id: false,
})
}
}
Expand Down Expand Up @@ -161,7 +161,7 @@ impl Default for IpcWriteOptions {
write_legacy_ipc_format: false,
metadata_version: crate::MetadataVersion::V5,
batch_compression_type: None,
preserve_dict_id: true,
preserve_dict_id: false,
}
}
}
Expand Down Expand Up @@ -785,7 +785,7 @@ impl DictionaryTracker {
written: HashMap::new(),
dict_ids: Vec::new(),
error_on_replacement,
preserve_dict_id: true,
preserve_dict_id: false,
}
}

Expand Down
Loading