diff --git a/python/python/tests/conftest.py b/python/python/tests/conftest.py
index f8b8e8622b..1c1b28aced 100644
--- a/python/python/tests/conftest.py
+++ b/python/python/tests/conftest.py
@@ -30,11 +30,19 @@ def pytest_addoption(parser):
         default=False,
         help="Run integration tests (requires S3 buckets to be setup with access)",
     )
+    parser.addoption(
+        "--run-slow",
+        action="store_true",
+        default=False,
+        help="Run slow tests",
+    )


 def pytest_collection_modifyitems(config, items):
     if not config.getoption("--run-integration"):
         disable_items_with_mark(items, "integration", "--run-integration not specified")
+    if not config.getoption("--run-slow"):
+        disable_items_with_mark(items, "slow", "--run-slow not specified")
     try:
         import torch
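As a quick usage sketch (illustration only, not part of the patch): a test opts in with the `slow` mark, and the suite is invoked with the new flag. Whether the `slow` marker is already registered (e.g. in `pyproject.toml` under `[tool.pytest.ini_options]`) is an assumption; if it is not, pytest will emit a PytestUnknownMarkWarning.

    import pytest

    @pytest.mark.slow  # deselected unless pytest is invoked with --run-slow
    def test_something_expensive():
        ...

    # invocation, assuming the repo layout above:
    #   pytest python/python/tests --run-slow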
diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py
index a6527e900b..ec44bf8a58 100644
--- a/python/python/tests/test_dataset.py
+++ b/python/python/tests/test_dataset.py
@@ -1650,11 +1650,16 @@ def test_scan_with_batch_size(tmp_path: Path):
         assert batch.num_rows != 12


+@pytest.mark.slow
 def test_io_buffer_size(tmp_path: Path):
-    # This regresses a deadlock issue that was happening when the
-    # batch size was very large (in bytes) so that batches would be
+    # These cases regress deadlock issues that happen when the
+    # batch size is very large (in bytes) so that batches are
     # bigger than the I/O buffer size
     #
+    # The test is slow (it needs to generate enough data to cover a variety
+    # of cases) but it is essential to run if any changes are made to the
+    # 2.0 scheduling priority / decoding strategy
+    #
     # In this test we create 4 pages of data, 2 for each column. We
     # then set the I/O buffer size to 5000 bytes so that we only read
     # 1 page at a time and set the batch size to 2M rows so that we
@@ -1808,6 +1813,45 @@ def datagen():

     dataset.scanner(batch_size=2 * 1024 * 1024, io_buffer_size=5000).to_table()

+    # This reproduces another issue we saw where there are a bunch of empty lists and
+    # lance was calculating the page priority incorrectly.
+
+    fsl_type = pa.list_(pa.uint64(), 32 * 1024 * 1024)
+    list_type = pa.list_(fsl_type)
+
+    def datagen():
+        # Each item is 32Mi uint64 values (256 MiB)
+        values = pa.array(range(32 * 1024 * 1024 * 7), pa.uint64())
+        fsls = pa.FixedSizeListArray.from_arrays(values, 32 * 1024 * 1024)
+        # 4 single-item lists, then 1 empty list, then 3 single-item lists
+        offsets = pa.array([0, 1, 2, 3, 4, 4, 5, 6, 7], pa.int32())
+        lists = pa.ListArray.from_arrays(offsets, fsls)
+
+        values2 = pa.array(range(32 * 1024 * 1024 * 8), pa.uint64())
+        fsls2 = pa.FixedSizeListArray.from_arrays(values2, 32 * 1024 * 1024)
+        offsets2 = pa.array([0, 1, 2, 3, 4, 5, 6, 7, 8], pa.int32())
+        lists2 = pa.ListArray.from_arrays(offsets2, fsls2)
+
+        yield pa.record_batch(
+            [
+                lists,
+                lists2,
+            ],
+            names=["a", "b"],
+        )
+
+    schema = pa.schema({"a": list_type, "b": list_type})
+
+    dataset = lance.write_dataset(
+        datagen(),
+        base_dir,
+        schema=schema,
+        data_storage_version="stable",
+        mode="overwrite",
+    )
+
+    dataset.scanner(batch_size=10, io_buffer_size=5000).to_table()
+

 def test_scan_no_columns(tmp_path: Path):
     base_dir = tmp_path / "dataset"
diff --git a/rust/lance-encoding/src/encodings/logical/list.rs b/rust/lance-encoding/src/encodings/logical/list.rs
index 39d261be1b..9aa44c7259 100644
--- a/rust/lance-encoding/src/encodings/logical/list.rs
+++ b/rust/lance-encoding/src/encodings/logical/list.rs
@@ -692,11 +692,10 @@ impl LogicalPageDecoder for ListPageDecoder {
             return Ok(());
         }

-        let mut boundary = loaded_need as usize;
+        let boundary = loaded_need as usize;
         debug_assert!(boundary < self.num_rows as usize);
-        while boundary + 2 < self.offsets.len() && self.offsets[boundary] == self.offsets[boundary + 1] {
-            boundary += 1;
-        }
+        // We need more than X lists, i.e. at least X+1 lists, which requires at least
+        // offsets[X+1] items, i.e. more than offsets[X+1]-1 items.
         let items_needed = self.offsets[boundary + 1].saturating_sub(1);
         trace!(
             "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded. To satisfy this we need more than {} loaded items",
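To make the new offsets arithmetic concrete, here is a small standalone Python sketch (illustration only, not from the patch) using the offsets from the reproduction above; `items_needed` mirrors `self.offsets[boundary + 1].saturating_sub(1)`:

    # List lengths are [1, 1, 1, 1, 0, 1, 1, 1]
    offsets = [0, 1, 2, 3, 4, 4, 5, 6, 7]

    def items_needed(loaded_need: int) -> int:
        # "more than X lists" = "at least X+1 lists" = "at least offsets[X+1] items"
        # = "more than offsets[X+1] - 1 items" (saturating at zero)
        return max(offsets[loaded_need + 1] - 1, 0)

    assert items_needed(3) == 3  # lists 0..3 end at item 4, so we need > 3 items
    assert items_needed(4) == 3  # the empty fifth list adds no item requirement

The removed while-loop tried to skip ahead over runs of empty lists before doing this computation; reading the requirement directly off the offsets array makes that adjustment unnecessary.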
diff --git a/rust/lance-encoding/src/encodings/logical/primitive.rs b/rust/lance-encoding/src/encodings/logical/primitive.rs
index 09974b72f1..673aac9762 100644
--- a/rust/lance-encoding/src/encodings/logical/primitive.rs
+++ b/rust/lance-encoding/src/encodings/logical/primitive.rs
@@ -31,6 +31,7 @@ use crate::{
 struct PrimitivePage {
     scheduler: Box<dyn PageScheduler>,
     num_rows: u64,
+    page_index: u32,
 }

 /// A field scheduler for primitive fields
@@ -61,9 +62,16 @@ impl PrimitiveFieldScheduler {
     ) -> Self {
         let page_schedulers = pages
             .iter()
+            .enumerate()
             // Buggy versions of Lance could sometimes create empty pages
-            .filter(|page| page.num_rows > 0)
-            .map(|page| {
+            .filter(|(page_index, page)| {
+                let keep = page.num_rows > 0;
+                if !keep {
+                    log::trace!("Skipping empty page with index {}", page_index);
+                }
+                keep
+            })
+            .map(|(page_index, page)| {
                 let page_buffers = PageBuffers {
                     column_buffers: buffers,
                     positions_and_sizes: &page.buffer_offsets_and_sizes,
@@ -73,6 +81,7 @@ impl PrimitiveFieldScheduler {
                 PrimitivePage {
                     scheduler,
                     num_rows: page.num_rows,
+                    page_index: page_index as u32,
                 }
             })
             .collect::<Vec<_>>();
@@ -160,10 +169,13 @@ impl<'a> SchedulingJob for PrimitiveFieldSchedulingJob<'a> {
         let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();

         trace!(
-            "Scheduling {} rows across {} ranges from page with {} rows",
+            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
             num_rows_in_next,
             ranges_in_page.len(),
-            cur_page.num_rows
+            cur_page.num_rows,
+            priority.current_priority(),
+            self.scheduler.column_index,
+            cur_page.page_index,
         );

         self.global_row_offset += cur_page.num_rows;
@@ -183,6 +195,7 @@ impl<'a> SchedulingJob for PrimitiveFieldSchedulingJob<'a> {
             rows_drained: 0,
             num_rows: num_rows_in_next,
             should_validate: self.scheduler.should_validate,
+            page_index: cur_page.page_index,
         };

         let decoder = Box::new(logical_decoder);
@@ -224,6 +237,7 @@ pub struct PrimitiveFieldDecoder {
     num_rows: u64,
     rows_drained: u64,
     column_index: u32,
+    page_index: u32,
 }

 impl PrimitiveFieldDecoder {
@@ -241,6 +255,7 @@ impl PrimitiveFieldDecoder {
             num_rows,
             rows_drained: 0,
             column_index: u32::MAX,
+            page_index: u32::MAX,
         }
     }
 }
@@ -301,9 +316,10 @@ impl LogicalPageDecoder for PrimitiveFieldDecoder {
     // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
     fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
         log::trace!(
-            "primitive wait for more than {} rows on column {} (page has {} rows)",
+            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
             loaded_need,
             self.column_index,
+            self.page_index,
             self.num_rows
         );
         async move {
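The new page_index plumbing only surfaces through `trace!` output. Below is a sketch of turning that on from Python when running the reproduction; the environment-variable name and filter syntax are assumptions about how the bindings expose the Rust `log` facade, so adjust to whatever the build actually uses.

    import os

    # Assumption: an env_logger-style filter read when the native library loads.
    os.environ["LANCE_LOG"] = "lance_encoding=trace"  # variable name is an assumption

    import lance

    ds = lance.dataset("/tmp/dataset")  # hypothetical dataset path
    ds.scanner(batch_size=10, io_buffer_size=5000).to_table()
    # Trace lines should now carry priority, column_index, and page_index,
    # which is what makes a mis-prioritized page visible in the logs.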