66 changes: 53 additions & 13 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -17,6 +17,7 @@

use std::collections::HashMap;

use arrow_array::{Int64Array, StringArray};
use futures::{StreamExt, TryStreamExt};
use tokio::sync::oneshot::{Receiver, channel};

@@ -267,14 +268,44 @@ impl CachingDeleteFileLoader {
///
/// Returns a map of data file path to a delete vector
async fn parse_positional_deletes_record_batch_stream(
stream: ArrowRecordBatchStream,
mut stream: ArrowRecordBatchStream,
) -> Result<HashMap<String, DeleteVector>> {
// TODO
let mut result: HashMap<String, DeleteVector> = HashMap::default();

while let Some(batch) = stream.next().await {
let batch = batch?;
let schema = batch.schema();
let columns = batch.columns();

let Some(file_paths) = columns[0].as_any().downcast_ref::<StringArray>() else {
return Err(Error::new(
ErrorKind::DataInvalid,
"Could not downcast file paths array to StringArray",
));
};
let Some(positions) = columns[1].as_any().downcast_ref::<Int64Array>() else {
return Err(Error::new(
ErrorKind::DataInvalid,
"Could not downcast positions array to Int64Array",
));
};

for (file_path, pos) in file_paths.iter().zip(positions.iter()) {
let (Some(file_path), Some(pos)) = (file_path, pos) else {
return Err(Error::new(
ErrorKind::DataInvalid,
"null values in delete file",
));
};

Err(Error::new(
ErrorKind::FeatureUnsupported,
"parsing of positional deletes is not yet supported",
))
result
.entry(file_path.to_string())
.or_default()
.insert(pos as u64);
}
}

Ok(result)
}

/// Parses record batch streams from individual equality delete files
@@ -297,28 +328,37 @@ mod tests {
use tempfile::TempDir;

use super::*;
use crate::arrow::delete_file_loader::tests::setup;
use crate::arrow::delete_filter::tests::setup;

#[tokio::test]
async fn test_delete_file_manager_load_deletes() {
async fn test_caching_delete_file_loader_load_deletes() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Note that with the delete file parsing not yet in place, all we can test here is that
// the call to the loader fails with the expected FeatureUnsupportedError.
let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);

let file_scan_tasks = setup(table_location);

let result = delete_file_manager
let delete_filter = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.await
.unwrap()
.unwrap();

assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
let result = delete_filter
.get_delete_vector(&file_scan_tasks[0])
.unwrap();

// union of pos dels from pos del file 1 and 2, ie
// [0, 1, 3, 5, 6, 8, 1022, 1023] | [0, 1, 3, 5, 20, 21, 22, 23]
// = [0, 1, 3, 5, 6, 8, 20, 21, 22, 23, 1022, 1023]
assert_eq!(result.lock().unwrap().len(), 12);

let result = delete_filter.get_delete_vector(&file_scan_tasks[1]);
assert!(result.is_none()); // no delete vector loaded for the second task's data file (2.parquet)
}
}
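
For context, here is a minimal standalone sketch (not part of this diff) of the accumulation pattern that parse_positional_deletes_record_batch_stream implements above: each positional-delete row pairs a data file path with a deleted row position, and positions belonging to the same data file are unioned into a single roaring-bitmap-backed set. The helper name and file paths below are made up for illustration, and roaring::RoaringTreemap is used directly in place of the crate's DeleteVector wrapper.

use std::collections::HashMap;

use roaring::RoaringTreemap;

// Accumulate (data file path, deleted position) pairs into one bitmap per data
// file, mirroring the per-batch loop in the function above.
fn accumulate_positional_deletes(rows: &[(&str, i64)]) -> HashMap<String, RoaringTreemap> {
    let mut result: HashMap<String, RoaringTreemap> = HashMap::new();
    for (file_path, pos) in rows {
        result
            .entry((*file_path).to_string())
            .or_insert_with(RoaringTreemap::new)
            .insert(*pos as u64);
    }
    result
}

fn main() {
    // Rows as they might appear across two positional delete files that both
    // target 1.parquet; duplicate positions collapse in the bitmap.
    let rows = [
        ("1.parquet", 0),
        ("1.parquet", 3),
        ("1.parquet", 3),
        ("2.parquet", 5),
    ];
    let map = accumulate_positional_deletes(&rows);
    assert_eq!(map["1.parquet"].len(), 2);
    assert_eq!(map["2.parquet"].len(), 1);
}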
133 changes: 2 additions & 131 deletions crates/iceberg/src/arrow/delete_file_loader.rs
@@ -110,27 +110,11 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
}

#[cfg(test)]
pub(crate) mod tests {
use std::collections::HashMap;
use std::fs::File;
use std::path::Path;
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};
use arrow_schema::Schema as ArrowSchema;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
mod tests {
use tempfile::TempDir;

use super::*;
use crate::scan::FileScanTask;
use crate::spec::{DataContentType, DataFileFormat, Schema};

type ArrowSchemaRef = Arc<ArrowSchema>;

const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
use crate::arrow::delete_filter::tests::setup;

#[tokio::test]
async fn test_basic_delete_file_loader_read_delete_file() {
@@ -141,8 +125,6 @@ pub(crate) mod tests {
.build()
.unwrap();

// Note that with the delete file parsing not yet in place, all we can test here is that
// the call to the loader fails with the expected FeatureUnsupportedError.
let delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());

let file_scan_tasks = setup(table_location);
@@ -159,115 +141,4 @@

assert_eq!(result.len(), 1);
}

pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
let positional_delete_schema = create_pos_del_schema();

let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];

let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

for n in 1..=3 {
let positional_deletes_to_write =
RecordBatch::try_new(positional_delete_schema.clone(), vec![
file_path_col.clone(),
pos_col.clone(),
])
.unwrap();

let file = File::create(format!(
"{}/pos-del-{}.parquet",
table_location.to_str().unwrap(),
n
))
.unwrap();
let mut writer = ArrowWriter::try_new(
file,
positional_deletes_to_write.schema(),
Some(props.clone()),
)
.unwrap();

writer
.write(&positional_deletes_to_write)
.expect("Writing batch");

// writer must be closed to write footer
writer.close().unwrap();
}

let pos_del_1 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let pos_del_2 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let pos_del_3 = FileScanTaskDeleteFile {
file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
file_type: DataContentType::PositionDeletes,
partition_spec_id: 0,
equality_ids: vec![],
};

let file_scan_tasks = vec![
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_1, pos_del_2.clone()],
},
FileScanTask {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_2, pos_del_3],
},
];

file_scan_tasks
}

pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
let fields = vec![
arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
)])),
arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata(
HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
)]),
),
];
Arc::new(arrow_schema::Schema::new(fields))
}
}
58 changes: 41 additions & 17 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -195,7 +195,7 @@ pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
}

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
@@ -218,43 +218,67 @@ mod tests {
const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;

#[tokio::test]
async fn test_delete_file_manager_load_deletes() {
async fn test_delete_file_filter_load_deletes() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

// Note that with the delete file parsing not yet in place, all we can test here is that
// the call to the loader fails with the expected FeatureUnsupportedError.
let delete_file_manager = CachingDeleteFileLoader::new(file_io.clone(), 10);
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);

let file_scan_tasks = setup(table_location);

let result = delete_file_manager
let delete_filter = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.await
.unwrap()
.unwrap();

let result = delete_filter
.get_delete_vector(&file_scan_tasks[0])
.unwrap();
assert_eq!(result.lock().unwrap().len(), 12); // pos dels from pos del file 1 and 2

let delete_filter = delete_file_loader
.load_deletes(&file_scan_tasks[1].deletes, file_scan_tasks[1].schema_ref())
.await
.unwrap()
.unwrap();

assert!(result.is_err_and(|e| e.kind() == ErrorKind::FeatureUnsupported));
let result = delete_filter
.get_delete_vector(&file_scan_tasks[1])
.unwrap();
assert_eq!(result.lock().unwrap().len(), 8); // pos dels for 2.parquet, from pos del file 3
}

fn setup(table_location: &Path) -> Vec<FileScanTask> {
pub(crate) fn setup(table_location: &Path) -> Vec<FileScanTask> {
let data_file_schema = Arc::new(Schema::builder().build().unwrap());
let positional_delete_schema = create_pos_del_schema();

let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];

let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
let file_path_values = [
vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8],
vec![format!("{}/2.parquet", table_location.to_str().unwrap()); 8],
];
let pos_values = [
vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
vec![0i64, 1, 3, 5, 20, 21, 22, 23],
vec![0i64, 1, 3, 5, 6, 8, 1022, 1023],
];

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.build();

for n in 1..=3 {
let file_path_vals = file_path_values.get(n - 1).unwrap();
let file_path_col = Arc::new(StringArray::from_iter_values(file_path_vals));

let pos_vals = pos_values.get(n - 1).unwrap();
let pos_col = Arc::new(Int64Array::from_iter_values(pos_vals.clone()));

let positional_deletes_to_write =
RecordBatch::try_new(positional_delete_schema.clone(), vec![
file_path_col.clone(),
Expand Down Expand Up @@ -309,7 +333,7 @@ mod tests {
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
@@ -321,20 +345,20 @@
start: 0,
length: 0,
record_count: None,
data_file_path: "".to_string(),
data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()),
data_file_content: DataContentType::Data,
data_file_format: DataFileFormat::Parquet,
schema: data_file_schema.clone(),
project_field_ids: vec![],
predicate: None,
deletes: vec![pos_del_2, pos_del_3],
deletes: vec![pos_del_3],
},
];

file_scan_tasks
}

fn create_pos_del_schema() -> ArrowSchemaRef {
pub(crate) fn create_pos_del_schema() -> ArrowSchemaRef {
let fields = vec![
arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
.with_metadata(HashMap::from([(
9 changes: 9 additions & 0 deletions crates/iceberg/src/delete_vector.rs
@@ -38,6 +38,15 @@ impl DeleteVector {
let outer = self.inner.bitmaps();
DeleteVectorIterator { outer, inner: None }
}

pub fn insert(&mut self, pos: u64) -> bool {
self.inner.insert(pos)
}

#[allow(unused)]
pub fn len(&self) -> u64 {
self.inner.len()
}
}

// Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here.