From ab9bc2e6c810ea390d2dcf182b0131396d09039a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 6 Sep 2024 09:26:13 -0700 Subject: [PATCH] Fix bug in nested projection, rebase, properly report progress when creating fragment --- python/python/tests/test_dataset.py | 29 ++++++- python/python/tests/test_fragment.py | 29 ++++--- rust/lance-encoding/src/decoder.rs | 25 ++++-- rust/lance-encoding/src/testing.rs | 47 +++++++++-- rust/lance-file/src/format.rs | 1 - rust/lance-file/src/v2/reader.rs | 88 ++++++++++++++------ rust/lance-file/src/v2/writer.rs | 14 ++-- rust/lance-table/src/format/fragment.rs | 28 +++++-- rust/lance/src/dataset/fragment.rs | 50 ++++++----- rust/lance/src/dataset/fragment/write.rs | 16 ++-- rust/lance/src/dataset/scanner.rs | 8 +- rust/lance/src/dataset/write/merge_insert.rs | 7 +- 12 files changed, 231 insertions(+), 111 deletions(-) diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9fd84e50ab..27f53b0c9d 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -539,6 +539,31 @@ def test_pickle(tmp_path: Path): assert dataset.to_table() == unpickled.to_table() +def test_nested_projection(tmp_path: Path): + from lance.debug import format_fragment + + table = pa.Table.from_pydict( + { + "a": range(100), + "b": range(100), + "struct": [{"x": counter, "y": counter % 2 == 0} for counter in range(100)], + } + ) + base_dir = tmp_path / "test" + lance.write_dataset(table, base_dir) + + dataset = lance.dataset(base_dir) + + print(format_fragment(dataset.get_fragment(0).metadata, dataset)) + projected = dataset.to_table(columns=["struct.x"]) + assert projected == pa.Table.from_pydict({"struct.x": range(100)}) + + projected = dataset.to_table(columns=["struct.y"]) + assert projected == pa.Table.from_pydict( + {"struct.y": [i % 2 == 0 for i in range(100)]} + ) + + def test_polar_scan(tmp_path: Path): some_structs = [{"x": counter, "y": counter} for counter in range(100)] table = pa.Table.from_pydict( @@ -2233,8 +2258,8 @@ def test_late_materialization_batch_size(tmp_path: Path): EXPECTED_DEFAULT_STORAGE_VERSION = "2.0" -EXPECTED_MAJOR_VERSION = 0 -EXPECTED_MINOR_VERSION = 3 +EXPECTED_MAJOR_VERSION = 2 +EXPECTED_MINOR_VERSION = 0 def test_default_storage_version(tmp_path: Path): diff --git a/python/python/tests/test_fragment.py b/python/python/tests/test_fragment.py index 216a9dc8c8..c14cba5a73 100644 --- a/python/python/tests/test_fragment.py +++ b/python/python/tests/test_fragment.py @@ -63,11 +63,11 @@ def test_write_fragment_two_phases(tmp_path: Path): def test_write_legacy_fragment(tmp_path: Path): tab = pa.table({"a": range(1024)}) frag = LanceFragment.create(tmp_path, tab, data_storage_version="legacy") - assert "file_minor_version: 3" not in str(frag) + assert "file_major_version: 2" not in str(frag) tab = pa.table({"a": range(1024)}) frag = LanceFragment.create(tmp_path, tab, data_storage_version="stable") - assert "file_minor_version: 3" in str(frag) + assert "file_major_version: 2" in str(frag) def test_scan_fragment(tmp_path: Path): @@ -99,15 +99,18 @@ def test_scan_fragment_with_dynamic_projection(tmp_path: Path): def test_write_fragments(tmp_path: Path): - # This will be split across two files if we set the max_bytes_per_file to 1024 - tab = pa.table( - { - "a": pa.array(range(1024)), - } + # Should result in two files since each batch is 8MB and max_bytes_per_file is small + batches = pa.RecordBatchReader.from_batches( + pa.schema([pa.field("a", pa.string())]), + [ + 
pa.record_batch([pa.array(["0" * 1024] * 1024 * 8)], names=["a"]), + pa.record_batch([pa.array(["0" * 1024] * 1024 * 8)], names=["a"]), + ], ) + progress = ProgressForTest() fragments = write_fragments( - tab, + batches, tmp_path, max_rows_per_group=512, max_bytes_per_file=1024, @@ -200,11 +203,11 @@ def test_dataset_progress(tmp_path: Path): assert metadata["id"] == 0 assert len(metadata["files"]) == 1 # Fragments aren't exactly equal, because the file was written before - # physical_rows was known. - assert ( - fragment.data_files() - == FragmentMetadata.from_json(json.dumps(metadata)).data_files() - ) + # physical_rows was known. However, the paths should be the same. + assert len(fragment.data_files()) == 1 + deserialized = FragmentMetadata.from_json(json.dumps(metadata)) + assert len(deserialized.data_files()) == 1 + assert fragment.data_files()[0].path() == deserialized.data_files()[0].path() ctx = multiprocessing.get_context("spawn") p = ctx.Process(target=failing_write, args=(progress_uri, dataset_uri)) diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 4f5bc4b4dd..97d8ccee7d 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -488,6 +488,15 @@ impl<'a> ColumnInfoIter<'a> { &self.column_infos[self.column_info_pos] } + pub fn expect_next(&mut self) -> Result<&'a ColumnInfo> { + self.next().ok_or_else(|| { + Error::invalid_input( + "there were more fields in the schema than provided column indices", + location!(), + ) + }) + } + pub(crate) fn next_top_level(&mut self) { self.column_indices_pos += 1; if self.column_indices_pos < self.column_indices.len() { @@ -678,7 +687,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { ) -> Result> { let data_type = field.data_type(); if Self::is_primitive(&data_type) { - let primitive_col = column_infos.next().unwrap(); + let primitive_col = column_infos.expect_next()?; let scheduler = self.create_primitive_scheduler( &data_type, chain.current_path(), @@ -692,7 +701,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy { // A fixed size list column could either be a physical or a logical decoder // depending on the child data type. 
             if Self::is_primitive(inner.data_type()) {
-                    let primitive_col = column_infos.next().unwrap();
+                    let primitive_col = column_infos.expect_next()?;
                     let scheduler = self.create_primitive_scheduler(
                         &data_type,
                         chain.current_path(),
@@ -706,7 +715,7 @@
                 }
             DataType::Dictionary(_key_type, value_type) => {
                 if Self::is_primitive(value_type) {
-                    let primitive_col = column_infos.next().unwrap();
+                    let primitive_col = column_infos.expect_next()?;
                     let scheduler = self.create_primitive_scheduler(
                         &data_type,
                         chain.current_path(),
@@ -726,7 +735,8 @@
                 }
             }
             DataType::List(items_field) | DataType::LargeList(items_field) => {
-                let offsets_column = column_infos.next().unwrap();
+                let offsets_column = column_infos.expect_next()?;
+                column_infos.next_top_level();
                 Self::ensure_values_encoded(offsets_column, chain.current_path())?;
                 let offsets_column_buffers = ColumnBuffers {
                     file_buffers: buffers,
@@ -794,7 +804,7 @@
                 Ok((chain, list_scheduler))
             }
             DataType::Struct(fields) => {
-                let column_info = column_infos.next().unwrap();
+                let column_info = column_infos.expect_next()?;

                 if Self::check_packed_struct(column_info) {
                     // use packed struct encoding
@@ -808,13 +818,10 @@
                 } else {
                     // use default struct encoding
                     Self::check_simple_struct(column_info, chain.current_path()).unwrap();
-                    let is_root = field.metadata.contains_key("__lance_decoder_root");
                     let mut child_schedulers = Vec::with_capacity(field.children.len());
                     let mut chain = chain;
                     for (i, field) in field.children.iter().enumerate() {
-                        if is_root {
-                            column_infos.next_top_level();
-                        }
+                        column_infos.next_top_level();
                         let (next_chain, field_scheduler) =
                             chain.new_child(i as u32, field, column_infos, buffers)?;
                         child_schedulers.push(field_scheduler?);
diff --git a/rust/lance-encoding/src/testing.rs b/rust/lance-encoding/src/testing.rs
index 79feb80339..4c020105a9 100644
--- a/rust/lance-encoding/src/testing.rs
+++ b/rust/lance-encoding/src/testing.rs
@@ -5,7 +5,7 @@ use std::{cmp::Ordering, collections::HashMap, ops::Range, sync::Arc};
 use arrow::array::make_comparator;
 use arrow_array::{Array, UInt64Array};
-use arrow_schema::{DataType, Field, Schema, SortOptions};
+use arrow_schema::{DataType, Field, FieldRef, Schema, SortOptions};
 use arrow_select::concat::concat;
 use bytes::{Bytes, BytesMut};
 use futures::{future::BoxFuture, FutureExt, StreamExt};
@@ -69,12 +69,49 @@ impl EncodingsIo for SimulatedScheduler {
     }
 }

+fn column_indices_from_schema_helper(
+    fields: &[FieldRef],
+    column_indices: &mut Vec<u32>,
+    column_counter: &mut u32,
+) {
+    column_indices.push(*column_counter);
+    *column_counter += 1;
+    for field in fields {
+        match field.data_type() {
+            DataType::Struct(fields) => {
+                column_indices_from_schema_helper(fields.as_ref(), column_indices, column_counter);
+            }
+            DataType::List(inner) => {
+                column_indices_from_schema_helper(&[inner.clone()], column_indices, column_counter);
+            }
+            DataType::LargeList(inner) => {
+                column_indices_from_schema_helper(&[inner.clone()], column_indices, column_counter);
+            }
+            DataType::FixedSizeList(inner, _) => {
+                // FSL(primitive) does not get its own column
+                column_indices.pop();
+                *column_counter -= 1;
+                column_indices_from_schema_helper(&[inner.clone()], column_indices, column_counter);
+            }
+            _ => {
+                column_indices_from_schema_helper(&[], column_indices, column_counter);
+            }
+        }
+    }
+}
+
+fn column_indices_from_schema(schema: &Schema) -> Vec<u32> {
+    let mut column_indices = Vec::new();
+    let mut column_counter = 0;
+    column_indices_from_schema_helper(schema.fields(), &mut column_indices, &mut column_counter);
+    column_indices
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn test_decode(
     num_rows: u64,
     batch_size: u32,
     schema: &Schema,
-    column_indices: &[u32],
     column_infos: &[Arc<ColumnInfo>],
     expected: Option<Arc<dyn Array>>,
     io: &Arc<dyn EncodingsIo>,
@@ -88,9 +125,10 @@
         DecoderMiddlewareChain::new().add_strategy(Arc::new(CoreFieldDecoderStrategy {
             validate_data: true,
         }));
+    let column_indices = column_indices_from_schema(schema);
     let decode_scheduler = DecodeBatchScheduler::try_new(
         &lance_schema,
-        column_indices,
+        &column_indices,
         column_infos,
         &Vec::new(),
         num_rows,
@@ -415,7 +453,6 @@ async fn check_round_trip_encoding_inner(
         num_rows,
         test_cases.batch_size,
         &schema,
-        &[0],
         &column_infos,
         concat_data.clone(),
         &scheduler_copy.clone(),
@@ -451,7 +488,6 @@ async fn check_round_trip_encoding_inner(
         num_rows,
         test_cases.batch_size,
         &schema,
-        &[0],
         &column_infos,
         expected,
         &scheduler.clone(),
@@ -498,7 +534,6 @@ async fn check_round_trip_encoding_inner(
         num_rows,
         test_cases.batch_size,
         &schema,
-        &[0],
         &column_infos,
         expected,
         &scheduler.clone(),
diff --git a/rust/lance-file/src/format.rs b/rust/lance-file/src/format.rs
index 432338a0d2..5b8a714665 100644
--- a/rust/lance-file/src/format.rs
+++ b/rust/lance-file/src/format.rs
@@ -32,5 +32,4 @@ pub mod metadata;
 /// These version/magic values are written at the end of Lance files (e.g. versions/1.version)
 pub const MAJOR_VERSION: i16 = 0;
 pub const MINOR_VERSION: i16 = 2;
-pub const MINOR_VERSION_NEXT: u16 = 3;
 pub const MAGIC: &[u8; 4] = b"LANC";
diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs
index 3184c69c66..a40f8b9450 100644
--- a/rust/lance-file/src/v2/reader.rs
+++ b/rust/lance-file/src/v2/reader.rs
@@ -116,13 +116,48 @@ impl CachedFileMetadata {
 /// representations of the same semantic type. An encoding could
 /// theoretically support "casting" (e.g. int to string, etc.) but
 /// there is little advantage in doing so here.
+///
+/// Note: in order to specify a projection the user will need some way
+/// to figure out the column indices. In the table format we do this
+/// using field IDs and keeping track of the field id->column index mapping.
+///
+/// If users are not using the table format then they will need to figure
+/// out some way to do this themselves.
 #[derive(Debug, Clone)]
 pub struct ReaderProjection {
     /// The data types (schema) of the selected columns. The names
     /// of the schema are arbitrary and ignored.
     pub schema: Arc<Schema>,
-    /// The indices of the columns to load. Note, these are the
-    /// indices of the top level fields only
+    /// The indices of the columns to load.
+    ///
+    /// The mapping should be as follows:
+    ///
+    /// - Primitive: the index of the column in the schema
+    /// - List: the index of the list column in the schema
+    ///   followed by the column indices of the children
+    /// - FixedSizeList (of primitive): the index of the column in the schema
+    ///   (this case is not nested)
+    /// - FixedSizeList (of non-primitive): not yet implemented
+    /// - Dictionary: same as primitive
+    /// - Struct: the index of the struct column in the schema
+    ///   followed by the column indices of the children
+    ///
+    /// In other words, this should be a DFS listing of the desired schema.
+    ///
+    /// For example, if the goal is to load:
+    ///
+    ///   x: int32
+    ///   y: struct
+    ///   z: list
+    ///
+    /// and the schema originally used to store the data was:
+    ///
+    ///   a: struct
+    ///   b: int64
+    ///   y: struct
+    ///   z: list
+    ///
+    /// Then the column_indices should be [1, 3, 4, 6, 7, 8]
     pub column_indices: Vec<u32>,
 }
@@ -464,9 +499,6 @@ impl FileReader {
                 location!(),
             ));
         }
-        if projection.schema.fields.len() != projection.column_indices.len() {
-            return Err(Error::invalid_input(format!("The projection schema has {} top level fields but only {} column indices were provided", projection.schema.fields.len(), projection.column_indices.len()), location!()));
-        }
         let mut column_indices_seen = BTreeSet::new();
         for column_index in &projection.column_indices {
             if !column_indices_seen.insert(*column_index) {
@@ -485,34 +517,38 @@ impl FileReader {
         Ok(())
     }
-    // Helper function for `default_projection` to determine how many columns are occupied
-    // by a lance field.
-    fn default_column_count(field: &Field) -> u32 {
-        1 + field
-            .children
-            .iter()
-            .map(Self::default_column_count)
-            .sum::<u32>()
+    fn default_projection_helper<'a>(
+        fields: impl Iterator<Item = &'a Field>,
+        column_indices: &mut Vec<u32>,
+        column_index_counter: &mut u32,
+    ) {
+        for field in fields {
+            column_indices.push(*column_index_counter);
+            *column_index_counter += 1;
+
+            Self::default_projection_helper(
+                field.children.iter(),
+                column_indices,
+                column_index_counter,
+            );
+        }
     }

-    // This function is one of the few spots in the reader where we rely on Lance table
-    // format and the fact that we wrote a Lance table schema into the global buffers.
+    // If we want to read the entire file, and we know the schema for the file,
+    // then we can use the default projection, and the user doesn't need to supply
+    // field IDs (and the field IDs in the schema do not need to be accurate)
     //
-    // TODO: In the future it would probably be better for the "default type" of a column
-    // to be something that can be provided dynamically via the encodings registry. We
-    // could pass the pages of the column to some logic that picks a data type based on the
-    // page encodings.
-
-    /// Loads a default projection for all columns in the file, using the data type that
-    /// was provided when the file was written.
+    // If the user doesn't know the schema, then they can fetch it from the file's
+    // global buffers. So this method can be used in that case too.
     pub fn default_projection(lance_schema: &Schema) -> ReaderProjection {
         let schema = Arc::new(lance_schema.clone());
         let mut column_indices = Vec::with_capacity(lance_schema.fields.len());
         let mut column_index = 0;
-        for field in &lance_schema.fields {
-            column_indices.push(column_index);
-            column_index += Self::default_column_count(field);
-        }
+        Self::default_projection_helper(
+            schema.fields.iter(),
+            &mut column_indices,
+            &mut column_index,
+        );
         ReaderProjection {
             schema,
             column_indices,
diff --git a/rust/lance-file/src/v2/writer.rs b/rust/lance-file/src/v2/writer.rs
index 157b5f7301..0393039b65 100644
--- a/rust/lance-file/src/v2/writer.rs
+++ b/rust/lance-file/src/v2/writer.rs
@@ -31,8 +31,6 @@
 use crate::format::pb;
 use crate::format::pbfile;
 use crate::format::pbfile::DirectEncoding;
 use crate::format::MAGIC;
-use crate::format::MAJOR_VERSION;
-use crate::format::MINOR_VERSION_NEXT;

 #[derive(Debug, Clone, Default)]
 pub struct FileWriterOptions {
@@ -139,9 +137,7 @@ impl FileWriter {
     /// Returns the format version that will be used when writing the file
     pub fn version(&self) -> LanceFileVersion {
-        self.options
-            .format_version
-            .unwrap_or(LanceFileVersion::default_v2())
+        self.options.format_version.unwrap_or_default()
     }

     async fn write_page(&mut self, encoded_page: EncodedPage) -> Result<()> {
@@ -233,7 +229,7 @@ impl FileWriter {
         let keep_original_array = self.options.keep_original_array.unwrap_or(false);
         let encoding_strategy = self.options.encoding_strategy.clone().unwrap_or_else(|| {
-            let version = self.options.format_version.unwrap_or_default();
+            let version = self.version();
             Arc::new(CoreFieldEncodingStrategy {
                 array_encoding_strategy: Arc::new(CoreArrayEncodingStrategy { version }),
                 version,
@@ -619,14 +615,16 @@ fn concat_lance_footer(batch: &EncodedBatch, write_schema: bool) -> Result,
diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs
+    pub fn new_unstarted(
+        path: impl Into<String>,
+        file_major_version: u32,
+        file_minor_version: u32,
+    ) -> Self {
+        Self {
+            path: path.into(),
+            fields: vec![],
+            column_indices: vec![],
+            file_major_version,
+            file_minor_version,
+        }
+    }
+
     pub fn new_legacy_from_fields(path: impl Into<String>, fields: Vec<i32>) -> Self {
         Self::new(
             path,
@@ -294,14 +309,11 @@ impl Fragment {
         path: impl Into<String>,
         field_ids: Vec<i32>,
         column_indices: Vec<i32>,
+        version: &LanceFileVersion,
     ) {
-        self.files.push(DataFile::new(
-            path,
-            field_ids,
-            column_indices,
-            MAJOR_VERSION as u32,
-            MINOR_VERSION_NEXT as u32,
-        ));
+        let (major, minor) = version.to_numbers();
+        self.files
+            .push(DataFile::new(path, field_ids, column_indices, major, minor));
     }

     /// Add a new [`DataFile`] to this fragment.
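Aside for reviewers: the ReaderProjection change above describes a depth-first numbering of columns. The sketch below is illustrative only (it is not part of this patch or the Lance API; the helper name and the pyarrow-based schema walk are assumptions made for the example). Struct and list fields receive an index followed by their children's indices, and a fixed-size list of a primitive type stays a single column.

    # Illustrative only: mirrors the DFS numbering described in the
    # ReaderProjection docs above.
    import pyarrow as pa

    def dfs_column_indices(fields, start=0):
        """Return (indices, next_index), assigning one index per field, depth first."""
        indices = []
        counter = start
        for field in fields:
            indices.append(counter)
            counter += 1
            typ = field.type
            if pa.types.is_struct(typ):
                children = [typ.field(i) for i in range(typ.num_fields)]
                child_indices, counter = dfs_column_indices(children, counter)
                indices.extend(child_indices)
            elif pa.types.is_list(typ) or pa.types.is_large_list(typ):
                child_indices, counter = dfs_column_indices([typ.value_field], counter)
                indices.extend(child_indices)
            # A fixed-size list of a primitive type stays a single column; other
            # nested types are out of scope for this sketch.
        return indices, counter

    schema = pa.schema(
        [
            pa.field("a", pa.int32()),
            pa.field("s", pa.struct([("x", pa.int32()), ("y", pa.bool_())])),
            pa.field("l", pa.list_(pa.int64())),
        ]
    )
    indices, _ = dfs_column_indices(schema)
    print(indices)  # [0, 1, 2, 3, 4, 5] -> a, s, s.x, s.y, l, l's items

With this numbering, projecting only s.x would use indices [1, 2]: the struct column followed by the selected child, which is what projection_from_lance in the change below derives from the field-id-to-column-index mapping.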
diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index f497481dc3..7b35a0b92c 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -256,6 +256,7 @@ impl GenericFileReader for V1Reader { } mod v2_adapter { + use lance_core::datatypes::Field; use lance_encoding::decoder::FilterExpression; use super::*; @@ -280,23 +281,29 @@ mod v2_adapter { } } - pub fn projection_from_lance(&self, schema: &Schema) -> ReaderProjection { - let column_indices = schema - .fields - .iter() - .map(|f| { - *self.field_id_to_column_idx.get(&f.id).unwrap_or_else(|| { - panic!( - "attempt to project field with id {} which did not exist in the data file", - f.id - ) - }) - }) - .collect::>(); - ReaderProjection { + fn projection_from_lance_helper<'a>( + &self, + fields: impl Iterator, + column_indices: &mut Vec, + ) -> Result<()> { + for field in fields { + let column_idx = *self.field_id_to_column_idx.get(&field.id).ok_or_else(|| Error::InvalidInput { + location: location!(), + source: format!("the schema referenced a field with id {} which was not in the data file's metadata", field.id).into(), + })?; + column_indices.push(column_idx); + self.projection_from_lance_helper(field.children.iter(), column_indices)?; + } + Ok(()) + } + + pub fn projection_from_lance(&self, schema: &Schema) -> Result { + let mut column_indices = Vec::new(); + self.projection_from_lance_helper(schema.fields.iter(), &mut column_indices)?; + Ok(ReaderProjection { schema: Arc::new(schema.clone()), column_indices, - } + }) } } @@ -309,7 +316,7 @@ mod v2_adapter { batch_size: u32, projection: Arc, ) -> Result { - let projection = self.projection_from_lance(projection.as_ref()); + let projection = self.projection_from_lance(projection.as_ref())?; Ok(self .reader .read_tasks( @@ -330,7 +337,7 @@ mod v2_adapter { batch_size: u32, projection: Arc, ) -> Result { - let projection = self.projection_from_lance(projection.as_ref()); + let projection = self.projection_from_lance(projection.as_ref())?; Ok(self .reader .read_tasks( @@ -353,7 +360,7 @@ mod v2_adapter { projection: Arc, ) -> Result { let indices = UInt32Array::from(indices.to_vec()); - let projection = self.projection_from_lance(projection.as_ref()); + let projection = self.projection_from_lance(projection.as_ref())?; Ok(self .reader .read_tasks( @@ -482,7 +489,12 @@ impl FileFragment { .map(|c| c as i32) .collect(); - frag.add_file(filename, dataset.schema().field_ids(), column_indices); + frag.add_file( + filename, + dataset.schema().field_ids(), + column_indices, + &file_version, + ); Ok(frag) } } diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 27af8f18dd..652e4b659a 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -11,7 +11,6 @@ use lance_core::datatypes::Schema; use lance_core::Error; use lance_datafusion::chunker::{break_stream, chunk_stream}; use lance_datafusion::utils::{peek_reader_schema, reader_to_stream}; -use lance_file::format::{MAJOR_VERSION, MINOR_VERSION_NEXT}; use lance_file::v2::writer::FileWriterOptions; use lance_file::version::LanceFileVersion; use lance_file::writer::FileWriter; @@ -97,6 +96,11 @@ impl<'a> FragmentCreateBuilder<'a> { FileWriterOptions::default(), )?; + let (major, minor) = writer.version().to_numbers(); + + let data_file = DataFile::new_unstarted(filename, major, minor); + fragment.files.push(data_file); + progress.begin(&fragment).await?; let break_limit = (128 * 
1024).min(params.max_rows_per_file); @@ -125,15 +129,9 @@ impl<'a> FragmentCreateBuilder<'a> { .iter() .map(|(_, column_index)| *column_index) .collect::>(); - let data_file = DataFile::new( - filename, - field_ids, - column_indices, - MAJOR_VERSION as u32, - MINOR_VERSION_NEXT as u32, - ); - fragment.files.push(data_file); + fragment.files[0].fields = field_ids; + fragment.files[0].column_indices = column_indices; progress.complete(&fragment).await?; diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index bda2ac708c..f47bc9a3bf 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -995,10 +995,10 @@ impl Scanner { } else { self.projection_plan.physical_schema.clone() }; - if !self.dataset.is_legacy_storage() { - // If this is a v2 dataset then we can pushdown limit/offset (via - // scan_range and we zero out limit/offset so we don't apply it - // twice) + if scan_range.is_some() && !self.dataset.is_legacy_storage() { + // If this is a v2 dataset with no filter then we can pushdown + // limit/offset (via scan_range and we zero out limit/offset + // so we don't apply it twice) use_limit_node = false; } self.scan( diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 573b97e9cc..9f38611171 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -890,18 +890,13 @@ impl MergeInsertJob { Self::commit(self.dataset, Vec::new(), updated_fragments, Vec::new()).await? } else { - let version = self - .dataset - .manifest() - .data_storage_format - .lance_file_version()?; let new_fragments = write_fragments_internal( Some(&self.dataset), self.dataset.object_store.clone(), &self.dataset.base, self.dataset.schema(), Box::pin(stream), - WriteParams::with_storage_version(version), + WriteParams::default(), ) .await?; // Apply deletions
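As a usage-level companion to the fragment-progress change (the data file is now registered on the fragment before progress.begin fires), here is a hedged Python sketch. It assumes the lance.progress.FragmentWriteProgress hook and the write_fragments entry point exercised in the tests above; the import path and callback signatures are approximations and may differ between pylance versions.

    # Hedged sketch: observe that begin() now sees the planned data file.
    # FragmentWriteProgress and its begin/complete signatures are assumed here.
    import pyarrow as pa

    from lance.fragment import write_fragments
    from lance.progress import FragmentWriteProgress

    class PrintingProgress(FragmentWriteProgress):
        def begin(self, fragment, multipart_id=None, **kwargs):
            # With this patch the fragment already lists its (not yet written) file.
            print("begin:", [f.path() for f in fragment.data_files()])

        def complete(self, fragment, **kwargs):
            print("complete:", [f.path() for f in fragment.data_files()])

    table = pa.table({"a": range(1024)})
    write_fragments(table, "/tmp/progress_demo", progress=PrintingProgress())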