Skip to content

Commit 2b92b1d

Browse files
Move file source projected statistics into config
1 parent 289f391 commit 2b92b1d

File tree

15 files changed, with 57 additions and 135 deletions.

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ pub(crate) mod test_util {
8989
format.file_source(),
9090
)
9191
.with_file_groups(file_groups)
92-
.with_statistics(statistics)
92+
.with_file_source_projected_statistics(statistics)
9393
.with_projection(projection)
9494
.with_limit(limit)
9595
.build(),

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1247,7 +1247,7 @@ impl TableProvider for ListingTable {
12471247
)
12481248
.with_file_groups(partitioned_file_lists)
12491249
.with_constraints(self.constraints.clone())
1250-
.with_statistics(statistics)
1250+
.with_file_source_projected_statistics(statistics)
12511251
.with_projection(projection.cloned())
12521252
.with_limit(limit)
12531253
.with_output_ordering(output_ordering)

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
2626
use arrow::buffer::Buffer;
2727
use arrow::datatypes::SchemaRef;
2828
use arrow_ipc::reader::FileDecoder;
29-
use datafusion_common::Statistics;
3029
use datafusion_datasource::file::FileSource;
3130
use datafusion_datasource::file_scan_config::FileScanConfig;
3231
use datafusion_datasource::PartitionedFile;
@@ -41,7 +40,6 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
4140
#[derive(Clone, Default)]
4241
pub struct ArrowSource {
4342
metrics: ExecutionPlanMetricsSet,
44-
projected_statistics: Option<Statistics>,
4543
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
4644
}
4745

@@ -75,11 +73,6 @@ impl FileSource for ArrowSource {
7573
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
7674
Arc::new(Self { ..self.clone() })
7775
}
78-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
79-
let mut conf = self.clone();
80-
conf.projected_statistics = Some(statistics);
81-
Arc::new(conf)
82-
}
8376

8477
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
8578
Arc::new(Self { ..self.clone() })
@@ -89,13 +82,6 @@ impl FileSource for ArrowSource {
8982
&self.metrics
9083
}
9184

92-
fn file_source_statistics(&self) -> Result<Statistics> {
93-
let statistics = &self.projected_statistics;
94-
Ok(statistics
95-
.clone()
96-
.expect("projected_statistics must be set"))
97-
}
98-
9985
fn file_type(&self) -> &str {
10086
"arrow"
10187
}

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,25 +167,10 @@ impl FileSource for TestSource {
167167
})
168168
}
169169

170-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
171-
Arc::new(TestSource {
172-
statistics: Some(statistics),
173-
..self.clone()
174-
})
175-
}
176-
177170
fn metrics(&self) -> &ExecutionPlanMetricsSet {
178171
&self.metrics
179172
}
180173

181-
fn file_source_statistics(&self) -> Result<Statistics> {
182-
Ok(self
183-
.statistics
184-
.as_ref()
185-
.expect("statistics not set")
186-
.clone())
187-
}
188-
189174
fn file_type(&self) -> &str {
190175
"test"
191176
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ pub(crate) fn parquet_exec_with_stats(file_size: u64) -> Arc<DataSourceExec> {
131131
Arc::new(ParquetSource::new(Default::default())),
132132
)
133133
.with_file(PartitionedFile::new("x".to_string(), file_size))
134-
.with_statistics(statistics)
134+
.with_file_source_projected_statistics(statistics)
135135
.build();
136136

137137
assert_eq!(
138138
config
139139
.file_source
140-
.file_source_statistics()
140+
.file_source_statistics(&config)
141141
.unwrap()
142142
.num_rows,
143143
Precision::Inexact(10000)

datafusion/datasource-avro/src/source.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use crate::avro_to_arrow::Reader as AvroReader;
2424

2525
use arrow::datatypes::SchemaRef;
2626
use datafusion_common::error::Result;
27-
use datafusion_common::Statistics;
2827
use datafusion_datasource::file::FileSource;
2928
use datafusion_datasource::file_scan_config::FileScanConfig;
3029
use datafusion_datasource::file_stream::FileOpener;
@@ -41,7 +40,6 @@ pub struct AvroSource {
4140
batch_size: Option<usize>,
4241
projection: Option<Vec<String>>,
4342
metrics: ExecutionPlanMetricsSet,
44-
projected_statistics: Option<Statistics>,
4543
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
4644
}
4745

@@ -89,11 +87,6 @@ impl FileSource for AvroSource {
8987
conf.schema = Some(schema);
9088
Arc::new(conf)
9189
}
92-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
93-
let mut conf = self.clone();
94-
conf.projected_statistics = Some(statistics);
95-
Arc::new(conf)
96-
}
9790

9891
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
9992
let mut conf = self.clone();
@@ -105,13 +98,6 @@ impl FileSource for AvroSource {
10598
&self.metrics
10699
}
107100

108-
fn file_source_statistics(&self) -> Result<Statistics> {
109-
let statistics = &self.projected_statistics;
110-
Ok(statistics
111-
.clone()
112-
.expect("projected_statistics must be set"))
113-
}
114-
115101
fn file_type(&self) -> &str {
116102
"avro"
117103
}

datafusion/datasource-csv/src/source.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion_datasource::{
3535

3636
use arrow::csv;
3737
use arrow::datatypes::SchemaRef;
38-
use datafusion_common::{DataFusionError, Result, Statistics};
38+
use datafusion_common::{DataFusionError, Result};
3939
use datafusion_common_runtime::JoinSet;
4040
use datafusion_datasource::file::FileSource;
4141
use datafusion_datasource::file_scan_config::FileScanConfig;
@@ -92,7 +92,6 @@ pub struct CsvSource {
9292
escape: Option<u8>,
9393
comment: Option<u8>,
9494
metrics: ExecutionPlanMetricsSet,
95-
projected_statistics: Option<Statistics>,
9695
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
9796
}
9897

@@ -251,12 +250,6 @@ impl FileSource for CsvSource {
251250
Arc::new(conf)
252251
}
253252

254-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
255-
let mut conf = self.clone();
256-
conf.projected_statistics = Some(statistics);
257-
Arc::new(conf)
258-
}
259-
260253
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
261254
let mut conf = self.clone();
262255
conf.file_projection = config.file_column_projection_indices();
@@ -266,12 +259,7 @@ impl FileSource for CsvSource {
266259
fn metrics(&self) -> &ExecutionPlanMetricsSet {
267260
&self.metrics
268261
}
269-
fn file_source_statistics(&self) -> Result<Statistics> {
270-
let statistics = &self.projected_statistics;
271-
Ok(statistics
272-
.clone()
273-
.expect("projected_statistics must be set"))
274-
}
262+
275263
fn file_type(&self) -> &str {
276264
"csv"
277265
}

datafusion/datasource-json/src/source.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3838

3939
use arrow::json::ReaderBuilder;
4040
use arrow::{datatypes::SchemaRef, json};
41-
use datafusion_common::Statistics;
4241
use datafusion_datasource::file::FileSource;
4342
use datafusion_datasource::file_scan_config::FileScanConfig;
4443
use datafusion_execution::TaskContext;
@@ -79,7 +78,6 @@ impl JsonOpener {
7978
pub struct JsonSource {
8079
batch_size: Option<usize>,
8180
metrics: ExecutionPlanMetricsSet,
82-
projected_statistics: Option<Statistics>,
8381
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
8482
}
8583

@@ -126,11 +124,6 @@ impl FileSource for JsonSource {
126124
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
127125
Arc::new(Self { ..self.clone() })
128126
}
129-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
130-
let mut conf = self.clone();
131-
conf.projected_statistics = Some(statistics);
132-
Arc::new(conf)
133-
}
134127

135128
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
136129
Arc::new(Self { ..self.clone() })
@@ -140,13 +133,6 @@ impl FileSource for JsonSource {
140133
&self.metrics
141134
}
142135

143-
fn file_source_statistics(&self) -> Result<Statistics> {
144-
let statistics = &self.projected_statistics;
145-
Ok(statistics
146-
.clone()
147-
.expect("projected_statistics must be set to call"))
148-
}
149-
150136
fn file_type(&self) -> &str {
151137
"json"
152138
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ pub struct ParquetSource {
285285
pub(crate) batch_size: Option<usize>,
286286
/// Optional hint for the size of the parquet metadata
287287
pub(crate) metadata_size_hint: Option<usize>,
288-
pub(crate) projected_statistics: Option<Statistics>,
289288
#[cfg(feature = "parquet_encryption")]
290289
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
291290
}
@@ -600,12 +599,6 @@ impl FileSource for ParquetSource {
600599
})
601600
}
602601

603-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
604-
let mut conf = self.clone();
605-
conf.projected_statistics = Some(statistics);
606-
Arc::new(conf)
607-
}
608-
609602
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
610603
Arc::new(Self { ..self.clone() })
611604
}
@@ -614,11 +607,12 @@ impl FileSource for ParquetSource {
614607
&self.metrics
615608
}
616609

617-
fn file_source_statistics(&self) -> datafusion_common::Result<Statistics> {
618-
let statistics = &self.projected_statistics;
619-
let statistics = statistics
620-
.clone()
621-
.expect("projected_statistics must be set");
610+
fn file_source_statistics(
611+
&self,
612+
config: &FileScanConfig,
613+
) -> datafusion_common::Result<Statistics> {
614+
let statistics = config.file_source_projected_statistics.clone();
615+
622616
// When filters are pushed down, we have no way of knowing the exact statistics.
623617
// Note that pruning predicate is also a kind of filter pushdown.
624618
// (bloom filters use `pruning_predicate` too).

datafusion/datasource/src/file.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,12 @@ pub trait FileSource: Send + Sync {
6767
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
6868
/// Initialize new instance with projection information
6969
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
70-
/// Initialize new instance with projected statistics
71-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
7270
/// Return execution plan metrics
7371
fn metrics(&self) -> &ExecutionPlanMetricsSet;
7472
/// Return projected statistics
75-
fn file_source_statistics(&self) -> Result<Statistics>;
73+
fn file_source_statistics(&self, config: &FileScanConfig) -> Result<Statistics> {
74+
Ok(config.file_source_projected_statistics.clone())
75+
}
7676
/// String representation of file source such as "csv", "json", "parquet"
7777
fn file_type(&self) -> &str;
7878
/// Format FileType specific information

0 commit comments

Comments
 (0)