Skip to content

Commit

Permalink
a workable version
Browse files Browse the repository at this point in the history
  • Loading branch information
broccoliSpicy committed Dec 4, 2024
1 parent 75652ee commit ff7ddfd
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 13 deletions.
52 changes: 52 additions & 0 deletions rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,46 @@ impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
})
}
}
/// Builder that assembles a struct data block out of one child builder per struct field.
struct StructDataBlockBuilder {
/// Per-child value widths in bytes, stored exactly as they were passed to `new`.
/// NOTE(review): the CI annotations that follow report this field is never read.
bytes_per_values: Vec<u32>,

Check warning on line 344 in rust/lance-encoding/src/data.rs

View workflow job for this annotation

GitHub Actions / linux-arm

field `bytes_per_values` is never read

Check warning on line 344 in rust/lance-encoding/src/data.rs

View workflow job for this annotation

GitHub Actions / linux-arm

field `bytes_per_values` is never read

Check warning on line 344 in rust/lance-encoding/src/data.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

field `bytes_per_values` is never read

Check warning on line 344 in rust/lance-encoding/src/data.rs

View workflow job for this annotation

GitHub Actions / linux-build (nightly)

field `bytes_per_values` is never read

Check warning on line 344 in rust/lance-encoding/src/data.rs

View workflow job for this annotation

GitHub Actions / linux-build (stable)

field `bytes_per_values` is never read
/// One type-erased builder per child, in the same order as `bytes_per_values`.
children: Vec<Box<dyn DataBlockBuilderImpl>>,
}

impl StructDataBlockBuilder {
    /// Creates a builder with one fixed-width child builder per entry of `bytes_per_values`.
    ///
    /// Each child is constructed with a width of `bytes_per_value * 8` (the byte width
    /// converted to bits) and receives the same `estimated_size_bytes` hint.
    fn new(bytes_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
        let children = bytes_per_values
            .iter()
            .map(|&num_bytes| {
                let builder =
                    FixedWidthDataBlockBuilder::new(num_bytes as u64 * 8, estimated_size_bytes);
                Box::new(builder) as Box<dyn DataBlockBuilderImpl>
            })
            .collect::<Vec<_>>();

        Self {
            bytes_per_values,
            children,
        }
    }
}

impl DataBlockBuilderImpl for StructDataBlockBuilder {
    /// Appends the rows in `selection` from every child of `data_block` to the matching
    /// child builder.
    ///
    /// Panics if `data_block` is not a struct block, or if it has fewer children than this
    /// builder.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
        let struct_block = data_block.as_struct_ref().unwrap();
        // Index into the source children (rather than zipping) so that a child-count
        // mismatch still panics instead of being silently truncated.
        for (idx, builder) in self.children.iter_mut().enumerate() {
            builder.append(&struct_block.children[idx], selection.clone());
        }
    }

    /// Finishes every child builder in order and wraps the results in a struct data block
    /// with a fresh `BlockInfo`.
    fn finish(self: Box<Self>) -> DataBlock {
        let finished_children = self
            .children
            .into_iter()
            .map(|builder| builder.finish())
            .collect::<Vec<_>>();

        DataBlock::Struct(StructDataBlock {
            children: finished_children,
            block_info: BlockInfo::new(),
        })
    }
}


/// A data block to represent a fixed size list
#[derive(Debug)]
Expand Down Expand Up @@ -908,6 +948,18 @@ impl DataBlock {
inner.dimension,
))
}
Self::Struct(struct_data_block) => {
let mut bytes_per_values = vec![];
for child in struct_data_block.children.iter() {
let child = child.as_fixed_width_ref().unwrap();
let bytes_per_value = child.bits_per_value / 8;
bytes_per_values.push(bytes_per_value as u32);
}
Box::new(StructDataBlockBuilder::new(
bytes_per_values,
estimated_size_bytes
))
}
_ => todo!(),
}
}
Expand Down
12 changes: 9 additions & 3 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,21 +770,27 @@ impl CoreFieldDecoderStrategy {
.map(|v| v == "true")
.unwrap_or(false)
{
let column_info = column_infos.expect_next()?;
println!("--------column_info: {:?}", column_info);
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);
return Ok(scheduler);
/*
let column_info = column_infos.expect_next()?;
let child_scheduler: Box<dyn StructuralFieldScheduler> =
Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);
for _ in 0..fields.len() {
column_infos.next_top_level();
}
let fields = fields.clone();
println!("---fields: {:?}", fields);
return Ok(Box::new(StructuralStructScheduler::new(
vec![child_scheduler],
fields,
)) as Box<dyn StructuralFieldScheduler>);
*/
/*
let column_info = column_infos.expect_next()?;
println!("--------column_info: {:?}", column_info);
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ impl StructuralPrimitiveFieldScheduler {
column_info: &ColumnInfo,
decompressors: &dyn DecompressorStrategy,
) -> Result<Self> {
println!("inside StructuralPrimitiveFieldScheduler::try_new");
let page_schedulers = column_info
.page_infos
.iter()
Expand Down
35 changes: 25 additions & 10 deletions rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,16 +614,31 @@ pub struct StructuralStructDecoder {

impl StructuralStructDecoder {
/// Builds a decoder for a struct made of `fields`.
///
/// * `fields` - the child fields of the struct; also stored as `child_fields` and used to
///   form the struct's `DataType`.
/// * `should_validate` - forwarded unchanged to every child decoder that is created.
///
/// When the first child field carries the metadata entry `"packed" = "true"`, a single
/// `StructuralPrimitiveFieldDecoder` built from that first field is used as the only
/// child decoder. Otherwise one decoder is created per field via `field_to_decoder`.
/// NOTE(review): the packed flag is read from the first *child* field rather than from the
/// struct field itself — confirm this matches what the writer records.
pub fn new(fields: Fields, should_validate: bool) -> Self {
    // Use `first()` instead of `fields[0]` so that a struct with no child fields falls
    // through to the regular (non-packed) path instead of panicking.
    let is_packed = fields
        .first()
        .and_then(|field| field.metadata().get("packed"))
        .map(|v| v == "true")
        .unwrap_or(false);

    let data_type = DataType::Struct(fields.clone());

    if is_packed {
        // `is_packed` can only be true when a first field exists, so indexing is safe here.
        let child = StructuralPrimitiveFieldDecoder::new(&fields[0], should_validate);
        Self {
            data_type,
            children: vec![Box::new(child)],
            child_fields: fields,
        }
    } else {
        let children = fields
            .iter()
            .map(|field| Self::field_to_decoder(field, should_validate))
            .collect();
        Self {
            data_type,
            children,
            child_fields: fields,
        }
    }
}

fn field_to_decoder(
Expand Down
26 changes: 26 additions & 0 deletions rust/lance-file/src/v2/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,31 @@ impl ReaderProjection {
}
})
.collect::<Vec<_>>();
let mut column_indices = vec![];
let mut curr_column_idx = 0;
let mut packed_struct_fields = 0;

for field in schema.fields_pre_order() {
let field_metadata = &field.metadata;
if packed_struct_fields > 0 {
packed_struct_fields -= 1;
continue;
}
if field_metadata
.get("packed")
.map(|v| v == "true")
.unwrap_or(false) {
column_indices.push(curr_column_idx);
curr_column_idx += 1;
packed_struct_fields = field.children.len();
} else {
if field.children.is_empty() || !is_structural {
column_indices.push(curr_column_idx);
curr_column_idx += 1;
}
}
}
println!("inside from_whole_schema, column_indices: {:?}", column_indices);
Self {
schema,
column_indices,
Expand Down Expand Up @@ -643,6 +668,7 @@ impl FileReader {
location!(),
));
}
println!("projection.column_indices: {:?}", projection.column_indices);
let mut column_indices_seen = BTreeSet::new();
for column_index in &projection.column_indices {
if !column_indices_seen.insert(*column_index) {
Expand Down

0 comments on commit ff7ddfd

Please sign in to comment.