diff --git a/.gitignore b/.gitignore index 83ba82536a..775ba55c13 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ target .DS_Store .idea/ -.vscode \ No newline at end of file +.vscode +.dir-locals.el diff --git a/Cargo.lock b/Cargo.lock index f00f996ece..a626b49986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,7 +447,7 @@ dependencies = [ [[package]] name = "arrow_util" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "ahash 0.8.3", "arrow 38.0.0", @@ -1823,7 +1823,7 @@ dependencies = [ [[package]] name = "datafusion" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "ahash 0.8.3", "arrow 38.0.0", @@ -1872,7 +1872,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "arrow 38.0.0", "arrow-array", @@ -1886,7 +1886,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "dashmap 5.4.0", "datafusion-common", @@ -1903,7 +1903,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "ahash 0.8.3", "arrow 38.0.0", @@ -1914,7 +1914,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "arrow 38.0.0", "async-trait", @@ -1931,7 +1931,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "ahash 0.8.3", "arrow 38.0.0", @@ -1963,7 +1963,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "arrow 38.0.0", "chrono", @@ -1977,7 +1977,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "arrow 38.0.0", "datafusion-common", @@ -1988,7 +1988,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "23.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5" +source = "git+https://github.com/jiacai2050/arrow-datafusion.git?rev=13314c37020b90246db9b80f8294370c06e61018#13314c37020b90246db9b80f8294370c06e61018" dependencies = [ "arrow 38.0.0", "arrow-schema", @@ -2001,7 +2001,7 @@ dependencies = [ [[package]] name = "datafusion_util" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "async-trait", "datafusion", @@ -2536,7 +2536,7 @@ checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" [[package]] name = "generated_types" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "pbjson", "pbjson-build", @@ -2929,7 +2929,7 @@ dependencies = [ [[package]] name = "influxdb_influxql_parser" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "chrono", "chrono-tz", @@ -2991,7 +2991,7 @@ dependencies = [ [[package]] name = "iox_query" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "arrow 38.0.0", "arrow_util", @@ -3015,7 +3015,7 @@ dependencies = [ [[package]] name = "iox_query_influxql" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "arrow 38.0.0", "chrono", @@ -4068,7 +4068,7 @@ dependencies = [ [[package]] name = "observability_deps" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "tracing", ] @@ -4964,7 +4964,7 @@ dependencies = [ [[package]] name = "query_functions" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "arrow 38.0.0", "chrono", @@ -5571,7 +5571,7 @@ dependencies = [ [[package]] name = "schema" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "arrow 38.0.0", "hashbrown 0.13.2", @@ -6380,7 +6380,7 @@ dependencies = [ [[package]] name = "test_helpers" version = "0.1.0" -source = "git+https://github.com/CeresDB/influxql#4cef2d2fb84fc77af1a7a88ad4c978a5b344ad23" +source = "git+https://github.com/CeresDB/influxql?rev=efbc589#efbc589fb4e884e4ce057f066e63183f02a99c51" dependencies = [ "dotenvy", "observability_deps", diff --git a/Cargo.toml b/Cargo.toml index 0aa7d5d377..054ba9cf78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,8 +76,8 @@ cluster = { path = "cluster" } criterion = "0.3" common_types = { path = "common_types" } common_util = { path = "common_util" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" } +datafusion = { git = "https://github.com/jiacai2050/arrow-datafusion.git", rev = "13314c37020b90246db9b80f8294370c06e61018" } +datafusion-proto = { git = "https://github.com/jiacai2050/arrow-datafusion.git", rev = "13314c37020b90246db9b80f8294370c06e61018" } df_operator = { path = "df_operator" } etcd-client = "0.10.3" env_logger = "0.6" @@ -89,10 +89,10 @@ lazy_static = "1.4.0" log = "0.4" logger = { path = "components/logger" } lru = "0.7.6" -influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", package = "iox_query_influxql" } -influxql-parser = { git = "https://github.com/CeresDB/influxql", package = "influxdb_influxql_parser" } -influxql-query = { git = "https://github.com/CeresDB/influxql", package = "iox_query" } -influxql-schema = { git = "https://github.com/CeresDB/influxql", package = "schema" } +influxql-logical-planner = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "iox_query_influxql" } +influxql-parser = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "influxdb_influxql_parser" } +influxql-query = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "iox_query" } +influxql-schema = { git = "https://github.com/CeresDB/influxql", rev = "efbc589", package = "schema" } interpreters = { path = "interpreters" } itertools = "0.10.5" meta_client = { path = "meta_client" } diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 2764c93ce9..dac48bff44 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -22,11 +22,22 @@ use common_util::{ runtime::{AbortOnDropMany, JoinHandle, Runtime}, time::InstantExt, }; +use datafusion::{ + common::ToDFSchema, + physical_expr::{create_physical_expr, execution_props::ExecutionProps}, + physical_plan::{ + file_format::{parquet::page_filter::PagePruningPredicate, ParquetFileMetrics}, + metrics::ExecutionPlanMetricsSet, + }, +}; use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt}; use log::{debug, error}; use object_store::{ObjectStoreRef, Path}; use parquet::{ - arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask}, + arrow::{ + arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, + ProjectionMask, + }, file::metadata::RowGroupMetaData, }; use parquet_ext::meta_data::ChunkReader; @@ -71,6 +82,7 @@ pub struct Reader<'a> { /// Options for `read_parallelly` metrics: Metrics, + df_plan_metrics: ExecutionPlanMetricsSet, } #[derive(Default, Debug, Clone, TraceMetricWhenDrop)] @@ -94,7 +106,7 @@ impl<'a> Reader<'a> { metrics_collector: Option, ) -> Self { let store = store_picker.pick_by_freq(options.frequency); - + let df_plan_metrics = ExecutionPlanMetricsSet::new(); let metrics = Metrics { metrics_collector, ..Default::default() @@ -112,6 +124,7 @@ impl<'a> Reader<'a> { meta_data: None, row_projector: None, metrics, + df_plan_metrics, } } @@ -182,6 +195,36 @@ impl<'a> Reader<'a> { suggested.min(num_row_groups).max(1) } + fn build_row_selection( + &self, + arrow_schema: SchemaRef, + row_groups: &[usize], + file_metadata: &parquet_ext::ParquetMetaData, + ) -> Result> { + // TODO: remove fixed partition + let partition = 0; + let exprs = datafusion::optimizer::utils::conjunction(self.predicate.exprs().to_vec()); + let exprs = match exprs { + Some(exprs) => exprs, + None => return Ok(None), + }; + + let df_schema = arrow_schema + .clone() + .to_dfschema() + .context(DataFusionError)?; + let physical_expr = + create_physical_expr(&exprs, &df_schema, &arrow_schema, &ExecutionProps::new()) + .context(DataFusionError)?; + let page_predicate = PagePruningPredicate::try_new(&physical_expr, arrow_schema.clone()) + .context(DataFusionError)?; + + let metrics = ParquetFileMetrics::new(partition, self.path.as_ref(), &self.df_plan_metrics); + page_predicate + .prune(row_groups, file_metadata, &metrics) + .context(DataFusionError) + } + async fn fetch_record_batch_streams( &mut self, suggested_parallelism: usize, @@ -190,10 +233,10 @@ impl<'a> Reader<'a> { let meta_data = self.meta_data.as_ref().unwrap(); let row_projector = self.row_projector.as_ref().unwrap(); - + let arrow_schema = meta_data.custom().schema.to_arrow_schema_ref(); // Get target row groups. let target_row_groups = self.prune_row_groups( - meta_data.custom().schema.to_arrow_schema_ref(), + arrow_schema.clone(), meta_data.parquet().row_groups(), meta_data.custom().parquet_filter.as_ref(), )?; @@ -226,6 +269,7 @@ impl<'a> Reader<'a> { target_row_group_chunks[chunk_idx].push(row_group); } + let parquet_metadata = meta_data.parquet(); let proj_mask = ProjectionMask::leaves( meta_data.parquet().file_metadata().schema_descr(), row_projector.existed_source_projection().iter().copied(), @@ -239,9 +283,15 @@ impl<'a> Reader<'a> { for chunk in target_row_group_chunks { let object_store_reader = ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone()); - let builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) + let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) .await .with_context(|| ParquetError)?; + let row_selection = + self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?; + if let Some(selection) = row_selection { + builder = builder.with_row_selection(selection); + }; + let stream = builder .with_batch_size(self.num_rows_per_row_group) .with_row_groups(chunk) @@ -353,6 +403,16 @@ impl<'a> Reader<'a> { } } +impl<'a> Drop for Reader<'a> { + fn drop(&mut self) { + debug!( + "Parquet reader dropped, path:{:?}, df_plan_metrics:{}", + self.path, + self.df_plan_metrics.clone_inner().to_string() + ); + } +} + #[derive(Clone)] struct ObjectStoreReader { storage: ObjectStoreRef, diff --git a/analytic_engine/src/sst/parquet/row_group_pruner.rs b/analytic_engine/src/sst/parquet/row_group_pruner.rs index 51c270c88b..d48d872917 100644 --- a/analytic_engine/src/sst/parquet/row_group_pruner.rs +++ b/analytic_engine/src/sst/parquet/row_group_pruner.rs @@ -51,6 +51,9 @@ pub struct RowGroupPruner<'a> { } impl<'a> RowGroupPruner<'a> { + // TODO: DataFusion already change predicates to PhyscialExpr, we should keep up + // with upstream. + // https://github.com/apache/arrow-datafusion/issues/4695 pub fn try_new( schema: &'a SchemaRef, row_groups: &'a [RowGroupMetaData],