Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 26a04ab
Author: Christian Thiel <christian@hansetag.com>
Date:   Tue May 28 17:39:07 2024 +0200

    Add "services-s3" feature flag for opendal

commit e5b59a7
Author: Christian <Christian.Thiel@outlook.com>
Date:   Tue May 28 06:18:05 2024 +0200

    Feature: Schema into_builder method (apache#381)

commit 1bf80e1
Author: ZENOTME <43447882+ZENOTME@users.noreply.github.com>
Date:   Mon May 27 22:34:47 2024 +0800

    make file scan task serializable (apache#377)

    Co-authored-by: ZENOTME <st810918843@gmail.com>
  • Loading branch information
c-thiel committed May 28, 2024
1 parent e36780c commit fe811e4
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 15 deletions.
2 changes: 1 addition & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ lazy_static = { workspace = true }
log = { workspace = true }
murmur3 = { workspace = true }
once_cell = { workspace = true }
opendal = { workspace = true }
opendal = { workspace = true, features = ["services-s3"] }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
reqwest = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ArrowReader {
Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {
let parquet_file = file_io
.new_input(task.data().data_file().file_path())?;
.new_input(task.data_file_path())?;
let (parquet_metadata, parquet_reader) = try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod avro;
pub mod io;
pub mod spec;

mod scan;
pub mod scan;

#[allow(dead_code)]
pub mod expr;
Expand Down
24 changes: 13 additions & 11 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ use crate::expr::visitors::manifest_evaluator::ManifestEvaluator;
use crate::expr::{Bind, BoundPredicate, Predicate};
use crate::io::FileIO;
use crate::spec::{
DataContentType, ManifestContentType, ManifestEntryRef, ManifestFile, Schema, SchemaRef,
SnapshotRef, TableMetadataRef,
DataContentType, ManifestContentType, ManifestFile, Schema, SchemaRef, SnapshotRef,
TableMetadataRef,
};
use crate::table::Table;
use crate::{Error, ErrorKind, Result};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::BoxStream;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -55,7 +56,7 @@ pub struct TableScanBuilder<'a> {
}

impl<'a> TableScanBuilder<'a> {
pub fn new(table: &'a Table) -> Self {
pub(crate) fn new(table: &'a Table) -> Self {
Self {
table,
column_names: vec![],
Expand Down Expand Up @@ -265,7 +266,7 @@ impl TableScan {
}
DataContentType::Data => {
let scan_task: Result<FileScanTask> = Ok(FileScanTask {
data_manifest_entry: manifest_entry.clone(),
data_file_path: manifest_entry.data_file().file_path().to_string(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
});
Expand Down Expand Up @@ -463,18 +464,19 @@ impl ManifestEvaluatorCache {
}

/// A task to scan part of a file.
#[derive(Debug)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileScanTask {
data_manifest_entry: ManifestEntryRef,
data_file_path: String,
#[allow(dead_code)]
start: u64,
#[allow(dead_code)]
length: u64,
}

impl FileScanTask {
pub fn data(&self) -> ManifestEntryRef {
self.data_manifest_entry.clone()
/// Returns the data file path of this file scan task.
pub fn data_file_path(&self) -> &str {
&self.data_file_path
}
}

Expand Down Expand Up @@ -794,17 +796,17 @@ mod tests {

assert_eq!(tasks.len(), 2);

tasks.sort_by_key(|t| t.data().data_file().file_path().to_string());
tasks.sort_by_key(|t| t.data_file_path().to_string());

// Check first task is added data file
assert_eq!(
tasks[0].data().data_file().file_path(),
tasks[0].data_file_path(),
format!("{}/1.parquet", &fixture.table_location)
);

// Check second task is existing data file
assert_eq!(
tasks[1].data().data_file().file_path(),
tasks[1].data_file_path(),
format!("{}/3.parquet", &fixture.table_location)
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl Schema {
pub fn into_builder(self) -> SchemaBuilder {
SchemaBuilder {
schema_id: self.schema_id,
fields: self.r#struct.fields().iter().cloned().collect(),
fields: self.r#struct.fields().to_vec(),
alias_to_id: self.alias_to_id,
identifier_field_ids: self.identifier_field_ids,
}
Expand Down

0 comments on commit fe811e4

Please sign in to comment.