Skip to content

Commit

Permalink
Merge branch 'main' into filter_card
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 16, 2023
2 parents 6557620 + b68855d commit e5c8770
Show file tree
Hide file tree
Showing 21 changed files with 1,139 additions and 299 deletions.
34 changes: 27 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ rpath = false
# If there are dependencies that need patching, they can be listed below.
# For example:
# arrow-format = { git = "https://github.com/datafuse-extras/arrow-format", rev = "78dacc1" }

arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "211be21" }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "fb08b72" }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "ed0e1ff" }
limits-rs = { git = "https://github.com/datafuse-extras/limits-rs", rev = "abfcf7b" }
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "bc49d03" }
2 changes: 1 addition & 1 deletion src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ arrow = { package = "arrow2", version = "0.15.0", default-features = false, feat
arrow-format = { version = "0.8.0", features = ["flight-data", "flight-service", "ipc"] }
futures = "0.3.24"
native = { package = "strawboat", version = "0.1.0" }
parquet2 = { version = "0.17.0", default_features = false }
parquet2 = { version = "0.17.0", default_features = false, features = ["serde_types"] }

[dev-dependencies]
7 changes: 1 addition & 6 deletions src/query/storages/common/index/src/range_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::type_check::check_function;
use common_expression::types::nullable::NullableDomain;
Expand Down Expand Up @@ -46,7 +43,7 @@ pub struct RangeFilter {
}
impl RangeFilter {
pub fn try_create(
ctx: Arc<dyn TableContext>,
func_ctx: FunctionContext,
exprs: &[Expr<String>],
schema: TableSchemaRef,
) -> Result<Self> {
Expand All @@ -58,8 +55,6 @@ impl RangeFilter {
})
.unwrap();

let func_ctx = ctx.try_get_function_context()?;

let (new_expr, _) = ConstantFolder::fold(&conjunction, func_ctx, &BUILTIN_FUNCTIONS);

Ok(Self {
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/common/pruner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ doctest = false
test = false

[dependencies]
common-catalog = { path = "../../../../query/catalog" }
common-exception = { path = "../../../../common/exception" }
common-expression = { path = "../../../expression" }

Expand Down
6 changes: 3 additions & 3 deletions src/query/storages/common/pruner/src/range_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::Expr;
use common_expression::FunctionContext;
use common_expression::TableSchemaRef;
use storages_common_index::RangeFilter;
use storages_common_table_meta::meta::StatisticsOfColumns;
Expand Down Expand Up @@ -62,13 +62,13 @@ impl RangePrunerCreator {
///
/// Note: the schema should be the schema of the table, not the schema of the input.
pub fn try_create<'a>(
ctx: &Arc<dyn TableContext>,
func_ctx: FunctionContext,
filter_expr: Option<&'a [Expr<String>]>,
schema: &'a TableSchemaRef,
) -> Result<Arc<dyn RangePruner + Send + Sync>> {
Ok(match filter_expr {
Some(exprs) if !exprs.is_empty() => {
let range_filter = RangeFilter::try_create(ctx.clone(), exprs, schema.clone())?;
let range_filter = RangeFilter::try_create(func_ctx, exprs, schema.clone())?;
match range_filter.try_eval_const() {
Ok(v) => {
if v {
Expand Down
6 changes: 5 additions & 1 deletion src/query/storages/fuse/src/pruning/pruning_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ impl BlockPruner {

// prepare the range filter.
// if filter_expression is none, an dummy pruner will be returned, which prunes nothing
let range_pruner = RangePrunerCreator::try_create(ctx, filter_exprs.as_deref(), &schema)?;
let range_pruner = RangePrunerCreator::try_create(
ctx.try_get_function_context()?,
filter_exprs.as_deref(),
&schema,
)?;

// prepare the filter.
// None will be returned, if filter is not applicable (e.g. unsuitable filter expression, index not available, etc.)
Expand Down
7 changes: 5 additions & 2 deletions src/query/storages/hive/hive/src/hive_partition_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ impl HivePartitionPruner {
}

pub fn prune(&self, partitions: Vec<String>) -> Result<Vec<String>> {
let range_filter =
RangeFilter::try_create(self.ctx.clone(), &self.filters, self.full_schema.clone())?;
let range_filter = RangeFilter::try_create(
self.ctx.try_get_function_context()?,
&self.filters,
self.full_schema.clone(),
)?;
let column_stats = self.get_column_stats(&partitions)?;
let mut filted_partitions = vec![];
for (idx, stats) in column_stats.into_iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/hive/hive/src/hive_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl HiveTable {
});
let range_filter = match filter_expressions {
Some(exprs) if !exprs.is_empty() => Some(RangeFilter::try_create(
ctx.clone(),
ctx.try_get_function_context()?,
&exprs,
self.table_info.schema(),
)?),
Expand Down
3 changes: 3 additions & 0 deletions src/query/storages/parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ glob = "0.3.0"
opendal = { workspace = true }
serde = { workspace = true }
typetag = "0.2.3"

[dev-dependencies]
common-sql = { path = "../../sql" }
8 changes: 4 additions & 4 deletions src/query/storages/parquet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_crate_dependencies)]

mod parquet_column;
mod parquet_part;
mod parquet_reader;
mod parquet_source;
mod pruning;
mod read_options;
mod statistics;
mod table_function;

pub use parquet_part::ParquetLocationPart;
pub use parquet_reader::ParquetReader;
pub use parquet_source::ParquetSource;
pub use read_options::ReadOptions;
pub use table_function::ParquetTable;
57 changes: 0 additions & 57 deletions src/query/storages/parquet/src/parquet_column.rs

This file was deleted.

48 changes: 5 additions & 43 deletions src/query/storages/parquet/src/parquet_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use std::hash::Hash;
use std::hash::Hasher;
use std::sync::Arc;

use common_arrow::parquet::compression::Compression as ParquetCompression;
use common_arrow::parquet::compression::Compression;
use common_arrow::parquet::indexes::Interval;
use common_catalog::plan::PartInfo;
use common_catalog::plan::PartInfoPtr;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -65,48 +66,6 @@ impl ParquetLocationPart {
}
}

#[derive(serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)]
pub enum Compression {
Uncompressed,
Snappy,
Gzip,
Lzo,
Brotli,
Lz4,
Zstd,
Lz4Raw,
}

impl From<Compression> for ParquetCompression {
fn from(value: Compression) -> Self {
match value {
Compression::Uncompressed => ParquetCompression::Uncompressed,
Compression::Snappy => ParquetCompression::Snappy,
Compression::Gzip => ParquetCompression::Gzip,
Compression::Lzo => ParquetCompression::Lzo,
Compression::Brotli => ParquetCompression::Brotli,
Compression::Lz4 => ParquetCompression::Lz4,
Compression::Zstd => ParquetCompression::Zstd,
Compression::Lz4Raw => ParquetCompression::Lz4Raw,
}
}
}

impl From<ParquetCompression> for Compression {
fn from(value: ParquetCompression) -> Self {
match value {
ParquetCompression::Uncompressed => Compression::Uncompressed,
ParquetCompression::Snappy => Compression::Snappy,
ParquetCompression::Gzip => Compression::Gzip,
ParquetCompression::Lzo => Compression::Lzo,
ParquetCompression::Brotli => Compression::Brotli,
ParquetCompression::Lz4 => Compression::Lz4,
ParquetCompression::Zstd => Compression::Zstd,
ParquetCompression::Lz4Raw => Compression::Lz4Raw,
}
}
}

#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct ColumnMeta {
pub offset: u64,
Expand All @@ -119,6 +78,7 @@ pub struct ParquetRowGroupPart {
pub location: String,
pub num_rows: usize,
pub column_metas: HashMap<usize, ColumnMeta>,
pub row_selection: Option<Vec<Interval>>,
}

#[typetag::serde(name = "parquet_row_group")]
Expand Down Expand Up @@ -146,11 +106,13 @@ impl ParquetRowGroupPart {
location: String,
num_rows: usize,
column_metas: HashMap<usize, ColumnMeta>,
row_selection: Option<Vec<Interval>>,
) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(ParquetRowGroupPart {
location,
num_rows,
column_metas,
row_selection,
}))
}

Expand Down
Loading

0 comments on commit e5c8770

Please sign in to comment.