From f4a79d19fcda17511e0ed61d6bad8465a456a907 Mon Sep 17 00:00:00 2001
From: Xuanwo
Date: Sat, 7 Sep 2024 00:22:42 +0800
Subject: [PATCH] refactor: Store DataFile in FileScanTask instead

Signed-off-by: Xuanwo
---
 crates/iceberg/src/arrow/reader.rs |  4 +-
 crates/iceberg/src/scan.rs         | 81 +++++------------------------
 2 files changed, 15 insertions(+), 70 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index b058c8d25..a1821d473 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -133,7 +133,7 @@ impl ArrowReader {
             |(file_scan_task, file_io, tx)| async move {
                 match file_scan_task {
                     Ok(task) => {
-                        let file_path = task.data_file_path().to_string();
+                        let file_path = task.data_file().file_path.to_string();
 
                         spawn(async move {
                             Self::process_file_scan_task(
@@ -171,7 +171,7 @@ impl ArrowReader {
     ) -> Result<()> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
-        let parquet_file = file_io.new_input(task.data_file_path())?;
+        let parquet_file = file_io.new_input(&task.data_file().file_path)?;
         let (parquet_metadata, parquet_reader) =
             try_join!(parquet_file.metadata(), parquet_file.reader())?;
         let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 45d7d4fd1..233173940 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
 use futures::channel::mpsc::{channel, Sender};
 use futures::stream::BoxStream;
 use futures::{SinkExt, StreamExt, TryFutureExt, TryStreamExt};
-use serde::{Deserialize, Serialize};
 
 use crate::arrow::ArrowReaderBuilder;
 use crate::expr::visitors::expression_evaluator::ExpressionEvaluator;
@@ -36,8 +35,8 @@ use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::runtime::spawn;
 use crate::spec::{
-    DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
-    SchemaRef, SnapshotRef, TableMetadataRef,
+    DataContentType, DataFile, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList,
+    Schema, SchemaRef, SnapshotRef, TableMetadataRef,
 };
 use crate::table::Table;
 use crate::utils::available_parallelism;
@@ -529,9 +528,7 @@ impl ManifestEntryContext {
     /// created from it
     fn into_file_scan_task(self) -> FileScanTask {
         FileScanTask {
-            data_file_path: self.manifest_entry.file_path().to_string(),
-            start: 0,
-            length: self.manifest_entry.file_size_in_bytes(),
+            data_file: self.manifest_entry.data_file().clone(),
             project_field_ids: self.field_ids.to_vec(),
             predicate: self
                 .bound_predicates
@@ -852,21 +849,18 @@ impl ExpressionEvaluatorCache {
 }
 
 /// A task to scan part of file.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone)]
 pub struct FileScanTask {
-    data_file_path: String,
-    start: u64,
-    length: u64,
+    data_file: DataFile,
     project_field_ids: Vec<i32>,
-    #[serde(skip_serializing_if = "Option::is_none")]
     predicate: Option<BoundPredicate>,
     schema: SchemaRef,
 }
 
 impl FileScanTask {
-    /// Returns the data file path of this file scan task.
-    pub fn data_file_path(&self) -> &str {
-        &self.data_file_path
+    /// Returns the data file of this file scan task.
+    pub fn data_file(&self) -> &DataFile {
+        &self.data_file
     }
 
     /// Returns the project field id of this file scan task.
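
Note on the API change above: FileScanTask no longer carries a bare
`data_file_path` (plus the unused `start`/`length` pair); it holds the whole
`DataFile`, and callers reach the path through the new `data_file()`
accessor, as `reader.rs` now does. A minimal sketch of the call-site
migration follows; the helper name `scan_task_path` is illustrative and not
part of this patch, and it assumes `DataFile::file_path` is visible to the
caller, as it is to code inside the crate:

    use iceberg::scan::FileScanTask;

    /// Resolve the file path from a task the way reader.rs does above.
    fn scan_task_path(task: &FileScanTask) -> String {
        // Before this patch: task.data_file_path().to_string()
        task.data_file().file_path.to_string()
    }

The test hunks below apply the same migration.
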
@@ -902,14 +896,12 @@ mod tests {
     use uuid::Uuid;
 
     use crate::arrow::ArrowReaderBuilder;
-    use crate::expr::{BoundPredicate, Reference};
+    use crate::expr::Reference;
     use crate::io::{FileIO, OutputFile};
-    use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
-        ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
-        EMPTY_SNAPSHOT_ID,
+        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
     };
     use crate::table::Table;
     use crate::TableIdent;
@@ -1219,17 +1211,17 @@ mod tests {
 
         assert_eq!(tasks.len(), 2);
 
-        tasks.sort_by_key(|t| t.data_file_path().to_string());
+        tasks.sort_by_key(|t| t.data_file().file_path.to_string());
 
         // Check first task is added data file
         assert_eq!(
-            tasks[0].data_file_path(),
+            tasks[0].data_file().file_path,
             format!("{}/1.parquet", &fixture.table_location)
         );
 
         // Check second task is existing data file
         assert_eq!(
-            tasks[1].data_file_path(),
+            tasks[1].data_file().file_path,
             format!("{}/3.parquet", &fixture.table_location)
        );
     }
@@ -1554,51 +1546,4 @@ mod tests {
         let string_arr = col.as_any().downcast_ref::<StringArray>().unwrap();
         assert_eq!(string_arr.value(0), "Apache");
     }
-
-    #[test]
-    fn test_file_scan_task_serialize_deserialize() {
-        let test_fn = |task: FileScanTask| {
-            let serialized = serde_json::to_string(&task).unwrap();
-            let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap();
-
-            assert_eq!(task.data_file_path, deserialized.data_file_path);
-            assert_eq!(task.start, deserialized.start);
-            assert_eq!(task.length, deserialized.length);
-            assert_eq!(task.project_field_ids, deserialized.project_field_ids);
-            assert_eq!(task.predicate, deserialized.predicate);
-            assert_eq!(task.schema, deserialized.schema);
-        };
-
-        // without predicate
-        let schema = Arc::new(
-            Schema::builder()
-                .with_fields(vec![Arc::new(NestedField::required(
-                    1,
-                    "x",
-                    Type::Primitive(PrimitiveType::Binary),
-                ))])
-                .build()
-                .unwrap(),
-        );
-        let task = FileScanTask {
-            data_file_path: "data_file_path".to_string(),
-            start: 0,
-            length: 100,
-            project_field_ids: vec![1, 2, 3],
-            predicate: None,
-            schema: schema.clone(),
-        };
-        test_fn(task);
-
-        // with predicate
-        let task = FileScanTask {
-            data_file_path: "data_file_path".to_string(),
-            start: 0,
-            length: 100,
-            project_field_ids: vec![1, 2, 3],
-            predicate: Some(BoundPredicate::AlwaysTrue),
-            schema,
-        };
-        test_fn(task);
-    }
 }
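
Note on the dropped serde support: with `Serialize`/`Deserialize` removed
from FileScanTask (presumably because the embedded `DataFile` does not
implement them), the deleted round-trip test above no longer applies, and
embedders that shipped tasks across processes lose that ability. One
possible workaround is to copy the serializable parts into a small wire
type. A sketch under that assumption follows; the `TaskWire` type, its
field choice, and the exact return type of the `project_field_ids()`
accessor are illustrative, not guaranteed by this patch:

    use iceberg::scan::FileScanTask;
    use serde::{Deserialize, Serialize};

    /// Hypothetical wire form carrying only the parts of a task that
    /// serialize cleanly.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TaskWire {
        file_path: String,
        project_field_ids: Vec<i32>,
    }

    impl TaskWire {
        /// Copy the needed fields out of a task; real code would decide
        /// which DataFile metadata it actually has to carry.
        fn from_task(task: &FileScanTask) -> Self {
            Self {
                file_path: task.data_file().file_path.to_string(),
                project_field_ids: task.project_field_ids().to_vec(),
            }
        }
    }
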