Description
Related to #17076
tl;dr: file sources apply projections in several different places. My proposal is to centralize this in one place. That way, we can avoid passing FileScanConfig into so many places.
Hi, I'm curious about the circular relationship between FileScanConfig and FileSource, specifically how the various file sources apply projection when creating a file opener.
When we create a file stream in FileScanConfig, we do the following:
datafusion/datafusion/datasource/src/file_scan_config.rs
Lines 507 to 514 in 173989c
```rust
let source = self
    .file_source
    .with_batch_size(batch_size)
    .with_projection(self);
let opener = source.create_file_opener(object_store, self, partition);
let stream = FileStream::new(self, partition, opener, source.metrics())?;
```
FileScanConfig has an inner FileSource field, and that FileSource creates a file opener. In both of these steps we pass in FileScanConfig, which feels a bit weird to me: (1) with_projection(self) and (2) create_file_opener(..., self, ...).
Looking through the various implementations of with_projection, we see that some file sources do nothing except clone themselves:
ParquetSource
datafusion/datafusion/datasource-parquet/src/source.rs
Lines 575 to 577 in 173989c
```rust
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
    Arc::new(Self { ..self.clone() })
}
```
JsonSource
datafusion/datafusion/datasource-json/src/source.rs
Lines 135 to 137 in 173989c
```rust
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
    Arc::new(Self { ..self.clone() })
}
```
It's not that they don't need to project columns; rather, the actual projection for these file sources occurs in create_file_opener, which is why we pass in FileScanConfig again.
ParquetSource
datafusion/datafusion/datasource-parquet/src/source.rs
Lines 461 to 470 in 173989c
```rust
impl FileSource for ParquetSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        let projection = base_config
            .file_column_projection_indices()
            .unwrap_or_else(|| (0..base_config.file_schema.fields().len()).collect());
        // ...
```
JsonSource
datafusion/datafusion/datasource-json/src/source.rs
Lines 99 to 114 in 173989c
```rust
impl FileSource for JsonSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(JsonOpener {
            batch_size: self
                .batch_size
                .expect("Batch size must set before creating opener"),
            projected_schema: base_config.projected_file_schema(),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }
```
For the other file sources, the projection occurs within with_projection:
CsvSource
datafusion/datafusion/datasource-csv/src/source.rs
Lines 260 to 264 in 173989c
```rust
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
    let mut conf = self.clone();
    conf.file_projection = config.file_column_projection_indices();
    Arc::new(conf)
}
```
AvroSource
datafusion/datafusion/datasource-avro/src/source.rs
Lines 98 to 102 in 173989c
```rust
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
    let mut conf = self.clone();
    conf.projection = config.projected_file_column_names();
    Arc::new(conf)
}
```
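To make the two patterns concrete, here's a minimal, self-contained Rust sketch of the current call shape. All types are hypothetical, heavily simplified stand-ins (no object store, schema, batch size, or metrics), and names like Opener, LazySource, EagerSource, and num_file_columns are illustrative only, not DataFusion's API. What it tries to show: either way, FileScanConfig gets handed back to its own source twice, and where the projection is actually resolved depends on which source you have.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins; only the call shape is modeled.
pub struct FileScanConfig {
    pub file_source: Arc<dyn FileSource>,
    pub num_file_columns: usize,
    pub projection: Option<Vec<usize>>,
}

impl FileScanConfig {
    pub fn file_column_projection_indices(&self) -> Option<Vec<usize>> {
        self.projection.clone()
    }

    // The config owns a source, yet passes itself back to that source twice.
    pub fn open(&self, partition: usize) -> Opener {
        let source = self.file_source.with_projection(self); // (1)
        source.create_file_opener(self, partition) // (2)
    }
}

#[derive(Debug, PartialEq)]
pub struct Opener {
    pub projection: Vec<usize>,
    pub partition: usize,
}

pub trait FileSource {
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
    fn create_file_opener(&self, config: &FileScanConfig, partition: usize) -> Opener;
}

// Parquet/JSON-style: with_projection only clones, and the projection is
// read from the config again inside create_file_opener.
#[derive(Clone)]
pub struct LazySource;

impl FileSource for LazySource {
    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn create_file_opener(&self, config: &FileScanConfig, partition: usize) -> Opener {
        let projection = config
            .file_column_projection_indices()
            .unwrap_or_else(|| (0..config.num_file_columns).collect());
        Opener { projection, partition }
    }
}

// CSV/Avro-style: the projection is captured eagerly in with_projection,
// but create_file_opener still needs the config for the fallback.
#[derive(Clone, Default)]
pub struct EagerSource {
    file_projection: Option<Vec<usize>>,
}

impl FileSource for EagerSource {
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn create_file_opener(&self, config: &FileScanConfig, partition: usize) -> Opener {
        let projection = self
            .file_projection
            .clone()
            .unwrap_or_else(|| (0..config.num_file_columns).collect());
        Opener { projection, partition }
    }
}
```

Both sources end up with the same projection in their opener; the only difference is which of the two calls actually resolves it.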
It would be nice to unify this logic and make the design of, and relationship between, FileSource and FileScanConfig much simpler. One idea I had was to move the explicit projection into the create_file_opener impls, removing the need to call .with_projection. With #17076, we'd then just call file_source.create_file_opener directly.
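A rough sketch of what that proposal could look like, again with hypothetical, simplified types rather than DataFusion's real ones (IndexSource, NameSource, Opener, file_column_names, and the projection_indices helper are all illustrative). Here FileSource keeps only create_file_opener, each source derives the form of projection it needs from the config at that point (indices for a CSV-like source, column names for an Avro-like one), and FileScanConfig makes a single call.

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins; not DataFusion's actual definitions.
pub struct FileScanConfig {
    pub file_source: Arc<dyn FileSource>,
    pub file_column_names: Vec<String>,
    pub projection: Option<Vec<usize>>,
}

impl FileScanConfig {
    // Projected indices into the file schema, defaulting to all columns.
    pub fn projection_indices(&self) -> Vec<usize> {
        self.projection
            .clone()
            .unwrap_or_else(|| (0..self.file_column_names.len()).collect())
    }

    // Single call site: there is no separate with_projection step.
    pub fn open(&self, partition: usize) -> Opener {
        self.file_source.create_file_opener(self, partition)
    }
}

#[derive(Debug, PartialEq)]
pub enum Opener {
    ByIndex { indices: Vec<usize>, partition: usize },
    ByName { names: Vec<String>, partition: usize },
}

// Proposed shape: projection is applied only inside create_file_opener.
pub trait FileSource {
    fn create_file_opener(&self, config: &FileScanConfig, partition: usize) -> Opener;
}

// CSV-like source that consumes column indices.
pub struct IndexSource;

impl FileSource for IndexSource {
    fn create_file_opener(&self, config: &FileScanConfig, partition: usize) -> Opener {
        Opener::ByIndex { indices: config.projection_indices(), partition }
    }
}

// Avro-like source that consumes column names.
pub struct NameSource;

impl FileSource for NameSource {
    fn create_file_opener(&self, config: &FileScanConfig, partition: usize) -> Opener {
        // Map the projected indices to column names for this source.
        let names = config
            .projection_indices()
            .into_iter()
            .map(|i| config.file_column_names[i].clone())
            .collect();
        Opener::ByName { names, partition }
    }
}
```

Computing the "all columns" fallback once, in a helper on the config, is one way to shrink the per-source logic to converting the projection into whatever form that source's opener needs.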