Commit

Fix bug in nested projection, rebase, properly report progress when creating fragment

westonpace committed Sep 9, 2024
1 parent c13327c commit ab9bc2e
Showing 12 changed files with 231 additions and 111 deletions.
29 changes: 27 additions & 2 deletions python/python/tests/test_dataset.py
@@ -539,6 +539,31 @@ def test_pickle(tmp_path: Path):
assert dataset.to_table() == unpickled.to_table()


def test_nested_projection(tmp_path: Path):
from lance.debug import format_fragment

table = pa.Table.from_pydict(
{
"a": range(100),
"b": range(100),
"struct": [{"x": counter, "y": counter % 2 == 0} for counter in range(100)],
}
)
base_dir = tmp_path / "test"
lance.write_dataset(table, base_dir)

dataset = lance.dataset(base_dir)

print(format_fragment(dataset.get_fragment(0).metadata, dataset))
projected = dataset.to_table(columns=["struct.x"])
assert projected == pa.Table.from_pydict({"struct.x": range(100)})

projected = dataset.to_table(columns=["struct.y"])
assert projected == pa.Table.from_pydict(
{"struct.y": [i % 2 == 0 for i in range(100)]}
)


def test_polar_scan(tmp_path: Path):
some_structs = [{"x": counter, "y": counter} for counter in range(100)]
table = pa.Table.from_pydict(
@@ -2233,8 +2258,8 @@ def test_late_materialization_batch_size(tmp_path: Path):


EXPECTED_DEFAULT_STORAGE_VERSION = "2.0"
EXPECTED_MAJOR_VERSION = 0
EXPECTED_MINOR_VERSION = 3
EXPECTED_MAJOR_VERSION = 2
EXPECTED_MINOR_VERSION = 0


def test_default_storage_version(tmp_path: Path):
29 changes: 16 additions & 13 deletions python/python/tests/test_fragment.py
@@ -63,11 +63,11 @@ def test_write_fragment_two_phases(tmp_path: Path):
def test_write_legacy_fragment(tmp_path: Path):
tab = pa.table({"a": range(1024)})
frag = LanceFragment.create(tmp_path, tab, data_storage_version="legacy")
assert "file_minor_version: 3" not in str(frag)
assert "file_major_version: 2" not in str(frag)

tab = pa.table({"a": range(1024)})
frag = LanceFragment.create(tmp_path, tab, data_storage_version="stable")
assert "file_minor_version: 3" in str(frag)
assert "file_major_version: 2" in str(frag)


def test_scan_fragment(tmp_path: Path):
@@ -99,15 +99,18 @@ def test_scan_fragment_with_dynamic_projection(tmp_path: Path):


def test_write_fragments(tmp_path: Path):
# This will be split across two files if we set the max_bytes_per_file to 1024
tab = pa.table(
{
"a": pa.array(range(1024)),
}
# Should result in two files since each batch is 8MB and max_bytes_per_file is small
batches = pa.RecordBatchReader.from_batches(
pa.schema([pa.field("a", pa.string())]),
[
pa.record_batch([pa.array(["0" * 1024] * 1024 * 8)], names=["a"]),
pa.record_batch([pa.array(["0" * 1024] * 1024 * 8)], names=["a"]),
],
)

progress = ProgressForTest()
fragments = write_fragments(
tab,
batches,
tmp_path,
max_rows_per_group=512,
max_bytes_per_file=1024,
@@ -200,11 +203,11 @@ def test_dataset_progress(tmp_path: Path):
assert metadata["id"] == 0
assert len(metadata["files"]) == 1
# Fragments aren't exactly equal, because the file was written before
# physical_rows was known.
assert (
fragment.data_files()
== FragmentMetadata.from_json(json.dumps(metadata)).data_files()
)
# physical_rows was known. However, the paths should be the same.
assert len(fragment.data_files()) == 1
deserialized = FragmentMetadata.from_json(json.dumps(metadata))
assert len(deserialized.data_files()) == 1
assert fragment.data_files()[0].path() == deserialized.data_files()[0].path()

ctx = multiprocessing.get_context("spawn")
p = ctx.Process(target=failing_write, args=(progress_uri, dataset_uri))
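
For reference, a minimal sketch of a custom progress hook in the spirit of the ProgressForTest used in these tests. It assumes a lance.progress.FragmentWriteProgress base class with begin()/complete() callbacks; treat the import path, method names, and signatures as assumptions rather than the library's confirmed API.

# Hypothetical progress hook; assumes lance.progress.FragmentWriteProgress
# exposes begin()/complete() callbacks invoked around each fragment write.
from lance.fragment import write_fragments
from lance.progress import FragmentWriteProgress

class LoggingProgress(FragmentWriteProgress):
    def begin(self, fragment, **kwargs):
        # Called before the fragment's data file is written.
        print(f"writing fragment: {fragment}")

    def complete(self, fragment, **kwargs):
        # Called once the fragment has been fully written.
        print(f"finished fragment: {fragment}")

# Usage (hypothetical): write_fragments(batches, tmp_path, progress=LoggingProgress())
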
25 changes: 16 additions & 9 deletions rust/lance-encoding/src/decoder.rs
@@ -488,6 +488,15 @@ impl<'a> ColumnInfoIter<'a> {
&self.column_infos[self.column_info_pos]
}

pub fn expect_next(&mut self) -> Result<&'a ColumnInfo> {
self.next().ok_or_else(|| {
Error::invalid_input(
"there were more fields in the schema than provided column indices",
location!(),
)
})
}

pub(crate) fn next_top_level(&mut self) {
self.column_indices_pos += 1;
if self.column_indices_pos < self.column_indices.len() {
@@ -678,7 +687,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
) -> Result<ChosenFieldScheduler<'a>> {
let data_type = field.data_type();
if Self::is_primitive(&data_type) {
let primitive_col = column_infos.next().unwrap();
let primitive_col = column_infos.expect_next()?;
let scheduler = self.create_primitive_scheduler(
&data_type,
chain.current_path(),
@@ -692,7 +701,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
// A fixed size list column could either be a physical or a logical decoder
// depending on the child data type.
if Self::is_primitive(inner.data_type()) {
let primitive_col = column_infos.next().unwrap();
let primitive_col = column_infos.expect_next()?;
let scheduler = self.create_primitive_scheduler(
&data_type,
chain.current_path(),
@@ -706,7 +715,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
}
DataType::Dictionary(_key_type, value_type) => {
if Self::is_primitive(value_type) {
let primitive_col = column_infos.next().unwrap();
let primitive_col = column_infos.expect_next()?;
let scheduler = self.create_primitive_scheduler(
&data_type,
chain.current_path(),
@@ -726,7 +735,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
}
}
DataType::List(items_field) | DataType::LargeList(items_field) => {
let offsets_column = column_infos.next().unwrap();
let offsets_column = column_infos.expect_next()?;
column_infos.next_top_level();
Self::ensure_values_encoded(offsets_column, chain.current_path())?;
let offsets_column_buffers = ColumnBuffers {
file_buffers: buffers,
@@ -794,7 +804,7 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
Ok((chain, list_scheduler))
}
DataType::Struct(fields) => {
let column_info = column_infos.next().unwrap();
let column_info = column_infos.expect_next()?;

if Self::check_packed_struct(column_info) {
// use packed struct encoding
@@ -808,13 +818,10 @@ impl FieldDecoderStrategy for CoreFieldDecoderStrategy {
} else {
// use default struct encoding
Self::check_simple_struct(column_info, chain.current_path()).unwrap();
let is_root = field.metadata.contains_key("__lance_decoder_root");
let mut child_schedulers = Vec::with_capacity(field.children.len());
let mut chain = chain;
for (i, field) in field.children.iter().enumerate() {
if is_root {
column_infos.next_top_level();
}
column_infos.next_top_level();
let (next_chain, field_scheduler) =
chain.new_child(i as u32, field, column_infos, buffers)?;
child_schedulers.push(field_scheduler?);
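
The heart of the nested-projection fix above: ColumnInfoIter keeps a cursor into the file's column infos along with the list of projected column indices, and next_top_level() jumps the cursor to the next projected index. Previously that jump happened only when the struct was the decoder root (the removed __lance_decoder_root check), so projecting a subset of a nested struct's children could read the wrong column infos; the fix advances the cursor for every struct child, and after list offsets. Below is a toy Python model of the cursor behavior (illustrative only, not the actual Rust types):

# Toy model of ColumnInfoIter: `infos` lists every column in the file in
# DFS order; `selected` holds the projection's column indices.
class ColumnInfoIter:
    def __init__(self, infos, selected):
        self.infos = infos
        self.selected = selected
        self.sel_pos = 0
        self.pos = selected[0]

    def expect_next(self):
        info = self.infos[self.pos]
        self.pos += 1
        return info

    def next_top_level(self):
        # Jump to the next projected column rather than the next file column.
        self.sel_pos += 1
        if self.sel_pos < len(self.selected):
            self.pos = self.selected[self.sel_pos]

# File columns (DFS order): struct, struct.x, struct.y
it = ColumnInfoIter(["struct", "struct.x", "struct.y"], selected=[0, 2])
assert it.expect_next() == "struct"
it.next_top_level()                    # jump to the projected child...
assert it.expect_next() == "struct.y"  # ...instead of falling through to struct.x
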
47 changes: 41 additions & 6 deletions rust/lance-encoding/src/testing.rs
@@ -5,7 +5,7 @@ use std::{cmp::Ordering, collections::HashMap, ops::Range, sync::Arc};

use arrow::array::make_comparator;
use arrow_array::{Array, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SortOptions};
use arrow_schema::{DataType, Field, FieldRef, Schema, SortOptions};
use arrow_select::concat::concat;
use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, FutureExt, StreamExt};
@@ -69,12 +69,49 @@ impl EncodingsIo for SimulatedScheduler {
}
}

fn column_indices_from_schema_helper(
fields: &[FieldRef],
column_indices: &mut Vec<u32>,
column_counter: &mut u32,
) {
column_indices.push(*column_counter);
*column_counter += 1;
for field in fields {
match field.data_type() {
DataType::Struct(fields) => {
column_indices_from_schema_helper(fields.as_ref(), column_indices, column_counter);
}
DataType::List(inner) => {
column_indices_from_schema_helper(&[inner.clone()], column_indices, column_counter);
}
DataType::LargeList(inner) => {
column_indices_from_schema_helper(&[inner.clone()], column_indices, column_counter);
}
DataType::FixedSizeList(inner, _) => {
// FSL(primitive) does not get its own column
column_indices.pop();
*column_counter -= 1;
column_indices_from_schema_helper(&[inner.clone()], column_indices, column_counter);
}
_ => {
column_indices_from_schema_helper(&[], column_indices, column_counter);
}
}
}
}

fn column_indices_from_schema(schema: &Schema) -> Vec<u32> {
let mut column_indices = Vec::new();
let mut column_counter = 0;
column_indices_from_schema_helper(schema.fields(), &mut column_indices, &mut column_counter);
column_indices
}
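
As a quick illustration of the helper above (a simplified sketch that omits the FixedSizeList special case and models a field as just the list of its children): one index is pushed per call, including the outermost call, so even a schema with a single primitive column yields two indices. The leading index appears to account for the decoder's implicit root level.

# Simplified Python rendering of column_indices_from_schema_helper
# (FixedSizeList handling omitted). A field is modeled as its children list.
def column_indices_from_schema(fields):
    indices = []
    counter = 0

    def helper(children):
        nonlocal counter
        indices.append(counter)  # one index for this level itself
        counter += 1
        for child in children:
            helper(child)

    helper(fields)
    return indices

print(column_indices_from_schema([[]]))        # single primitive: [0, 1]
print(column_indices_from_schema([[], [[]]]))  # int32 + struct<int32>: [0, 1, 2, 3]
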

#[allow(clippy::too_many_arguments)]
async fn test_decode(
num_rows: u64,
batch_size: u32,
schema: &Schema,
column_indices: &[u32],
column_infos: &[Arc<ColumnInfo>],
expected: Option<Arc<dyn Array>>,
io: &Arc<dyn EncodingsIo>,
@@ -88,9 +125,10 @@ async fn test_decode(
DecoderMiddlewareChain::new().add_strategy(Arc::new(CoreFieldDecoderStrategy {
validate_data: true,
}));
let column_indices = column_indices_from_schema(schema);
let decode_scheduler = DecodeBatchScheduler::try_new(
&lance_schema,
column_indices,
&column_indices,
column_infos,
&Vec::new(),
num_rows,
@@ -415,7 +453,6 @@ async fn check_round_trip_encoding_inner(
num_rows,
test_cases.batch_size,
&schema,
&[0],
&column_infos,
concat_data.clone(),
&scheduler_copy.clone(),
@@ -451,7 +488,6 @@ async fn check_round_trip_encoding_inner(
num_rows,
test_cases.batch_size,
&schema,
&[0],
&column_infos,
expected,
&scheduler.clone(),
@@ -498,7 +534,6 @@ async fn check_round_trip_encoding_inner(
num_rows,
test_cases.batch_size,
&schema,
&[0],
&column_infos,
expected,
&scheduler.clone(),
1 change: 0 additions & 1 deletion rust/lance-file/src/format.rs
@@ -32,5 +32,4 @@ pub mod metadata;
/// These version/magic values are written at the end of Lance files (e.g. versions/1.version)
pub const MAJOR_VERSION: i16 = 0;
pub const MINOR_VERSION: i16 = 2;
pub const MINOR_VERSION_NEXT: u16 = 3;
pub const MAGIC: &[u8; 4] = b"LANC";
88 changes: 62 additions & 26 deletions rust/lance-file/src/v2/reader.rs
@@ -116,13 +116,48 @@ impl CachedFileMetadata {
/// representations of the same semantic type. An encoding could
/// theoretically support "casting" (e.g. int to string, etc.) but
/// there is little advantage in doing so here.
///
/// Note: in order to specify a projection the user will need some way
/// to figure out the column indices. In the table format we do this
/// using field IDs and keeping track of the field id->column index mapping.
///
/// If users are not using the table format then they will need to figure
/// out some way to do this themselves.
#[derive(Debug, Clone)]
pub struct ReaderProjection {
/// The data types (schema) of the selected columns. The names
/// of the schema are arbitrary and ignored.
pub schema: Arc<Schema>,
/// The indices of the columns to load. Note, these are the
/// indices of the top level fields only
/// The indices of the columns to load.
///
/// The mapping should be as follows:
///
/// - Primitive: the index of the column in the schema
/// - List: the index of the list column in the schema
/// followed by the column indices of the children
/// - FixedSizeList (of primitive): the index of the column in the schema
/// (this case is not nested)
/// - FixedSizeList (of non-primitive): not yet implemented
/// - Dictionary: same as primitive
/// - Struct: the index of the struct column in the schema
/// followed by the column indices of the children
///
/// In other words, this should be a DFS listing of the desired schema.
///
/// For example, if the goal is to load:
///
/// x: int32
/// y: struct<z: int32, w: string>
/// z: list<int32>
///
/// and the schema originally used to store the data was:
///
/// a: struct<x: int32>
/// b: int64
/// y: struct<z: int32, c: int64, w: string>
/// z: list<int32>
///
/// Then the column_indices should be [1, 3, 4, 6, 7, 8]
pub column_indices: Vec<u32>,
}
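
To make the numbering above concrete, here is a small self-contained Python sketch (illustrative only; the (name, children) representation is invented for the example, and the FixedSizeList special case is ignored) that reproduces the worked example's [1, 3, 4, 6, 7, 8]:

from itertools import count

# Stored schema from the example above, as (name, children) pairs.
# list<int32> is modeled as an offsets field with one child for the items.
stored = [
    ("a", [("x", [])]),
    ("b", []),
    ("y", [("z", []), ("c", []), ("w", [])]),
    ("z", [("item", [])]),
]

def number_fields(fields, counter, path="", out=None):
    """Assign one column index per field, depth-first (pre-order)."""
    if out is None:
        out = {}
    for name, children in fields:
        full = f"{path}.{name}" if path else name
        out[full] = next(counter)
        number_fields(children, counter, full, out)
    return out

indices = number_fields(stored, count())
# {'a': 0, 'a.x': 1, 'b': 2, 'y': 3, 'y.z': 4, 'y.c': 5, 'y.w': 6,
#  'z': 7, 'z.item': 8}

# Loading x, y (only its z and w children), and the list z:
wanted = ["a.x", "y", "y.z", "y.w", "z", "z.item"]
print([indices[w] for w in wanted])  # [1, 3, 4, 6, 7, 8]
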

@@ -464,9 +499,6 @@ impl FileReader {
location!(),
));
}
if projection.schema.fields.len() != projection.column_indices.len() {
return Err(Error::invalid_input(format!("The projection schema has {} top level fields but only {} column indices were provided", projection.schema.fields.len(), projection.column_indices.len()), location!()));
}
let mut column_indices_seen = BTreeSet::new();
for column_index in &projection.column_indices {
if !column_indices_seen.insert(*column_index) {
@@ -485,34 +517,38 @@ impl FileReader {
Ok(())
}

// Helper function for `default_projection` to determine how many columns are occupied
// by a lance field.
fn default_column_count(field: &Field) -> u32 {
1 + field
.children
.iter()
.map(Self::default_column_count)
.sum::<u32>()
fn default_projection_helper<'a>(
fields: impl Iterator<Item = &'a Field>,
column_indices: &mut Vec<u32>,
column_index_counter: &mut u32,
) {
for field in fields {
column_indices.push(*column_index_counter);
*column_index_counter += 1;

Self::default_projection_helper(
field.children.iter(),
column_indices,
column_index_counter,
);
}
}

// This function is one of the few spots in the reader where we rely on Lance table
// format and the fact that we wrote a Lance table schema into the global buffers.
// If we want to read the entire file, and we know the schema for the file,
// then we can use the default projection, and the user doesn't need to supply
// field IDs (and the field IDs in the schema do not need to be accurate)
//
// TODO: In the future it would probably be better for the "default type" of a column
// to be something that can be provided dynamically via the encodings registry. We
// could pass the pages of the column to some logic that picks a data type based on the
// page encodings.

/// Loads a default projection for all columns in the file, using the data type that
/// was provided when the file was written.
// If the user doesn't know the schema, then they can fetch it from the file's
// global buffers. So this method can be used in that case too.
pub fn default_projection(lance_schema: &Schema) -> ReaderProjection {
let schema = Arc::new(lance_schema.clone());
let mut column_indices = Vec::with_capacity(lance_schema.fields.len());
let mut column_index = 0;
for field in &lance_schema.fields {
column_indices.push(column_index);
column_index += Self::default_column_count(field);
}
Self::default_projection_helper(
schema.fields.iter(),
&mut column_indices,
&mut column_index,
);
ReaderProjection {
schema,
column_indices,
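
Tying default_projection back to the ReaderProjection sketch above: the default projection simply selects every field of the stored schema, in DFS order.

# Reusing `indices` from the ReaderProjection sketch earlier:
print(list(indices.values()))  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
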