Skip to content

Commit

Permalink
feat: Add more fields in FileScanTask
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Sep 7, 2024
1 parent 620d58e commit ecba8eb
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 43 deletions.
12 changes: 6 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl ArrowReader {
|(file_scan_task, file_io, tx)| async move {
match file_scan_task {
Ok(task) => {
let file_path = task.data_file_path().to_string();
let file_path = task.data_file_path.to_string();

spawn(async move {
Self::process_file_scan_task(
Expand Down Expand Up @@ -171,7 +171,7 @@ impl ArrowReader {
) -> Result<()> {
// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(task.data_file_path())?;
let parquet_file = file_io.new_input(&task.data_file_path)?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
Expand All @@ -187,8 +187,8 @@ impl ArrowReader {
// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
let projection_mask = Self::get_arrow_projection_mask(
task.project_field_ids(),
task.schema(),
&task.project_field_ids,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
)?;
Expand All @@ -198,7 +198,7 @@ impl ArrowReader {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

if let Some(predicate) = task.predicate() {
if let Some(predicate) = &task.predicate {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
predicate,
Expand All @@ -218,7 +218,7 @@ impl ArrowReader {
predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
task.schema(),
&task.schema,
)?;

selected_row_groups = Some(result);
Expand Down
76 changes: 41 additions & 35 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::runtime::spawn;
use crate::spec::{
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, Schema,
SchemaRef, SnapshotRef, TableMetadataRef,
DataContentType, DataFileFormat, ManifestContentType, ManifestEntryRef, ManifestFile,
ManifestList, Schema, SchemaRef, SnapshotRef, TableMetadataRef,
};
use crate::table::Table;
use crate::utils::available_parallelism;
Expand Down Expand Up @@ -529,14 +529,19 @@ impl ManifestEntryContext {
/// created from it
fn into_file_scan_task(self) -> FileScanTask {
FileScanTask {
data_file_path: self.manifest_entry.file_path().to_string(),
start: 0,
length: self.manifest_entry.file_size_in_bytes(),
record_count: Some(self.manifest_entry.record_count()),

data_file_path: self.manifest_entry.file_path().to_string(),
data_file_content: self.manifest_entry.content_type(),
data_file_format: self.manifest_entry.file_format(),

schema: self.snapshot_schema,
project_field_ids: self.field_ids.to_vec(),
predicate: self
.bound_predicates
.map(|x| x.as_ref().snapshot_bound_predicate.clone()),
schema: self.snapshot_schema,
}
}
}
Expand Down Expand Up @@ -854,35 +859,30 @@ impl ExpressionEvaluatorCache {
/// A task to scan part of file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
data_file_path: String,
start: u64,
length: u64,
project_field_ids: Vec<i32>,
/// The start offset of the file to scan.
pub start: u64,
/// The length of the file to scan.
pub length: u64,
/// The number of records in the file to scan.
///
/// This is an optional field, and only available if we are
/// reading the entire data file.
pub record_count: Option<u64>,

/// The data file path corresponding to the task.
pub data_file_path: String,
/// The content type of the file to scan.
pub data_file_content: DataContentType,
/// The format of the file to scan.
pub data_file_format: DataFileFormat,

/// The schema of the file to scan.
pub schema: SchemaRef,
/// The field ids to project.
pub project_field_ids: Vec<i32>,
/// The predicate to filter.
#[serde(skip_serializing_if = "Option::is_none")]
predicate: Option<BoundPredicate>,
schema: SchemaRef,
}

impl FileScanTask {
/// Returns the data file path of this file scan task.
pub fn data_file_path(&self) -> &str {
&self.data_file_path
}

/// Returns the project field id of this file scan task.
pub fn project_field_ids(&self) -> &[i32] {
&self.project_field_ids
}

/// Returns the predicate of this file scan task.
pub fn predicate(&self) -> Option<&BoundPredicate> {
self.predicate.as_ref()
}

/// Returns the schema id of this file scan task.
pub fn schema(&self) -> &Schema {
&self.schema
}
pub predicate: Option<BoundPredicate>,
}

#[cfg(test)]
Expand Down Expand Up @@ -1219,17 +1219,17 @@ mod tests {

assert_eq!(tasks.len(), 2);

tasks.sort_by_key(|t| t.data_file_path().to_string());
tasks.sort_by_key(|t| t.data_file_path.to_string());

// Check first task is added data file
assert_eq!(
tasks[0].data_file_path(),
tasks[0].data_file_path,
format!("{}/1.parquet", &fixture.table_location)
);

// Check second task is existing data file
assert_eq!(
tasks[1].data_file_path(),
tasks[1].data_file_path,
format!("{}/3.parquet", &fixture.table_location)
);
}
Expand Down Expand Up @@ -1582,22 +1582,28 @@ mod tests {
);
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: None,
schema: schema.clone(),
record_count: Some(100),
data_file_format: DataFileFormat::Parquet,
};
test_fn(task);

// with predicate
let task = FileScanTask {
data_file_path: "data_file_path".to_string(),
data_file_content: DataContentType::Data,
start: 0,
length: 100,
project_field_ids: vec![1, 2, 3],
predicate: Some(BoundPredicate::AlwaysTrue),
schema,
record_count: None,
data_file_format: DataFileFormat::Avro,
};
test_fn(task);
}
Expand Down
12 changes: 10 additions & 2 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use std::sync::Arc;

use apache_avro::{from_value, to_value, Reader as AvroReader, Writer as AvroWriter};
use bytes::Bytes;
use serde_derive::{Deserialize, Serialize};
use serde_json::to_vec;
use serde_with::{DeserializeFromStr, SerializeDisplay};
use typed_builder::TypedBuilder;

use self::_const_schema::{manifest_schema_v1, manifest_schema_v2};
Expand Down Expand Up @@ -866,6 +868,12 @@ impl ManifestEntry {
&self.data_file.file_path
}

/// Data file record count of the manifest entry.
#[inline]
pub fn record_count(&self) -> u64 {
self.data_file.record_count
}

/// Inherit data from manifest list, such as snapshot id, sequence number.
pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestFile) {
if self.snapshot_id.is_none() {
Expand Down Expand Up @@ -1141,7 +1149,7 @@ impl DataFile {
}
/// Type of content stored by the data file: data, equality deletes, or
/// position deletes (all v1 files are data files)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum DataContentType {
/// value: 0
Data = 0,
Expand All @@ -1168,7 +1176,7 @@ impl TryFrom<i32> for DataContentType {
}

/// Format of this data.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, SerializeDisplay, DeserializeFromStr)]
pub enum DataFileFormat {
/// Avro file format: <https://avro.apache.org/>
Avro,
Expand Down

0 comments on commit ecba8eb

Please sign in to comment.