diff --git a/python/python/tests/test_file.py b/python/python/tests/test_file.py index 9673288a74..664092762e 100644 --- a/python/python/tests/test_file.py +++ b/python/python/tests/test_file.py @@ -320,3 +320,18 @@ def test_write_read_additional_schema_metadata(tmp_path): reader.metadata().schema.metadata.get(schema_metadata_key.encode()).decode() == schema_metadata_value ) + + +def test_writer_maintains_order(tmp_path): + # 100Ki strings, each string is a couple of KiBs + big_strings = [f"{i}" * 1024 for i in range(100 * 1024)] + table = pa.table({"big_strings": big_strings}) + + for i in range(4): + path = tmp_path / f"foo-{i}.lance" + with LanceFileWriter(str(path)) as writer: + writer.write_batch(table) + + reader = LanceFileReader(str(path)) + result = reader.read_all().to_table() + assert result == table diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 2c4794f331..df1fae611d 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -228,6 +228,10 @@ pub trait FieldEncoder: Send { fn maybe_encode(&mut self, array: ArrayRef) -> Result>; /// Flush any remaining data from the buffers into encoding tasks /// + /// Each encode task produces a single page. The order of these pages will be maintained + /// in the file (we do not worry about order between columns but all pages in the same + /// column should maintain order) + /// /// This may be called intermittently throughout encoding but will always be called /// once at the end of encoding just before calling finish fn flush(&mut self) -> Result>; diff --git a/rust/lance-file/src/v2/writer.rs b/rust/lance-file/src/v2/writer.rs index 821ace79b4..121d1c7167 100644 --- a/rust/lance-file/src/v2/writer.rs +++ b/rust/lance-file/src/v2/writer.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use arrow_array::RecordBatch; use bytes::{BufMut, Bytes, BytesMut}; -use futures::stream::FuturesUnordered; +use futures::stream::FuturesOrdered; use futures::StreamExt; use lance_core::datatypes::Schema as LanceSchema; use lance_core::{Error, Result}; @@ -184,10 +184,7 @@ impl FileWriter { } #[instrument(skip_all, level = "debug")] - async fn write_pages( - &mut self, - mut encoding_tasks: FuturesUnordered, - ) -> Result<()> { + async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered) -> Result<()> { // As soon as an encoding task is done we write it. There is no parallelism // needed here because "writing" is really just submitting the buffer to the // underlying write scheduler (either the OS or object_store's scheduler for @@ -328,7 +325,7 @@ impl FileWriter { let encoding_tasks = encoding_tasks .into_iter() .flatten() - .collect::>(); + .collect::>(); self.write_pages(encoding_tasks).await?; @@ -484,7 +481,7 @@ impl FileWriter { let encoding_tasks = encoding_tasks .into_iter() .flatten() - .collect::>(); + .collect::>(); self.write_pages(encoding_tasks).await?; self.finish_writers().await?;