Commit d08e062

refactor: Remove raw pointer indexing and add unit tests for RowIndexBuilder
Parent: c1c301b

2 files changed: +181 -32 lines

kernel/src/engine/arrow_utils.rs

Lines changed: 148 additions & 10 deletions
@@ -63,6 +63,7 @@ macro_rules! prim_array_cmp {
 pub(crate) use prim_array_cmp;
 
 type FieldIndex = usize;
+type FlattenedRangeIterator<T> = std::iter::Flatten<std::vec::IntoIter<Range<T>>>;
 
 /// contains information about a StructField matched to a parquet struct field
 ///
@@ -132,16 +133,26 @@ impl RowIndexBuilder {
 
 impl IntoIterator for RowIndexBuilder {
     type Item = i64;
-    type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<Self::Item>>>;
+    type IntoIter = FlattenedRangeIterator<Self::Item>;
 
     fn into_iter(self) -> Self::IntoIter {
-        let starting_offsets = match self.row_group_ordinals {
-            Some(ordinals) => ordinals
-                .iter()
-                .map(|i| self.row_group_row_index_ranges[*i].clone())
-                .collect(),
-            None => self.row_group_row_index_ranges,
-        };
+        let starting_offsets =
+            match self.row_group_ordinals {
+                Some(ordinals) => {
+                    // We generally ignore invalid row group ordinals, but in non-optimized builds,
+                    // we verify that all ordinals are within bounds
+                    debug_assert!(
+                        ordinals.iter().all(|&i| i < self.row_group_row_index_ranges.len()),
+                        "All row group ordinals must be within bounds of row_group_row_index_ranges"
+                    );
+                    // We have to clone here to avoid modifying the original vector in each iteration
+                    ordinals
+                        .iter()
+                        .filter_map(|&i| self.row_group_row_index_ranges.get(i).cloned())
+                        .collect()
+                }
+                None => self.row_group_row_index_ranges,
+            };
        starting_offsets.into_iter().flatten()
    }
 }
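
The key change above is replacing raw indexing (`self.row_group_row_index_ranges[*i]`, which panics on a bad ordinal) with `get(i).cloned()`, which silently skips it, while `debug_assert!` still surfaces bad ordinals in non-optimized builds. A minimal standalone sketch of that pattern (illustrative names, not part of this commit):

    use std::ops::Range;

    // One row-index range per row group; `ordinals` picks which groups survive.
    fn selected_ranges(ranges: &[Range<i64>], ordinals: &[usize]) -> Vec<Range<i64>> {
        // Loud failure in debug builds only...
        debug_assert!(
            ordinals.iter().all(|&i| i < ranges.len()),
            "all ordinals must be within bounds"
        );
        // ...silent skip in release builds: `get` returns None instead of panicking
        ordinals
            .iter()
            .filter_map(|&i| ranges.get(i).cloned())
            .collect()
    }

    fn main() {
        let ranges = [0..5, 5..8, 8..12];
        let rows: Vec<i64> = selected_ranges(&ranges, &[0, 2])
            .into_iter()
            .flatten()
            .collect();
        assert_eq!(rows, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
    }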
@@ -153,7 +164,7 @@ impl IntoIterator for RowIndexBuilder {
 pub(crate) fn fixup_parquet_read<T>(
     batch: RecordBatch,
     requested_ordering: &[ReorderIndex],
-    row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
+    row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
 ) -> DeltaResult<T>
 where
     StructArray: Into<T>,
@@ -750,7 +761,7 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
 pub(crate) fn reorder_struct_array(
     input_data: StructArray,
     requested_ordering: &[ReorderIndex],
-    mut row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
+    mut row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
 ) -> DeltaResult<StructArray> {
     debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
     if !ordering_needs_transform(requested_ordering) {
@@ -1153,6 +1164,41 @@ mod tests {
         ])
     }
 
+    /// Helper function to create mock row group metadata for testing
+    fn create_mock_row_group(num_rows: i64) -> RowGroupMetaData {
+        use crate::parquet::basic::{Encoding, Type as PhysicalType};
+        use crate::parquet::file::metadata::ColumnChunkMetaData;
+        use crate::parquet::schema::types::Type;
+
+        // Create a minimal schema descriptor
+        let schema = Arc::new(SchemaDescriptor::new(Arc::new(
+            Type::group_type_builder("schema")
+                .with_fields(vec![Arc::new(
+                    Type::primitive_type_builder("test_col", PhysicalType::INT32)
+                        .build()
+                        .unwrap(),
+                )])
+                .build()
+                .unwrap(),
+        )));
+
+        // Create a minimal column chunk metadata
+        let column_chunk = ColumnChunkMetaData::builder(schema.column(0))
+            .set_encodings(vec![Encoding::PLAIN])
+            .set_total_compressed_size(100)
+            .set_total_uncompressed_size(100)
+            .set_num_values(num_rows)
+            .build()
+            .unwrap();
+
+        RowGroupMetaData::builder(schema)
+            .set_num_rows(num_rows)
+            .set_total_byte_size(100)
+            .set_column_metadata(vec![column_chunk])
+            .build()
+            .unwrap()
+    }
+
     #[test]
     fn test_json_parsing() {
         let requested_schema = Arc::new(ArrowSchema::new(vec![
@@ -1783,6 +1829,98 @@ mod tests {
         assert_eq!(reorder_indices, expect_reorder);
     }
 
+    #[test]
+    fn test_row_index_builder_no_skipping() {
+        let row_groups = vec![
+            create_mock_row_group(5), // 5 rows: indexes 0-4
+            create_mock_row_group(3), // 3 rows: indexes 5-7
+            create_mock_row_group(4), // 4 rows: indexes 8-11
+        ];
+
+        let builder = RowIndexBuilder::new(&row_groups);
+        let row_indexes: Vec<i64> = builder.into_iter().collect();
+
+        // Should produce consecutive indexes from 0 to 11
+        assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
+    }
+
+    #[test]
+    fn test_row_index_builder_with_skipping() {
+        let row_groups = vec![
+            create_mock_row_group(5), // 5 rows: indexes 0-4
+            create_mock_row_group(3), // 3 rows: indexes 5-7 (will be skipped)
+            create_mock_row_group(4), // 4 rows: indexes 8-11
+            create_mock_row_group(2), // 2 rows: indexes 12-13 (will be skipped)
+        ];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[0, 2]);
+
+        let row_indexes: Vec<i64> = builder.into_iter().collect();
+
+        // Should produce indexes from row groups 0 and 2: [0-4] and [8-11]
+        assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
+    }
+
+    #[test]
+    fn test_row_index_builder_single_row_group() {
+        let row_groups = vec![create_mock_row_group(7)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[0]);
+
+        let row_indexes: Vec<i64> = builder.into_iter().collect();
+
+        assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6]);
+    }
+
+    #[test]
+    fn test_row_index_builder_empty_selection() {
+        let row_groups = vec![create_mock_row_group(3), create_mock_row_group(2)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[]);
+
+        let row_indexes: Vec<i64> = builder.into_iter().collect();
+
+        // Should produce no indexes
+        assert_eq!(row_indexes, Vec::<i64>::new());
+    }
+
+    #[test]
+    fn test_row_index_builder_out_of_order_selection() {
+        let row_groups = vec![
+            create_mock_row_group(2), // 2 rows: indexes 0-1
+            create_mock_row_group(3), // 3 rows: indexes 2-4
+            create_mock_row_group(1), // 1 row: index 5
+        ];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[2, 0]);
+
+        let row_indexes: Vec<i64> = builder.into_iter().collect();
+
+        // Should produce indexes in the order specified: group 2 first, then group 0
+        assert_eq!(row_indexes, vec![5, 0, 1]);
+    }
+
+    #[test]
+    #[cfg(debug_assertions)]
+    #[should_panic(
+        expected = "All row group ordinals must be within bounds of row_group_row_index_ranges"
+    )]
+    fn test_row_index_builder_expect_debug_panic_for_out_of_bounds_row_group_ordinals() {
+        let row_groups = vec![create_mock_row_group(2)];
+
+        let mut builder = RowIndexBuilder::new(&row_groups);
+        builder.select_row_groups(&[1]);
+
+        // This should panic because the row group ordinal is out of bounds
+        // NOTE: This panic only happens in non-optimized builds
+        let row_indexes: Vec<i64> = builder.into_iter().collect();
+        assert_eq!(row_indexes, vec![0, 1]);
+    }
+
     #[test]
     fn nested_indices() {
         column_mapping_cases().into_iter().for_each(|mode| {
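
Taken together, the new tests pin down the builder's contract: selection filters row groups without renumbering, so each surviving group keeps its global starting offset. A condensed sketch of that behavior, using only calls that appear in this diff (expected values follow from `test_row_index_builder_with_skipping`):

    let row_groups = vec![create_mock_row_group(5), create_mock_row_group(3)];
    let mut builder = RowIndexBuilder::new(&row_groups);
    builder.select_row_groups(&[1]); // keep only the second row group
    // Indexes are not renumbered from zero; they continue past the skipped group
    let row_indexes: Vec<i64> = builder.into_iter().collect();
    assert_eq!(row_indexes, vec![5, 6, 7]);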

kernel/tests/read.rs

Lines changed: 33 additions & 22 deletions
@@ -3,9 +3,10 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use delta_kernel::actions::deletion_vector::split_vector;
-use delta_kernel::arrow::array::AsArray as _;
+use delta_kernel::arrow::array::{AsArray as _, BooleanArray};
 use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
 use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema};
+use delta_kernel::arrow::record_batch::RecordBatch;
 use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
@@ -14,7 +15,7 @@ use delta_kernel::expressions::{
 };
 use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
 use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
-use delta_kernel::scan::Scan;
+use delta_kernel::scan::{Scan, ScanResult};
 use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType};
 use delta_kernel::{Engine, FileMeta, Snapshot};
 
@@ -33,6 +34,23 @@ const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c00
 const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet";
 const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet";
 
+/// Helper function to extract filtered data from a scan result, respecting row masks
+fn extract_record_batch(
+    scan_result: ScanResult,
+) -> Result<RecordBatch, Box<dyn std::error::Error>> {
+    let mask = scan_result.full_mask();
+    let record_batch = into_record_batch(scan_result.raw_data?);
+
+    if let Some(mask) = mask {
+        Ok(filter_record_batch(
+            &record_batch,
+            &BooleanArray::from(mask),
+        )?)
+    } else {
+        Ok(record_batch)
+    }
+}
+
 #[tokio::test]
 async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
     let batch = generate_simple_batch()?;
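
The `extract_record_batch` helper above combines `ScanResult::full_mask` with Arrow's `filter_record_batch`. A minimal sketch of the masking step in isolation, using the same re-exported Arrow APIs (column name and data are illustrative, not from this commit):

    use std::sync::Arc;

    use delta_kernel::arrow::array::{BooleanArray, Int64Array};
    use delta_kernel::arrow::compute::filter_record_batch;
    use delta_kernel::arrow::record_batch::RecordBatch;

    fn mask_demo() -> Result<(), Box<dyn std::error::Error>> {
        // A three-row batch; the mask keeps rows 0 and 2 and drops row 1
        let batch = RecordBatch::try_from_iter([(
            "row_index",
            Arc::new(Int64Array::from(vec![0i64, 1, 2])) as _,
        )])?;
        let mask = BooleanArray::from(vec![true, false, true]);
        let filtered = filter_record_batch(&batch, &mask)?;
        assert_eq!(filtered.num_rows(), 2);
        Ok(())
    }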
@@ -1382,24 +1400,18 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
     )
     .await?;
 
-    storage
-        .put(
-            &Path::from(PARQUET_FILE1),
-            record_batch_to_bytes(&batch1).into(),
-        )
-        .await?;
-    storage
-        .put(
-            &Path::from(PARQUET_FILE2),
-            record_batch_to_bytes(&batch2).into(),
-        )
-        .await?;
-    storage
-        .put(
-            &Path::from(PARQUET_FILE3),
-            record_batch_to_bytes(&batch3).into(),
-        )
-        .await?;
+    for (parquet_file, batch) in [
+        (PARQUET_FILE1, &batch1),
+        (PARQUET_FILE2, &batch2),
+        (PARQUET_FILE3, &batch3),
+    ] {
+        storage
+            .put(
+                &Path::from(parquet_file),
+                record_batch_to_bytes(batch).into(),
+            )
+            .await?;
+    }
 
     let location = Url::parse("memory:///")?;
     let engine = Arc::new(DefaultEngine::new(
@@ -1422,8 +1434,7 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
     let stream = scan.execute(engine.clone())?;
 
     for scan_result in stream {
-        let data = scan_result?.raw_data?;
-        let batch = into_record_batch(data);
+        let batch = extract_record_batch(scan_result?)?;
         file_count += 1;
 
         // Verify the schema structure
