From a347a5f1873258a6474895b7aaab2cd46986dc1b Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Mon, 25 Nov 2024 15:36:38 +0100 Subject: [PATCH 1/2] arrow-ipc: Default to not preserving dict IDs --- arrow-ipc/src/writer.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 23cefede7b37..67b54e30ba32 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -63,7 +63,7 @@ pub struct IpcWriteOptions { /// Flag indicating whether the writer should preserve the dictionary IDs defined in the /// schema or generate unique dictionary IDs internally during encoding. /// - /// Defaults to `true` + /// Defaults to `false` preserve_dict_id: bool, } @@ -112,7 +112,7 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, - preserve_dict_id: true, + preserve_dict_id: false, }), crate::MetadataVersion::V5 => { if write_legacy_ipc_format { @@ -125,7 +125,7 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, - preserve_dict_id: true, + preserve_dict_id: false, }) } } @@ -161,7 +161,7 @@ impl Default for IpcWriteOptions { write_legacy_ipc_format: false, metadata_version: crate::MetadataVersion::V5, batch_compression_type: None, - preserve_dict_id: true, + preserve_dict_id: false, } } } @@ -785,7 +785,7 @@ impl DictionaryTracker { written: HashMap::new(), dict_ids: Vec::new(), error_on_replacement, - preserve_dict_id: true, + preserve_dict_id: false, } } From 206f7f4624f222e81b675011e6efe36534fae46a Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Tue, 26 Nov 2024 17:08:31 +0100 Subject: [PATCH 2/2] arrow-integration-testing: Adapt to using default settings Previously the integration tests forced preserving dict IDs in some places and used the default in others. This only worked because preserving dict IDs used to be the default, which is no longer the case. The tests now take the setting from the default `IpcWriteOptions` everywhere. 
--- .../integration_test.rs | 38 ++++++++++++++++--- .../integration_test.rs | 23 ++++++++--- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index c8289ff446a0..62cdca975b0e 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -29,7 +29,7 @@ use arrow::{ }; use arrow_flight::{ flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, SchemaAsIpc, Ticket, + utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, IpcMessage, Location, Ticket, }; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -72,7 +72,19 @@ async fn upload_data( let (mut upload_tx, upload_rx) = mpsc::channel(10); let options = arrow::ipc::writer::IpcWriteOptions::default(); - let mut schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into(); + let mut dict_tracker = + writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + let data_gen = writer::IpcDataGenerator::default(); + let data = IpcMessage( + data_gen + .schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options) + .ipc_message + .into(), + ); + let mut schema_flight_data = FlightData { + data_header: data.0, + ..Default::default() + }; // arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options); schema_flight_data.flight_descriptor = Some(descriptor.clone()); upload_tx.send(schema_flight_data).await?; @@ -82,7 +94,14 @@ async fn upload_data( if let Some((counter, first_batch)) = original_data_iter.next() { let metadata = counter.to_string().into_bytes(); // Preload the first batch into the channel before starting the 
request - send_batch(&mut upload_tx, &metadata, first_batch, &options).await?; + send_batch( + &mut upload_tx, + &metadata, + first_batch, + &options, + &mut dict_tracker, + ) + .await?; let outer = client.do_put(Request::new(upload_rx)).await?; let mut inner = outer.into_inner(); @@ -97,7 +116,14 @@ async fn upload_data( // Stream the rest of the batches for (counter, batch) in original_data_iter { let metadata = counter.to_string().into_bytes(); - send_batch(&mut upload_tx, &metadata, batch, &options).await?; + send_batch( + &mut upload_tx, + &metadata, + batch, + &options, + &mut dict_tracker, + ) + .await?; let r = inner .next() @@ -124,12 +150,12 @@ async fn send_batch( metadata: &[u8], batch: &RecordBatch, options: &writer::IpcWriteOptions, + dictionary_tracker: &mut writer::DictionaryTracker, ) -> Result { let data_gen = writer::IpcDataGenerator::default(); - let mut dictionary_tracker = writer::DictionaryTracker::new_with_preserve_dict_id(false, true); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, &mut dictionary_tracker, options) + .encoded_batch(batch, dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let dictionary_flight_data: Vec = diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index 0f404b2ae289..277217500813 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -119,18 +119,31 @@ impl FlightService for FlightServiceImpl { .ok_or_else(|| Status::not_found(format!("Could not find flight. 
{key}")))?; let options = arrow::ipc::writer::IpcWriteOptions::default(); + let mut dictionary_tracker = + writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + let data_gen = writer::IpcDataGenerator::default(); + let data = IpcMessage( + data_gen + .schema_to_bytes_with_dictionary_tracker( + &flight.schema, + &mut dictionary_tracker, + &options, + ) + .ipc_message + .into(), + ); + let schema_flight_data = FlightData { + data_header: data.0, + ..Default::default() + }; - let schema = std::iter::once(Ok(SchemaAsIpc::new(&flight.schema, &options).into())); + let schema = std::iter::once(Ok(schema_flight_data)); let batches = flight .chunks .iter() .enumerate() .flat_map(|(counter, batch)| { - let data_gen = writer::IpcDataGenerator::default(); - let mut dictionary_tracker = - writer::DictionaryTracker::new_with_preserve_dict_id(false, true); - let (encoded_dictionaries, encoded_batch) = data_gen .encoded_batch(batch, &mut dictionary_tracker, &options) .expect("DictionaryTracker configured above to not error on replacement");