From ac36f298b8a4766e9d47e69c0484b7bacc9431d1 Mon Sep 17 00:00:00 2001 From: Scott Donnelly Date: Wed, 26 Feb 2025 08:49:26 +0000 Subject: [PATCH] feat: add positional delete parsing. Add tests for end-to-end positional delete functionality --- .../src/arrow/caching_delete_file_loader.rs | 66 +++++++-- .../iceberg/src/arrow/delete_file_loader.rs | 133 +----------------- crates/iceberg/src/arrow/delete_filter.rs | 58 +++++--- crates/iceberg/src/delete_vector.rs | 9 ++ .../shared_tests/read_positional_deletes.rs | 17 +-- 5 files changed, 110 insertions(+), 173 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index a48ebe5dfd..f0dece75a4 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; +use arrow_array::{Int64Array, StringArray}; use futures::{StreamExt, TryStreamExt}; use tokio::sync::oneshot::{Receiver, channel}; @@ -267,14 +268,44 @@ impl CachingDeleteFileLoader { /// /// Returns a map of data file path to a delete vector async fn parse_positional_deletes_record_batch_stream( - stream: ArrowRecordBatchStream, + mut stream: ArrowRecordBatchStream, ) -> Result> { - // TODO + let mut result: HashMap = HashMap::default(); + + while let Some(batch) = stream.next().await { + let batch = batch?; + let schema = batch.schema(); + let columns = batch.columns(); + + let Some(file_paths) = columns[0].as_any().downcast_ref::() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Could not downcast file paths array to StringArray", + )); + }; + let Some(positions) = columns[1].as_any().downcast_ref::() else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Could not downcast positions array to Int64Array", + )); + }; + + for (file_path, pos) in file_paths.iter().zip(positions.iter()) { + let (Some(file_path), Some(pos)) = (file_path, pos) else { + return Err(Error::new( + ErrorKind::DataInvalid, + "null values in delete file", + )); + }; - Err(Error::new( - ErrorKind::FeatureUnsupported, - "parsing of positional deletes is not yet supported", - )) + result + .entry(file_path.to_string()) + .or_default() + .insert(pos as u64); + } + } + + Ok(result) } /// Parses record batch streams from individual equality delete files @@ -297,10 +328,10 @@ mod tests { use tempfile::TempDir; use super::*; - use crate::arrow::delete_file_loader::tests::setup; + use crate::arrow::delete_filter::tests::setup; #[tokio::test] - async fn test_delete_file_manager_load_deletes() { + async fn test_caching_delete_file_loader_load_deletes() { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path(); let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) @@ -308,17 +339,26 @@ mod tests { .build() .unwrap(); - // Note that with the delete file parsing not yet in place, all we can test here is that - // the call to the loader fails with the expected FeatureUnsupportedError. - let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); let file_scan_tasks = setup(table_location); - let result = delete_file_manager + let delete_filter = delete_file_loader .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) .await + .unwrap() .unwrap(); - assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + let result = delete_filter + .get_delete_vector(&file_scan_tasks[0]) + .unwrap(); + + // union of pos dels from pos del file 1 and 2, ie + // [0, 1, 3, 5, 6, 8, 1022, 1023] | [0, 1, 3, 5, 20, 21, 22, 23] + // = [0, 1, 3, 5, 6, 8, 20, 21, 22, 23, 1022, 1023] + assert_eq!(result.lock().unwrap().len(), 12); + + let result = delete_filter.get_delete_vector(&file_scan_tasks[1]); + assert!(result.is_none()); // no pos dels for file 3 } } diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 802ea794c4..592ef2eb4a 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -110,27 +110,11 @@ impl DeleteFileLoader for BasicDeleteFileLoader { } #[cfg(test)] -pub(crate) mod tests { - use std::collections::HashMap; - use std::fs::File; - use std::path::Path; - use std::sync::Arc; - - use arrow_array::{Int64Array, RecordBatch, StringArray}; - use arrow_schema::Schema as ArrowSchema; - use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; - use parquet::basic::Compression; - use parquet::file::properties::WriterProperties; +mod tests { use tempfile::TempDir; use super::*; - use crate::scan::FileScanTask; - use crate::spec::{DataContentType, DataFileFormat, Schema}; - - type ArrowSchemaRef = Arc; - - const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546; - const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; + use crate::arrow::delete_filter::tests::setup; #[tokio::test] async fn test_basic_delete_file_loader_read_delete_file() { @@ -141,8 +125,6 @@ pub(crate) mod tests { .build() .unwrap(); - // Note that with the delete file parsing not yet in place, all we can test here is that - // the call to the loader fails with the expected FeatureUnsupportedError. let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone()); let file_scan_tasks = setup(table_location); @@ -159,115 +141,4 @@ pub(crate) mod tests { assert_eq!(result.len(), 1); } - - pub(crate) fn setup(table_location: &Path) -> Vec { - let data_file_schema = Arc::new(Schema::builder().build().unwrap()); - let positional_delete_schema = create_pos_del_schema(); - - let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; - let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; - - let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); - let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); - - let props = WriterProperties::builder() - .set_compression(Compression::SNAPPY) - .build(); - - for n in 1..=3 { - let positional_deletes_to_write = - RecordBatch::try_new(positional_delete_schema.clone(), vec![ - file_path_col.clone(), - pos_col.clone(), - ]) - .unwrap(); - - let file = File::create(format!( - "{}/pos-del-{}.parquet", - table_location.to_str().unwrap(), - n - )) - .unwrap(); - let mut writer = ArrowWriter::try_new( - file, - positional_deletes_to_write.schema(), - Some(props.clone()), - ) - .unwrap(); - - writer - .write(&positional_deletes_to_write) - .expect("Writing batch"); - - // writer must be closed to write footer - writer.close().unwrap(); - } - - let pos_del_1 = FileScanTaskDeleteFile { - file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()), - file_type: DataContentType::PositionDeletes, - partition_spec_id: 0, - equality_ids: vec![], - }; - - let pos_del_2 = FileScanTaskDeleteFile { - file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()), - file_type: DataContentType::PositionDeletes, - partition_spec_id: 0, - equality_ids: vec![], - }; - - let pos_del_3 = FileScanTaskDeleteFile { - file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()), - file_type: DataContentType::PositionDeletes, - partition_spec_id: 0, - equality_ids: vec![], - }; - - let file_scan_tasks = vec![ - FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: "".to_string(), - data_file_content: DataContentType::Data, - data_file_format: DataFileFormat::Parquet, - schema: data_file_schema.clone(), - project_field_ids: vec![], - predicate: None, - deletes: vec![pos_del_1, pos_del_2.clone()], - }, - FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: "".to_string(), - data_file_content: DataContentType::Data, - data_file_format: DataFileFormat::Parquet, - schema: data_file_schema.clone(), - project_field_ids: vec![], - predicate: None, - deletes: vec![pos_del_2, pos_del_3], - }, - ]; - - file_scan_tasks - } - - pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef { - let fields = vec![ - arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(), - )])), - arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata( - HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - FIELD_ID_POSITIONAL_DELETE_POS.to_string(), - )]), - ), - ]; - Arc::new(arrow_schema::Schema::new(fields)) - } } diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index e2acab1923..579d2ea054 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -195,7 +195,7 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use std::fs::File; use std::path::Path; use std::sync::Arc; @@ -218,7 +218,7 @@ mod tests { const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545; #[tokio::test] - async fn test_delete_file_manager_load_deletes() { + async fn test_delete_file_filter_load_deletes() { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path(); let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) @@ -226,35 +226,59 @@ mod tests { .build() .unwrap(); - // Note that with the delete file parsing not yet in place, all we can test here is that - // the call to the loader fails with the expected FeatureUnsupportedError. - let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10); + let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); let file_scan_tasks = setup(table_location); - let result = delete_file_manager + let delete_filter = delete_file_loader .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) .await + .unwrap() + .unwrap(); + + let result = delete_filter + .get_delete_vector(&file_scan_tasks[0]) + .unwrap(); + assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2 + + let delete_filter = delete_file_loader + .load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref()) + .await + .unwrap() .unwrap(); - assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported)); + let result = delete_filter + .get_delete_vector(&file_scan_tasks[1]) + .unwrap(); + assert_eq!(result.lock().unwrap().len(), 8); // no pos dels for file 3 } - fn setup(table_location: &Path) -> Vec { + pub(crate) fn setup(table_location: &Path) -> Vec { let data_file_schema = Arc::new(Schema::builder().build().unwrap()); let positional_delete_schema = create_pos_del_schema(); - let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8]; - let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023]; - - let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values)); - let pos_col = Arc::new(Int64Array::from_iter_values(pos_values)); + let file_path_values = [ + vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8], + vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8], + vec![format!("{}/2.parquet", table_location.to_str().unwrap()); 8], + ]; + let pos_values = [ + vec![0i64, 1, 3, 5, 6, 8, 1022, 1023], + vec![0i64, 1, 3, 5, 20, 21, 22, 23], + vec![0i64, 1, 3, 5, 6, 8, 1022, 1023], + ]; let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .build(); for n in 1..=3 { + let file_path_vals = file_path_values.get(n - 1).unwrap(); + let file_path_col = Arc::new(StringArray::from_iter_values(file_path_vals)); + + let pos_vals = pos_values.get(n - 1).unwrap(); + let pos_col = Arc::new(Int64Array::from_iter_values(pos_vals.clone())); + let positional_deletes_to_write = RecordBatch::try_new(positional_delete_schema.clone(), vec![ file_path_col.clone(), @@ -309,7 +333,7 @@ mod tests { start: 0, length: 0, record_count: None, - data_file_path: "".to_string(), + data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()), data_file_content: DataContentType::Data, data_file_format: DataFileFormat::Parquet, schema: data_file_schema.clone(), @@ -321,20 +345,20 @@ mod tests { start: 0, length: 0, record_count: None, - data_file_path: "".to_string(), + data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()), data_file_content: DataContentType::Data, data_file_format: DataFileFormat::Parquet, schema: data_file_schema.clone(), project_field_ids: vec![], predicate: None, - deletes: vec![pos_del_2, pos_del_3], + deletes: vec![pos_del_3], }, ]; file_scan_tasks } - fn create_pos_del_schema() -> ArrowSchemaRef { + pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef { let fields = vec![ arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false) .with_metadata(HashMap::from([( diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index e4ab74f108..7b4d40baea 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -38,6 +38,15 @@ impl DeleteVector { let outer = self.inner.bitmaps(); DeleteVectorIterator { outer, inner: None } } + + pub fn insert(&mut self, pos: u64) -> bool { + self.inner.insert(pos) + } + + #[allow(unused)] + pub fn len(&self) -> u64 { + self.inner.len() + } } // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here. diff --git a/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs b/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs index 34085eeee4..f631586edc 100644 --- a/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/shared_tests/read_positional_deletes.rs @@ -18,7 +18,6 @@ //! Integration tests for rest catalog. use futures::TryStreamExt; -use iceberg::ErrorKind::FeatureUnsupported; use iceberg::{Catalog, TableIdent}; use iceberg_catalog_rest::RestCatalog; @@ -53,15 +52,9 @@ async fn test_read_table_with_positional_deletes() { // when with_delete_file_processing_enabled == true assert_eq!(plan[0].deletes.len(), 2); - // 😱 If we don't support positional deletes, we should fail when we try to read a table that - // has positional deletes. The table has 12 rows, and 2 are deleted, see provision.py - let result = scan.to_arrow().await.unwrap().try_collect::>().await; - - assert!(result.is_err_and(|e| e.kind() == FeatureUnsupported)); - - // When we get support for it: - // let batch_stream = scan.to_arrow().await.unwrap(); - // let batches: Vec<_> = batch_stream.try_collect().await.is_err(); - // let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); - // assert_eq!(num_rows, 10); + // we should see two rows deleted, returning 10 rows instead of 12 + let batch_stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + let num_rows: usize = batches.iter().map(|v| v.num_rows()).sum(); + assert_eq!(num_rows, 10); }