Skip to content

Commit

Permalink
fix: ensure the v2 file writer does not mix up the order of a column (#…
Browse files Browse the repository at this point in the history
…2836)

Sometimes the v2 writer will split a batch into multiple pages. When it
does this it encodes those pages in parallel. It is possible those
encoding tasks finish out of order and the writer was then writing the
pages out of order. This mean the order of one column could get out of
sync with the order of another column.

---------

Co-authored-by: broccoliSpicy <93440049+broccoliSpicy@users.noreply.github.com>
  • Loading branch information
westonpace and broccoliSpicy authored Sep 6, 2024
1 parent f30a679 commit ef0953d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
15 changes: 15 additions & 0 deletions python/python/tests/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,18 @@ def test_write_read_additional_schema_metadata(tmp_path):
reader.metadata().schema.metadata.get(schema_metadata_key.encode()).decode()
== schema_metadata_value
)


def test_writer_maintains_order(tmp_path):
# 100Ki strings, each string is a couple of KiBs
big_strings = [f"{i}" * 1024 for i in range(100 * 1024)]
table = pa.table({"big_strings": big_strings})

for i in range(4):
path = tmp_path / f"foo-{i}.lance"
with LanceFileWriter(str(path)) as writer:
writer.write_batch(table)

reader = LanceFileReader(str(path))
result = reader.read_all().to_table()
assert result == table
4 changes: 4 additions & 0 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ pub trait FieldEncoder: Send {
fn maybe_encode(&mut self, array: ArrayRef) -> Result<Vec<EncodeTask>>;
/// Flush any remaining data from the buffers into encoding tasks
///
/// Each encode task produces a single page. The order of these pages will be maintained
/// in the file (we do not worry about order between columns but all pages in the same
/// column should maintain order)
///
/// This may be called intermittently throughout encoding but will always be called
/// once at the end of encoding just before calling finish
fn flush(&mut self) -> Result<Vec<EncodeTask>>;
Expand Down
11 changes: 4 additions & 7 deletions rust/lance-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::Arc;
use arrow_array::RecordBatch;

use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::FuturesUnordered;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lance_core::datatypes::Schema as LanceSchema;
use lance_core::{Error, Result};
Expand Down Expand Up @@ -184,10 +184,7 @@ impl FileWriter {
}

#[instrument(skip_all, level = "debug")]
async fn write_pages(
&mut self,
mut encoding_tasks: FuturesUnordered<EncodeTask>,
) -> Result<()> {
async fn write_pages(&mut self, mut encoding_tasks: FuturesOrdered<EncodeTask>) -> Result<()> {
// As soon as an encoding task is done we write it. There is no parallelism
// needed here because "writing" is really just submitting the buffer to the
// underlying write scheduler (either the OS or object_store's scheduler for
Expand Down Expand Up @@ -328,7 +325,7 @@ impl FileWriter {
let encoding_tasks = encoding_tasks
.into_iter()
.flatten()
.collect::<FuturesUnordered<_>>();
.collect::<FuturesOrdered<_>>();

self.write_pages(encoding_tasks).await?;

Expand Down Expand Up @@ -484,7 +481,7 @@ impl FileWriter {
let encoding_tasks = encoding_tasks
.into_iter()
.flatten()
.collect::<FuturesUnordered<_>>();
.collect::<FuturesOrdered<_>>();
self.write_pages(encoding_tasks).await?;

self.finish_writers().await?;
Expand Down

0 comments on commit ef0953d

Please sign in to comment.