Commit
Iceberg: use parquet files part.
RinChanNOWWW committed Aug 24, 2023
1 parent d614082 commit 59d81c1
Showing 9 changed files with 47 additions and 38 deletions.
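At a glance: the Iceberg storage layer now hands out whole Parquet files as parts (ParquetPart::ParquetFiles) instead of per-row-group parts (ParquetPart::ParquetRSRowGroup), ParquetSmallFilesPart is renamed to the more general ParquetFilesPart (and the SmallFiles variant to ParquetFiles), and ParquetRSReader::create gains a create_pruner flag that makes its row-group/page pruner optional; the Iceberg table passes true, the native Parquet table passes false.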
20 changes: 10 additions & 10 deletions src/query/storages/iceberg/src/table.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::any::Any;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow_schema::Schema as ArrowSchema;
@@ -41,9 +40,9 @@ use common_meta_app::schema::TableInfo;
 use common_meta_app::schema::TableMeta;
 use common_pipeline_core::Pipeline;
 use common_storage::DataOperator;
+use common_storages_parquet::ParquetFilesPart;
 use common_storages_parquet::ParquetPart;
 use common_storages_parquet::ParquetRSReader;
-use common_storages_parquet::ParquetRSRowGroupPart;
 use storages_common_pruner::RangePrunerCreator;
 use tokio::sync::OnceCell;
 
@@ -171,6 +170,7 @@ impl IcebergTable {
             &arrow_schema,
             plan,
             ParquetReadOptions::default(),
+            true,
         )?);
 
         // TODO: we need to support top_k.
@@ -232,14 +232,14 @@ impl IcebergTable {
     let location = table
         .rel_path(&v.file_path)
         .expect("file path must be rel to table");
-    Ok(Arc::new(Box::new(IcebergPartInfo::Parquet(
-        ParquetPart::ParquetRSRowGroup(ParquetRSRowGroupPart {
-            location,
-            num_rows: v.record_count as usize,
-            column_metas: HashMap::new(),
-            row_selection: None,
-        }),
-    )) as Box<dyn PartInfo>))
+    Ok(Arc::new(
+        Box::new(IcebergPartInfo::Parquet(ParquetPart::ParquetFiles(
+            ParquetFilesPart {
+                files: vec![(location, v.file_size_in_bytes as u64)],
+                estimated_uncompressed_size: v.file_size_in_bytes as u64, // This field is not used here.
+            },
+        ))) as Box<dyn PartInfo>,
+    ))
 }
 _ => Err(ErrorCode::Unimplemented(
     "Only parquet format is supported for iceberg table",
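Net effect of this file: each Iceberg data file is now planned as a single whole-file part rather than a row-group part with empty column metadata, which is also why the HashMap import can go. Below is a minimal, standalone sketch of that mapping; the struct is a local stand-in for ParquetFilesPart, and the helper name and file path are made up for illustration.

// Minimal standalone sketch, not the crate's actual types: a local mirror of
// ParquetFilesPart showing how one Iceberg data file maps onto a whole-file part.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ParquetFilesPart {
    // (file location, file size in bytes)
    files: Vec<(String, u64)>,
    // Rough uncompressed-size estimate; the Iceberg path does not use it.
    estimated_uncompressed_size: u64,
}

// Hypothetical helper: build a one-file part from an Iceberg data-file entry.
fn part_for_data_file(location: String, file_size_in_bytes: u64) -> ParquetFilesPart {
    ParquetFilesPart {
        // One data file per part, so the reader can stream the whole file.
        files: vec![(location, file_size_in_bytes)],
        estimated_uncompressed_size: file_size_in_bytes,
    }
}

fn main() {
    let part = part_for_data_file("data/part-00000.parquet".to_string(), 4096);
    assert_eq!(part.files.len(), 1);
    println!("{:?}", part);
}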
5 changes: 3 additions & 2 deletions src/query/storages/iceberg/src/table_source.rs
@@ -115,10 +115,11 @@ impl Processor for IcebergTableSource {
     // And we should try to build another stream (in next event loop).
 } else if let Some(part) = self.ctx.get_partition() {
     match IcebergPartInfo::from_part(&part)? {
-        IcebergPartInfo::Parquet(ParquetPart::ParquetRSRowGroup(file)) => {
+        IcebergPartInfo::Parquet(ParquetPart::ParquetFiles(files)) => {
+            assert_eq!(files.files.len(), 1);
             let stream = self
                 .parquet_reader
-                .prepare_data_stream(self.ctx.clone(), &file.location)
+                .prepare_data_stream(self.ctx.clone(), &files.files[0].0)
                 .await?;
             self.stream = Some(stream);
         }
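Because the Iceberg planner emits exactly one file per part, the source can assert that invariant and stream files[0] directly. A small sketch of that control flow, using hypothetical local types instead of the crate's IcebergPartInfo and ParquetPart:

// Sketch only: simplified stand-ins to show why indexing files[0] is safe here.
struct FilesPart {
    files: Vec<(String, u64)>,
}

enum PartInfo {
    ParquetFiles(FilesPart),
}

fn location_of(part: &PartInfo) -> &str {
    match part {
        PartInfo::ParquetFiles(files) => {
            // Mirrors the assert in IcebergTableSource: exactly one file per part.
            assert_eq!(files.files.len(), 1);
            &files.files[0].0
        }
    }
}

fn main() {
    let part = PartInfo::ParquetFiles(FilesPart {
        files: vec![("data/part-00000.parquet".to_string(), 1024)],
    });
    assert_eq!(location_of(&part), "data/part-00000.parquet");
}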
1 change: 1 addition & 0 deletions src/query/storages/parquet/src/lib.rs
@@ -23,6 +23,7 @@ mod parquet_rs;
 mod utils;
 
 pub use parquet2::Parquet2Table;
+pub use parquet_part::ParquetFilesPart;
 pub use parquet_part::ParquetPart;
 pub use parquet_rs::ParquetRSPruner;
 pub use parquet_rs::ParquetRSReader;
4 changes: 2 additions & 2 deletions
@@ -274,7 +274,7 @@ impl Parquet2Reader {
 ParquetPart::Parquet2RowGroup(part) => Ok(Parquet2PartData::RowGroup(
     self.row_group_readers_from_blocking_io(part, &self.operator().blocking())?,
 )),
-ParquetPart::SmallFiles(part) => {
+ParquetPart::ParquetFiles(part) => {
     let op = self.operator().blocking();
     let mut buffers = Vec::with_capacity(part.files.len());
     for path in &part.files {
@@ -330,7 +330,7 @@ impl Parquet2Reader {
     let readers = readers.into_iter().collect::<IndexedReaders>();
     Ok(Parquet2PartData::RowGroup(readers))
 }
-ParquetPart::SmallFiles(part) => {
+ParquetPart::ParquetFiles(part) => {
     let mut join_handlers = Vec::with_capacity(part.files.len());
     for (path, _) in part.files.iter() {
         let op = self.operator().clone();
6 changes: 3 additions & 3 deletions
@@ -51,8 +51,8 @@ use crate::parquet2::parquet_reader::Parquet2Reader;
 use crate::parquet2::parquet_table::Parquet2PrewhereInfo;
 use crate::parquet2::pruning::PartitionPruner;
 use crate::parquet2::Parquet2RowGroupPart;
+use crate::parquet_part::ParquetFilesPart;
 use crate::parquet_part::ParquetPart;
-use crate::parquet_part::ParquetSmallFilesPart;
 
 pub trait SmallFilePrunner: Send + Sync {
     fn prune_one_file(
@@ -140,7 +140,7 @@ impl Parquet2DeserializeTransform {
 
 fn process_small_files(
     &mut self,
-    part: &ParquetSmallFilesPart,
+    part: &ParquetFilesPart,
     buffers: Vec<Vec<u8>>,
 ) -> Result<Vec<DataBlock>> {
     assert_eq!(part.files.len(), buffers.len());
@@ -382,7 +382,7 @@ impl Processor for Parquet2DeserializeTransform {
         self.add_block(block)?;
     }
 }
-(ParquetPart::SmallFiles(p), Parquet2PartData::SmallFiles(buffers)) => {
+(ParquetPart::ParquetFiles(p), Parquet2PartData::SmallFiles(buffers)) => {
     let blocks = self.process_small_files(p, buffers)?;
     self.add_block(DataBlock::concat(&blocks)?)?;
 }
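The renamed process_small_files still assumes the reader hands back one buffer per entry of ParquetFilesPart::files, in the same order. An illustrative pairing helper (not the crate's code) that captures that invariant:

// Illustration only: one read buffer per (path, size) entry, in order.
fn pair_files_with_buffers<'a>(
    files: &'a [(String, u64)],
    buffers: &'a [Vec<u8>],
) -> impl Iterator<Item = (&'a str, &'a [u8])> {
    // Same invariant as the assert_eq! in the hunk above.
    assert_eq!(files.len(), buffers.len());
    files
        .iter()
        .zip(buffers)
        .map(|((path, _size), buf)| (path.as_str(), buf.as_slice()))
}

fn main() {
    let files = vec![("a.parquet".to_string(), 10), ("b.parquet".to_string(), 20)];
    let buffers = vec![vec![0u8; 10], vec![0u8; 20]];
    for (path, buf) in pair_files_with_buffers(&files, &buffers) {
        println!("{}: {} bytes", path, buf.len());
    }
}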
14 changes: 7 additions & 7 deletions src/query/storages/parquet/src/parquet_part.rs
@@ -31,7 +31,7 @@ use crate::parquet_rs::ParquetRSRowGroupPart;
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)]
 pub enum ParquetPart {
     Parquet2RowGroup(Parquet2RowGroupPart),
-    SmallFiles(ParquetSmallFilesPart),
+    ParquetFiles(ParquetFilesPart),
     ParquetRSRowGroup(ParquetRSRowGroupPart),
 }
 
@@ -43,27 +43,27 @@ impl ParquetPart {
     pub fn uncompressed_size(&self) -> u64 {
         match self {
             ParquetPart::Parquet2RowGroup(r) => r.uncompressed_size(),
-            ParquetPart::SmallFiles(p) => p.uncompressed_size(),
+            ParquetPart::ParquetFiles(p) => p.uncompressed_size(),
             ParquetPart::ParquetRSRowGroup(p) => p.uncompressed_size(),
         }
     }
 
     pub fn compressed_size(&self) -> u64 {
         match self {
             ParquetPart::Parquet2RowGroup(r) => r.compressed_size(),
-            ParquetPart::SmallFiles(p) => p.compressed_size(),
+            ParquetPart::ParquetFiles(p) => p.compressed_size(),
             ParquetPart::ParquetRSRowGroup(p) => p.compressed_size(),
         }
     }
 }
 
 #[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)]
-pub struct ParquetSmallFilesPart {
+pub struct ParquetFilesPart {
     pub files: Vec<(String, u64)>,
     pub estimated_uncompressed_size: u64,
 }
 
-impl ParquetSmallFilesPart {
+impl ParquetFilesPart {
     pub fn compressed_size(&self) -> u64 {
         self.files.iter().map(|(_, s)| *s).sum()
     }
@@ -87,7 +87,7 @@ impl PartInfo for ParquetPart {
     fn hash(&self) -> u64 {
         let path = match self {
             ParquetPart::Parquet2RowGroup(r) => &r.location,
-            ParquetPart::SmallFiles(p) => &p.files[0].0,
+            ParquetPart::ParquetFiles(p) => &p.files[0].0,
             ParquetPart::ParquetRSRowGroup(p) => &p.location,
         };
         let mut s = DefaultHasher::new();
@@ -135,7 +135,7 @@ pub(crate) fn collect_small_file_parts(
     let mut make_small_files_part = |files: Vec<(String, u64)>, part_size| {
         let estimated_uncompressed_size = (part_size as f64 / max_compression_ratio) as u64;
         num_small_files -= files.len();
-        partitions.push(ParquetPart::SmallFiles(ParquetSmallFilesPart {
+        partitions.push(ParquetPart::ParquetFiles(ParquetFilesPart {
             files,
             estimated_uncompressed_size,
         }));
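collect_small_file_parts keeps its behavior: small files are packed into one part and the uncompressed size is estimated by dividing the on-disk size by max_compression_ratio, as in the hunk above. A hedged sketch of that batching idea follows; the FilesPart struct, the max_part_size budget, and the sample ratio are illustrative, not the crate's exact logic.

// Sketch: pack small files into ParquetFiles-style parts under a size budget.
#[derive(Debug)]
struct FilesPart {
    files: Vec<(String, u64)>,
    estimated_uncompressed_size: u64,
}

fn batch_small_files(
    files: Vec<(String, u64)>,
    max_part_size: u64,
    max_compression_ratio: f64,
) -> Vec<FilesPart> {
    let mut parts = Vec::new();
    let mut current: Vec<(String, u64)> = Vec::new();
    let mut current_size = 0u64;
    for (path, size) in files {
        // Close the current batch once adding this file would exceed the size budget.
        if !current.is_empty() && current_size + size > max_part_size {
            parts.push(FilesPart {
                files: std::mem::take(&mut current),
                // Same formula as in the diff: divide the on-disk size by the ratio.
                estimated_uncompressed_size: (current_size as f64 / max_compression_ratio) as u64,
            });
            current_size = 0;
        }
        current_size += size;
        current.push((path, size));
    }
    if !current.is_empty() {
        parts.push(FilesPart {
            files: current,
            estimated_uncompressed_size: (current_size as f64 / max_compression_ratio) as u64,
        });
    }
    parts
}

fn main() {
    let files = vec![
        ("a.parquet".to_string(), 3),
        ("b.parquet".to_string(), 4),
        ("c.parquet".to_string(), 5),
    ];
    for part in batch_small_files(files, 6, 0.3) {
        println!("{:?}", part);
    }
}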
33 changes: 20 additions & 13 deletions src/query/storages/parquet/src/parquet_rs/parquet_reader.rs
@@ -83,7 +83,7 @@ pub struct ParquetRSReader {
     /// Projected field levels.
     field_levels: FieldLevels,
 
-    pruner: ParquetRSPruner,
+    pruner: Option<ParquetRSPruner>,
 
     // Options
     need_page_index: bool,
@@ -98,6 +98,7 @@ impl ParquetRSReader {
         arrow_schema: &arrow_schema::Schema,
         plan: &DataSourcePlan,
         options: ParquetReadOptions,
+        create_pruner: bool, // If prune row groups and pages before reading.
     ) -> Result<Self> {
         let mut output_projection =
             PushDownInfo::projection_of_push_downs(&table_schema, &plan.push_downs);
@@ -129,13 +130,17 @@ impl ParquetRSReader {
             parquet_to_arrow_schema_by_columns(&schema_desc, projection.clone(), None)?;
         let output_schema = to_arrow_schema(&output_projection.project_schema(&table_schema));
         let field_paths = compute_output_field_paths(&output_schema, &batch_schema)?;
-        // Build pruner to prune row groups and pages(TODO).
-        let pruner = ParquetRSPruner::try_create(
-            ctx.get_function_context()?,
-            table_schema,
-            &plan.push_downs,
-            options,
-        )?;
+
+        let pruner = if create_pruner {
+            Some(ParquetRSPruner::try_create(
+                ctx.get_function_context()?,
+                table_schema,
+                &plan.push_downs,
+                options,
+            )?)
+        } else {
+            None
+        };
 
         let batch_size = ctx.get_settings().get_max_block_size()? as usize;
         let field_levels = parquet_to_arrow_field_levels(&schema_desc, projection.clone(), None)?;
@@ -174,12 +179,14 @@
         // Prune row groups.
         let file_meta = builder.metadata();
 
-        let selected_row_groups = self.pruner.prune_row_groups(file_meta)?;
-        let row_selection = self.pruner.prune_pages(file_meta, &selected_row_groups)?;
+        if let Some(pruner) = &self.pruner {
+            let selected_row_groups = pruner.prune_row_groups(file_meta)?;
+            let row_selection = pruner.prune_pages(file_meta, &selected_row_groups)?;
 
-        builder = builder.with_row_groups(selected_row_groups);
-        if let Some(row_selection) = row_selection {
-            builder = builder.with_row_selection(row_selection);
+            builder = builder.with_row_groups(selected_row_groups);
+            if let Some(row_selection) = row_selection {
+                builder = builder.with_row_selection(row_selection);
+            }
         }
 
         if let Some(predicate) = self.predicate.as_ref() {
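The pruner is now optional: Iceberg passes create_pruner = true because its parts are whole files that still benefit from row-group and page pruning at read time, while ParquetRSTable passes false in the next hunk, presumably because it already prunes when building row-group parts. A simplified sketch of the new control flow, with local stand-ins for ParquetRSPruner and the arrow reader builder (not the real APIs):

// Sketch: pruning is applied only when a pruner was created.
struct Pruner;

impl Pruner {
    // Pretend groups 0 and 2 survive pruning.
    fn prune_row_groups(&self) -> Vec<usize> {
        vec![0, 2]
    }
}

#[derive(Debug, Default)]
struct Builder {
    row_groups: Option<Vec<usize>>,
}

impl Builder {
    fn with_row_groups(mut self, row_groups: Vec<usize>) -> Self {
        self.row_groups = Some(row_groups);
        self
    }
}

fn apply_pruning(pruner: Option<&Pruner>, mut builder: Builder) -> Builder {
    // When no pruner exists (the caller already pruned while planning),
    // the builder is left untouched and the whole file is read.
    if let Some(pruner) = pruner {
        builder = builder.with_row_groups(pruner.prune_row_groups());
    }
    builder
}

fn main() {
    let pruned = apply_pruning(Some(&Pruner), Builder::default());
    assert_eq!(pruned.row_groups, Some(vec![0, 2]));

    let unpruned = apply_pruning(None, Builder::default());
    assert!(unpruned.row_groups.is_none());
}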
1 change: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ impl ParquetRSTable {
             &self.arrow_schema,
             plan,
             self.read_options,
+            false,
         )?);
 
         // TODO(parquet):
1 change: 0 additions & 1 deletion src/query/storages/parquet/src/parquet_rs/row_group.rs
@@ -8,7 +8,6 @@
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
-
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
