fix: calculate decode priority correctly (#2841)
When decoding we have to "wait until more than X rows are ready".

There was an issue with how we handled empty lists. For example, if the
list offsets are:

```
# List sizes = [10, 20, 30, 0, 40]
0 10 30 60 60 100
```

The question is what to do when we need to wait until more than 3
rows are ready. 3 rows means we have 60 items. The old logic looked for
the next offset strictly greater than 60 (i.e. 100) and waited until we had
100 items. This mirrored logic we use on the reverse side, where we
calculate how many rows are available based on how many items we have.

However, that is wrong. If we have 60 items we actually have 4 rows
available (the fourth row is empty), so we already have more than 3 rows
available. The correct logic is much simpler: just wait until we have at
least `offsets[desired_rows + 1]` items. If that value is equal to
`offsets[desired_rows]` it doesn't matter; no special handling is needed.

Waiting for more rows than necessary can lead to deadlock in certain
situations.
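
To make the fix concrete, here is a minimal Python sketch (an illustration based on the description above, not the actual Lance implementation) contrasting the old and the new way of turning "wait until more than `desired_rows` rows are ready" into a required item count, using the offsets from the example:

```
# Offsets for list sizes [10, 20, 30, 0, 40]; having offsets[k] items means
# the first k rows are complete.
offsets = [0, 10, 30, 60, 60, 100]
desired_rows = 3  # "wait until more than 3 rows are ready"

# Old (buggy) logic: skip past equal offsets (empty rows) and wait until we
# have the next offset strictly greater than offsets[desired_rows].
boundary = desired_rows
while boundary + 2 < len(offsets) and offsets[boundary] == offsets[boundary + 1]:
    boundary += 1
old_items_needed = offsets[boundary + 1]  # 100: also waits for the last row

# New logic: more than desired_rows rows means at least desired_rows + 1 rows,
# which means at least offsets[desired_rows + 1] items.
new_items_needed = offsets[desired_rows + 1]  # 60: rows 0-3 are then complete

assert (old_items_needed, new_items_needed) == (100, 60)
```

With the old calculation the decoder would keep waiting until all 100 items were loaded even though 60 items already complete 4 rows; that over-wait is what could lead to the deadlock mentioned above.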
westonpace authored Sep 9, 2024
1 parent 6016917 commit c8fe1dd
Showing 4 changed files with 75 additions and 11 deletions.
8 changes: 8 additions & 0 deletions python/python/tests/conftest.py
@@ -30,11 +30,19 @@ def pytest_addoption(parser):
        default=False,
        help="Run integration tests (requires S3 buckets to be setup with access)",
    )
    parser.addoption(
        "--run-slow",
        action="store_true",
        default=False,
        help="Run slow tests",
    )


def pytest_collection_modifyitems(config, items):
    if not config.getoption("--run-integration"):
        disable_items_with_mark(items, "integration", "--run-integration not specified")
    if not config.getoption("--run-slow"):
        disable_items_with_mark(items, "slow", "--run-slow not specified")
    try:
        import torch

48 changes: 46 additions & 2 deletions python/python/tests/test_dataset.py
@@ -1650,11 +1650,16 @@ def test_scan_with_batch_size(tmp_path: Path):
        assert batch.num_rows != 12


@pytest.mark.slow
def test_io_buffer_size(tmp_path: Path):
    # This regresses a deadlock issue that was happening when the
    # batch size was very large (in bytes) so that batches would be
    # These cases regress deadlock issues that happen when the
    # batch size was very large (in bytes) so that batches are
    # bigger than the I/O buffer size
    #
    # The test is slow (it needs to generate enough data to cover a variety
    # of cases) but it is essential to run if any changes are made to the
    # 2.0 scheduling priority / decoding strategy
    #
    # In this test we create 4 pages of data, 2 for each column. We
    # then set the I/O buffer size to 5000 bytes so that we only read
    # 1 page at a time and set the batch size to 2M rows so that we
@@ -1808,6 +1813,45 @@ def datagen():

    dataset.scanner(batch_size=2 * 1024 * 1024, io_buffer_size=5000).to_table()

    # This reproduces another issue we saw where there are a bunch of empty lists and
    # lance was calculating the page priority incorrectly.

    fsl_type = pa.list_(pa.uint64(), 32 * 1024 * 1024)
    list_type = pa.list_(fsl_type)

    def datagen():
        # Each item is 32
        values = pa.array(range(32 * 1024 * 1024 * 7), pa.uint64())
        fsls = pa.FixedSizeListArray.from_arrays(values, 32 * 1024 * 1024)
        # 3 items, 5 empties, 2 items
        offsets = pa.array([0, 1, 2, 3, 4, 4, 5, 6, 7], pa.int32())
        lists = pa.ListArray.from_arrays(offsets, fsls)

        values2 = pa.array(range(32 * 1024 * 1024 * 8), pa.uint64())
        fsls2 = pa.FixedSizeListArray.from_arrays(values2, 32 * 1024 * 1024)
        offsets2 = pa.array([0, 1, 2, 3, 4, 5, 6, 7, 8], pa.int32())
        lists2 = pa.ListArray.from_arrays(offsets2, fsls2)

        yield pa.record_batch(
            [
                lists,
                lists2,
            ],
            names=["a", "b"],
        )

    schema = pa.schema({"a": list_type, "b": list_type})

    dataset = lance.write_dataset(
        datagen(),
        base_dir,
        schema=schema,
        data_storage_version="stable",
        mode="overwrite",
    )

    dataset.scanner(batch_size=10, io_buffer_size=5000).to_table()


def test_scan_no_columns(tmp_path: Path):
    base_dir = tmp_path / "dataset"
7 changes: 3 additions & 4 deletions rust/lance-encoding/src/encodings/logical/list.rs
@@ -692,11 +692,10 @@ impl LogicalPageDecoder for ListPageDecoder {
            return Ok(());
        }

        let mut boundary = loaded_need as usize;
        let boundary = loaded_need as usize;
        debug_assert!(boundary < self.num_rows as usize);
        while boundary + 2 < self.offsets.len() && self.offsets[boundary] == self.offsets[boundary + 1] {
            boundary += 1;
        }
        // We need more than X lists which means we need at least X+1 lists which means
        // we need at least offsets[X+1] items which means we need more than offsets[X+1]-1 items.
        let items_needed = self.offsets[boundary + 1].saturating_sub(1);
        trace!(
            "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
23 changes: 18 additions & 5 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -31,6 +31,7 @@ use crate::{
struct PrimitivePage {
    scheduler: Box<dyn PageScheduler>,
    num_rows: u64,
    page_index: u32,
}

/// A field scheduler for primitive fields
@@ -61,9 +62,13 @@ impl PrimitiveFieldScheduler {
    ) -> Self {
        let page_schedulers = pages
            .iter()
            .enumerate()
            // Buggy versions of Lance could sometimes create empty pages
            .filter(|page| page.num_rows > 0)
            .map(|page| {
            .filter(|(page_index, page)| {
                log::trace!("Skipping empty page with index {}", page_index);
                page.num_rows > 0
            })
            .map(|(page_index, page)| {
                let page_buffers = PageBuffers {
                    column_buffers: buffers,
                    positions_and_sizes: &page.buffer_offsets_and_sizes,
@@ -73,6 +78,7 @@
                PrimitivePage {
                    scheduler,
                    num_rows: page.num_rows,
                    page_index: page_index as u32,
                }
            })
            .collect::<Vec<_>>();
@@ -160,10 +166,13 @@ impl<'a> SchedulingJob for PrimitiveFieldSchedulingJob<'a> {

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows",
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        self.global_row_offset += cur_page.num_rows;
@@ -183,6 +192,7 @@
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
@@ -224,6 +234,7 @@ pub struct PrimitiveFieldDecoder {
    num_rows: u64,
    rows_drained: u64,
    column_index: u32,
    page_index: u32,
}

impl PrimitiveFieldDecoder {
@@ -241,6 +252,7 @@ impl PrimitiveFieldDecoder {
            num_rows,
            rows_drained: 0,
            column_index: u32::MAX,
            page_index: u32::MAX,
        }
    }
}
@@ -301,9 +313,10 @@ impl LogicalPageDecoder for PrimitiveFieldDecoder {
    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        log::trace!(
            "primitive wait for more than {} rows on column {} (page has {} rows)",
            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
            loaded_need,
            self.column_index,
            self.page_index,
            self.num_rows
        );
        async move {