Skip to content

Commit

Permalink
Cleanup and centralize v2 projection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
westonpace committed Sep 9, 2024
1 parent ab9bc2e commit 51fb149
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 133 deletions.
24 changes: 17 additions & 7 deletions rust/lance-encoding-datafusion/src/zone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,13 +609,19 @@ mod tests {
..Default::default()
};

let (schema, data) = write_lance_file(data, &fs, options).await;
let written_file = write_lance_file(data, &fs, options).await;

let decoder_middleware = DecoderMiddlewareChain::new()
.add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(schema.clone())))
.add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(
written_file.schema.clone(),
)))
.add_strategy(Arc::new(CoreFieldDecoderStrategy::default()));

let num_rows = data.iter().map(|rb| rb.num_rows()).sum::<usize>();
let num_rows = written_file
.data
.iter()
.map(|rb| rb.num_rows())
.sum::<usize>();

let result = count_lance_file(
&fs,
Expand All @@ -626,7 +632,9 @@ mod tests {
assert_eq!(num_rows, result);

let decoder_middleware = DecoderMiddlewareChain::new()
.add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(schema.clone())))
.add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(
written_file.schema.clone(),
)))
.add_strategy(Arc::new(CoreFieldDecoderStrategy::default()));

let result = count_lance_file(
Expand All @@ -638,15 +646,17 @@ mod tests {
op: Operator::Gt,
right: Box::new(Expr::Literal(ScalarValue::Int32(Some(50000)))),
}),
schema.as_ref(),
written_file.schema.as_ref(),
)
.unwrap(),
)
.await;
assert_eq!(0, result);

let decoder_middleware = DecoderMiddlewareChain::new()
.add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(schema.clone())))
.add_strategy(Arc::new(LanceDfFieldDecoderStrategy::new(
written_file.schema.clone(),
)))
.add_strategy(Arc::new(CoreFieldDecoderStrategy::default()));

let result = count_lance_file(
Expand All @@ -658,7 +668,7 @@ mod tests {
op: Operator::Gt,
right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
}),
schema.as_ref(),
written_file.schema.as_ref(),
)
.unwrap(),
)
Expand Down
20 changes: 10 additions & 10 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,14 @@ impl BufferEncodingStrategy for CoreBufferEncodingStrategy {
#[derive(Default)]
pub struct ColumnIndexSequence {
current_index: u32,
mapping: Vec<(i32, i32)>,
mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
pub fn next_column_index(&mut self, field_id: i32) -> u32 {
pub fn next_column_index(&mut self, field_id: u32) -> u32 {
let idx = self.current_index;
self.current_index += 1;
self.mapping.push((field_id, idx as i32));
self.mapping.push((field_id, idx));
idx
}

Expand Down Expand Up @@ -756,13 +756,13 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
Ok(Box::new(PrimitiveFieldEncoder::try_new(
options,
self.array_encoding_strategy.clone(),
column_index.next_column_index(field.id),
column_index.next_column_index(field.id as u32),
field.clone(),
)?))
} else {
match data_type {
DataType::List(_child) | DataType::LargeList(_child) => {
let list_idx = column_index.next_column_index(field.id);
let list_idx = column_index.next_column_index(field.id as u32);
let inner_encoding = encoding_strategy_root.create_field_encoder(
encoding_strategy_root,
&field.children[0],
Expand Down Expand Up @@ -794,11 +794,11 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
Ok(Box::new(PrimitiveFieldEncoder::try_new(
options,
self.array_encoding_strategy.clone(),
column_index.next_column_index(field.id),
column_index.next_column_index(field.id as u32),
field.clone(),
)?))
} else {
let header_idx = column_index.next_column_index(field.id);
let header_idx = column_index.next_column_index(field.id as u32);
let children_encoders = field
.children
.iter()
Expand All @@ -823,7 +823,7 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
Ok(Box::new(PrimitiveFieldEncoder::try_new(
options,
self.array_encoding_strategy.clone(),
column_index.next_column_index(field.id),
column_index.next_column_index(field.id as u32),
field.clone(),
)?))
} else {
Expand All @@ -845,7 +845,7 @@ impl FieldEncodingStrategy for CoreFieldEncodingStrategy {
/// to field encoders for each top-level field in the batch.
pub struct BatchEncoder {
pub field_encoders: Vec<Box<dyn FieldEncoder>>,
pub field_id_to_column_index: Vec<(i32, i32)>,
pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
Expand Down Expand Up @@ -978,7 +978,7 @@ pub async fn encode_batch(
let top_level_columns = batch_encoder
.field_id_to_column_index
.iter()
.map(|(_, idx)| *idx as u32)
.map(|(_, idx)| *idx)
.collect();
Ok(EncodedBatch {
data: data_buffer.freeze(),
Expand Down
Loading

0 comments on commit 51fb149

Please sign in to comment.