
refactor: read parquet in row group level. #12569

Merged
merged 12 commits on Aug 25, 2023
3 changes: 3 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions src/query/storages/iceberg/Cargo.toml
@@ -10,6 +10,7 @@ publish = false

[dependencies]
common-arrow = { path = "../../../common/arrow" }
common-base = { path = "../../../common/base" }
common-catalog = { path = "../../catalog" }
common-exception = { path = "../../../common/exception" }
common-expression = { path = "../../expression" }
11 changes: 6 additions & 5 deletions src/query/storages/iceberg/src/table.rs
@@ -40,8 +40,8 @@ use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableMeta;
use common_pipeline_core::Pipeline;
use common_storage::DataOperator;
use common_storages_parquet::ParquetFilesPart;
use common_storages_parquet::ParquetPart;
use common_storages_parquet::ParquetRSFilePart;
use common_storages_parquet::ParquetRSReader;
use storages_common_pruner::RangePrunerCreator;
use tokio::sync::OnceCell;
@@ -170,6 +170,7 @@ impl IcebergTable {
&arrow_schema,
plan,
ParquetReadOptions::default(),
true,
)?);

// TODO: we need to support top_k.
@@ -232,10 +233,10 @@ impl IcebergTable {
.rel_path(&v.file_path)
.expect("file path must be rel to table");
Ok(Arc::new(
Box::new(IcebergPartInfo::Parquet(ParquetPart::ParquetRSFile(
ParquetRSFilePart {
location,
file_size: v.file_size_in_bytes as u64,
Box::new(IcebergPartInfo::Parquet(ParquetPart::ParquetFiles(
ParquetFilesPart {
files: vec![(location, v.file_size_in_bytes as u64)],
estimated_uncompressed_size: v.file_size_in_bytes as u64, // This field is not used here.
},
))) as Box<dyn PartInfo>,
))
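The hunk above replaces the Iceberg-specific `ParquetRSFilePart` with the shared `ParquetFilesPart`, wrapping each Iceberg data file as a single-entry file list. A minimal sketch of that construction, using hypothetical stand-in types rather than the crate's real definitions in `common_storages_parquet`:

```rust
// Hypothetical, simplified stand-in for ParquetFilesPart as it appears in the
// diff above; the real type lives in common_storages_parquet.
#[derive(Debug)]
struct ParquetFilesPart {
    /// (location, file size in bytes) pairs.
    files: Vec<(String, u64)>,
    /// Not consulted on the Iceberg path; filled with the file size anyway.
    estimated_uncompressed_size: u64,
}

/// Wrap a single Iceberg data file the way the new table.rs code does:
/// one file per part, so `files` always has exactly one entry.
fn single_file_part(location: String, file_size_in_bytes: u64) -> ParquetFilesPart {
    ParquetFilesPart {
        files: vec![(location, file_size_in_bytes)],
        estimated_uncompressed_size: file_size_in_bytes,
    }
}

fn main() {
    let part = single_file_part("data/00000-0-example.parquet".to_string(), 4096);
    assert_eq!(part.files.len(), 1);
    println!("{part:?}");
}
```

Keeping the vector single-element preserves the old one-part-per-file behaviour while reusing the generic files-based part type.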
22 changes: 17 additions & 5 deletions src/query/storages/iceberg/src/table_source.rs
@@ -15,6 +15,8 @@
use std::any::Any;
use std::sync::Arc;

use common_base::base::Progress;
use common_base::base::ProgressValues;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
@@ -33,9 +35,11 @@ use parquet::arrow::async_reader::ParquetRecordBatchStream;
use crate::partition::IcebergPartInfo;

pub struct IcebergTableSource {
// Source processor related fields.
output: Arc<OutputPort>,
scan_progress: Arc<Progress>,
// Used for event transforming.
ctx: Arc<dyn TableContext>,
output: Arc<OutputPort>,
generated_data: Option<DataBlock>,
is_finished: bool,

@@ -52,9 +56,11 @@ impl IcebergTableSource {
output_schema: DataSchemaRef,
parquet_reader: Arc<ParquetRSReader>,
) -> Result<ProcessorPtr> {
let scan_progress = ctx.get_scan_progress();
Ok(ProcessorPtr::create(Box::new(IcebergTableSource {
ctx,
output,
scan_progress,
ctx,
parquet_reader,
output_schema,
stream: None,
@@ -91,6 +97,11 @@ impl Processor for IcebergTableSource {
match self.generated_data.take() {
None => Ok(Event::Async),
Some(data_block) => {
let progress_values = ProgressValues {
rows: data_block.num_rows(),
bytes: data_block.memory_size(),
};
self.scan_progress.incr(&progress_values);
self.output.push_data(Ok(data_block));
Ok(Event::NeedConsume)
}
@@ -102,7 +113,7 @@
if let Some(mut stream) = self.stream.take() {
if let Some(block) = self
.parquet_reader
.read_block(&mut stream)
.read_block_from_stream(&mut stream)
.await?
.map(|b| check_block_schema(&self.output_schema, b))
.transpose()?
@@ -115,10 +126,11 @@
// And we should try to build another stream (in next event loop).
} else if let Some(part) = self.ctx.get_partition() {
match IcebergPartInfo::from_part(&part)? {
IcebergPartInfo::Parquet(ParquetPart::ParquetRSFile(file)) => {
IcebergPartInfo::Parquet(ParquetPart::ParquetFiles(files)) => {
assert_eq!(files.files.len(), 1);
let stream = self
.parquet_reader
.prepare_data_stream(self.ctx.clone(), &file.location)
.prepare_data_stream(&files.files[0].0)
.await?;
self.stream = Some(stream);
}
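With the new `scan_progress` field, every block produced in `async_process` is counted (rows and bytes) right before it is pushed to the output port. A rough sketch of that accounting, with simplified stand-ins for `Progress` and `ProgressValues` (the real types come from `common_base`):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Minimal stand-ins for common_base's Progress / ProgressValues, only to show
// the bookkeeping added in output().
#[derive(Default)]
struct Progress {
    rows: AtomicUsize,
    bytes: AtomicUsize,
}

struct ProgressValues {
    rows: usize,
    bytes: usize,
}

impl Progress {
    fn incr(&self, v: &ProgressValues) {
        self.rows.fetch_add(v.rows, Ordering::Relaxed);
        self.bytes.fetch_add(v.bytes, Ordering::Relaxed);
    }
}

fn main() {
    let scan_progress = Arc::new(Progress::default());
    // Pretend a generated block had 1024 rows occupying 64 KiB in memory.
    scan_progress.incr(&ProgressValues { rows: 1024, bytes: 64 * 1024 });
    assert_eq!(scan_progress.rows.load(Ordering::Relaxed), 1024);
}
```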
2 changes: 2 additions & 0 deletions src/query/storages/parquet/Cargo.toml
@@ -31,13 +31,15 @@ arrow-buffer = "45.0.0"
arrow-schema = "45.0.0"
async-backtrace = { workspace = true }
async-trait = { version = "0.1.57", package = "async-trait-fn" }
bytes = "1"
chrono = { workspace = true }
ethnum = { workspace = true }
futures = "0.3.24"
log = { workspace = true }
opendal = { workspace = true }
parquet = { version = "45.0.0", features = ["async"] }
serde = { workspace = true }
thrift = "0.17.0"
typetag = "0.2.3"

[dev-dependencies]
3 changes: 2 additions & 1 deletion src/query/storages/parquet/src/lib.rs
@@ -23,8 +23,9 @@ mod parquet_rs;
mod utils;

pub use parquet2::Parquet2Table;
pub use parquet_part::ParquetFilesPart;
pub use parquet_part::ParquetPart;
pub use parquet_part::ParquetRSFilePart;
pub use parquet_rs::ParquetRSPruner;
pub use parquet_rs::ParquetRSReader;
pub use parquet_rs::ParquetRSRowGroupPart;
pub use parquet_rs::ParquetRSTable;
2 changes: 2 additions & 0 deletions src/query/storages/parquet/src/parquet2/mod.rs
@@ -14,9 +14,11 @@

mod parquet_reader;
mod parquet_table;
mod partition;
mod processors;
mod projection;
mod pruning;
mod statistics;

pub use parquet_table::Parquet2Table;
pub use partition::Parquet2RowGroupPart;
@@ -37,7 +37,7 @@ use log::debug;

use super::filter::FilterState;
use crate::parquet2::parquet_reader::Parquet2Reader;
use crate::parquet_part::ColumnMeta;
use crate::parquet2::partition::ColumnMeta;

impl Parquet2Reader {
/// The number of columns can be greater than 1 because it may be a nested type.
10 changes: 5 additions & 5 deletions src/query/storages/parquet/src/parquet2/parquet_reader/reader.rs
@@ -45,7 +45,7 @@ use super::Parquet2PartData;
use crate::parquet2::parquet_reader::deserialize::try_next_block;
use crate::parquet2::parquet_table::arrow_to_table_schema;
use crate::parquet2::projection::project_parquet_schema;
use crate::parquet_part::Parquet2RowGroupPart;
use crate::parquet2::Parquet2RowGroupPart;
use crate::ParquetPart;

/// The reader to parquet files with a projected schema.
@@ -274,7 +274,7 @@ impl Parquet2Reader {
ParquetPart::Parquet2RowGroup(part) => Ok(Parquet2PartData::RowGroup(
self.row_group_readers_from_blocking_io(part, &self.operator().blocking())?,
)),
ParquetPart::SmallFiles(part) => {
ParquetPart::ParquetFiles(part) => {
let op = self.operator().blocking();
let mut buffers = Vec::with_capacity(part.files.len());
for path in &part.files {
@@ -284,7 +284,7 @@
metrics_inc_copy_read_size_bytes(part.compressed_size());
Ok(Parquet2PartData::SmallFiles(buffers))
}
ParquetPart::ParquetRSFile(_) => unreachable!(),
ParquetPart::ParquetRSRowGroup(_) => unreachable!(),
}
}

@@ -330,7 +330,7 @@ impl Parquet2Reader {
let readers = readers.into_iter().collect::<IndexedReaders>();
Ok(Parquet2PartData::RowGroup(readers))
}
ParquetPart::SmallFiles(part) => {
ParquetPart::ParquetFiles(part) => {
let mut join_handlers = Vec::with_capacity(part.files.len());
for (path, _) in part.files.iter() {
let op = self.operator().clone();
@@ -348,7 +348,7 @@

Ok(Parquet2PartData::SmallFiles(buffers))
}
ParquetPart::ParquetRSFile(_) => unreachable!(),
ParquetPart::ParquetRSRowGroup(_) => unreachable!(),
}
}
}
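After this hunk the parquet2 reader dispatches on the renamed variants: `ParquetFiles` for whole small files and `ParquetRSRowGroup` for the parquet-rs row-group path it never handles. A hedged sketch of that dispatch, with placeholder payload types that are not the crate's real structs:

```rust
// Hypothetical, trimmed-down version of the ParquetPart variants referenced in
// this hunk after the rename (SmallFiles -> ParquetFiles,
// ParquetRSFile -> ParquetRSRowGroup).
struct Parquet2RowGroupPart;
struct ParquetFilesPart {
    files: Vec<(String, u64)>,
}
struct ParquetRSRowGroupPart;

enum ParquetPart {
    Parquet2RowGroup(Parquet2RowGroupPart),
    ParquetFiles(ParquetFilesPart),
    ParquetRSRowGroup(ParquetRSRowGroupPart),
}

fn describe(part: &ParquetPart) -> String {
    match part {
        // Native parquet2 path: one row group per part.
        ParquetPart::Parquet2RowGroup(_) => "parquet2 row group".to_string(),
        // Whole (small) files read into in-memory buffers.
        ParquetPart::ParquetFiles(p) => format!("{} whole parquet file(s)", p.files.len()),
        // Produced only by the parquet-rs path, hence `unreachable!()` in the
        // parquet2 reader above.
        ParquetPart::ParquetRSRowGroup(_) => "parquet-rs row group".to_string(),
    }
}

fn main() {
    let part = ParquetPart::ParquetFiles(ParquetFilesPart {
        files: vec![("t.parquet".to_string(), 1024)],
    });
    println!("{}", describe(&part));
}
```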
56 changes: 56 additions & 0 deletions src/query/storages/parquet/src/parquet2/partition.rs
@@ -0,0 +1,56 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use common_arrow::parquet::compression::Compression;
use common_arrow::parquet::indexes::Interval;
use common_expression::FieldIndex;
use common_expression::Scalar;

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct ColumnMeta {
pub offset: u64,
pub length: u64,
pub num_values: i64,
pub compression: Compression,
pub uncompressed_size: u64,
pub min_max: Option<(Scalar, Scalar)>,

// If the column has a dictionary, we cannot push down predicates to deserialization.
pub has_dictionary: bool,
}

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq, Debug, Clone)]
pub struct Parquet2RowGroupPart {
pub location: String,
pub num_rows: usize,
pub column_metas: HashMap<FieldIndex, ColumnMeta>,
pub row_selection: Option<Vec<Interval>>,

pub sort_min_max: Option<(Scalar, Scalar)>,
}

impl Parquet2RowGroupPart {
pub fn uncompressed_size(&self) -> u64 {
self.column_metas
.values()
.map(|c| c.uncompressed_size)
.sum()
}

pub fn compressed_size(&self) -> u64 {
self.column_metas.values().map(|c| c.length).sum()
}
}
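The two helpers on `Parquet2RowGroupPart` are plain sums over the row group's column metas. A small, self-contained illustration of the same arithmetic, using a hypothetical struct that keeps only the two size fields:

```rust
use std::collections::HashMap;

// Stand-in for the two ColumnMeta fields the size helpers above actually read;
// the remaining fields of the real struct are omitted.
struct ColumnChunkSizes {
    length: u64,            // compressed byte length of the column chunk
    uncompressed_size: u64, // decoded byte size of the column chunk
}

// Mirrors Parquet2RowGroupPart::{compressed_size, uncompressed_size}:
// both are sums over every column chunk in the row group.
fn compressed_size(metas: &HashMap<usize, ColumnChunkSizes>) -> u64 {
    metas.values().map(|c| c.length).sum()
}

fn uncompressed_size(metas: &HashMap<usize, ColumnChunkSizes>) -> u64 {
    metas.values().map(|c| c.uncompressed_size).sum()
}

fn main() {
    let mut metas = HashMap::new();
    metas.insert(0, ColumnChunkSizes { length: 100, uncompressed_size: 400 });
    metas.insert(1, ColumnChunkSizes { length: 50, uncompressed_size: 150 });
    assert_eq!(compressed_size(&metas), 150);
    assert_eq!(uncompressed_size(&metas), 550);
}
```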
@@ -50,9 +50,9 @@ use crate::parquet2::parquet_reader::Parquet2PartData;
use crate::parquet2::parquet_reader::Parquet2Reader;
use crate::parquet2::parquet_table::Parquet2PrewhereInfo;
use crate::parquet2::pruning::PartitionPruner;
use crate::parquet_part::Parquet2RowGroupPart;
use crate::parquet2::Parquet2RowGroupPart;
use crate::parquet_part::ParquetFilesPart;
use crate::parquet_part::ParquetPart;
use crate::parquet_part::ParquetSmallFilesPart;

pub trait SmallFilePrunner: Send + Sync {
fn prune_one_file(
@@ -140,7 +140,7 @@ impl Parquet2DeserializeTransform {

fn process_small_files(
&mut self,
part: &ParquetSmallFilesPart,
part: &ParquetFilesPart,
buffers: Vec<Vec<u8>>,
) -> Result<Vec<DataBlock>> {
assert_eq!(part.files.len(), buffers.len());
@@ -382,7 +382,7 @@ impl Processor for Parquet2DeserializeTransform {
self.add_block(block)?;
}
}
(ParquetPart::SmallFiles(p), Parquet2PartData::SmallFiles(buffers)) => {
(ParquetPart::ParquetFiles(p), Parquet2PartData::SmallFiles(buffers)) => {
let blocks = self.process_small_files(p, buffers)?;
self.add_block(DataBlock::concat(&blocks)?)?;
}
4 changes: 2 additions & 2 deletions src/query/storages/parquet/src/parquet2/pruning.rs
@@ -47,11 +47,11 @@ use opendal::Operator;
use storages_common_pruner::RangePruner;
use storages_common_pruner::RangePrunerCreator;

use super::partition::ColumnMeta;
use super::Parquet2RowGroupPart;
use crate::parquet2::statistics::collect_row_group_stats;
use crate::parquet2::statistics::BatchStatistics;
use crate::parquet_part::collect_small_file_parts;
use crate::parquet_part::ColumnMeta;
use crate::parquet_part::Parquet2RowGroupPart;
use crate::parquet_part::ParquetPart;

/// Prune parquet row groups and pages.
Expand Down
Loading