Skip to content

Commit ab18711

Browse files
Move projection into file scan config
1 parent d6ff4cc · commit ab18711

File tree

10 files changed

+42
-55
lines changed

10 files changed

+42
-55
lines changed

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,9 +66,7 @@ async fn csv_opener() -> Result<()> {
6666
.with_file(PartitionedFile::new(path.display().to_string(), 10))
6767
.build();
6868

69-
let config = CsvSource::new(true, b',', b'"')
70-
.with_comment(Some(b'#'))
71-
.with_projection(&scan_config);
69+
let config = CsvSource::new(true, b',', b'"').with_comment(Some(b'#'));
7270

7371
let opener = config.create_file_opener(object_store, &scan_config, 0);
7472

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -65,10 +65,6 @@ impl FileSource for ArrowSource {
6565
self
6666
}
6767

68-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
69-
Arc::new(Self { ..self.clone() })
70-
}
71-
7268
fn metrics(&self) -> &ExecutionPlanMetricsSet {
7369
&self.metrics
7470
}

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 1 addition & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,6 @@ pub struct TestSource {
109109
predicate: Option<Arc<dyn PhysicalExpr>>,
110110
batches: Vec<RecordBatch>,
111111
metrics: ExecutionPlanMetricsSet,
112-
projection: Option<Vec<usize>>,
113112
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
114113
}
115114

@@ -135,21 +134,14 @@ impl FileSource for TestSource {
135134
batches: self.batches.clone(),
136135
batch_size: base_config.batch_size,
137136
schema: Some(Arc::clone(&base_config.file_schema)),
138-
projection: self.projection.clone(),
137+
projection: base_config.projection.clone(),
139138
})
140139
}
141140

142141
fn as_any(&self) -> &dyn Any {
143142
todo!("should not be called")
144143
}
145144

146-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
147-
Arc::new(TestSource {
148-
projection: config.projection.clone(),
149-
..self.clone()
150-
})
151-
}
152-
153145
fn metrics(&self) -> &ExecutionPlanMetricsSet {
154146
&self.metrics
155147
}

datafusion/datasource-avro/src/source.rs

Lines changed: 18 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,6 @@ use object_store::ObjectStore;
3636
/// AvroSource holds the extra configuration that is necessary for opening avro files
3737
#[derive(Clone, Default)]
3838
pub struct AvroSource {
39-
projection: Option<Vec<String>>,
4039
metrics: ExecutionPlanMetricsSet,
4140
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
4241
}
@@ -52,12 +51,13 @@ impl AvroSource {
5251
reader: R,
5352
file_schema: SchemaRef,
5453
batch_size: Option<usize>,
54+
projected_file_column_names: Option<Vec<String>>,
5555
) -> Result<AvroReader<'static, R>> {
5656
AvroReader::try_new(
5757
reader,
5858
file_schema,
5959
batch_size.expect("Batch size must set before open"),
60-
self.projection.clone(),
60+
projected_file_column_names,
6161
)
6262
}
6363
}
@@ -74,19 +74,14 @@ impl FileSource for AvroSource {
7474
object_store,
7575
file_schema: base_config.file_schema.clone(),
7676
batch_size: base_config.batch_size,
77+
projected_file_column_names: base_config.projected_file_column_names(),
7778
})
7879
}
7980

8081
fn as_any(&self) -> &dyn Any {
8182
self
8283
}
8384

84-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
85-
let mut conf = self.clone();
86-
conf.projection = config.projected_file_column_names();
87-
Arc::new(conf)
88-
}
89-
9085
fn metrics(&self) -> &ExecutionPlanMetricsSet {
9186
&self.metrics
9287
}
@@ -136,6 +131,7 @@ mod private {
136131

137132
pub file_schema: SchemaRef,
138133
pub batch_size: Option<usize>,
134+
pub projected_file_column_names: Option<Vec<String>>,
139135
}
140136

141137
impl FileOpener for AvroOpener {
@@ -148,17 +144,28 @@ mod private {
148144
let object_store = Arc::clone(&self.object_store);
149145
let file_schema = Arc::clone(&self.file_schema);
150146
let batch_size = self.batch_size;
147+
let projected_file_names = self.projected_file_column_names.clone();
148+
151149
Ok(Box::pin(async move {
152150
let r = object_store.get(file_meta.location()).await?;
153151
match r.payload {
154152
GetResultPayload::File(file, _) => {
155-
let reader = source.open(file, file_schema, batch_size)?;
153+
let reader = source.open(
154+
file,
155+
file_schema,
156+
batch_size,
157+
projected_file_names,
158+
)?;
156159
Ok(futures::stream::iter(reader).boxed())
157160
}
158161
GetResultPayload::Stream(_) => {
159162
let bytes = r.bytes().await?;
160-
let reader =
161-
source.open(bytes.reader(), file_schema, batch_size)?;
163+
let reader = source.open(
164+
bytes.reader(),
165+
file_schema,
166+
batch_size,
167+
projected_file_names,
168+
)?;
162169
Ok(futures::stream::iter(reader).boxed())
163170
}
164171
}

datafusion/datasource-csv/src/source.rs

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -82,7 +82,6 @@ use tokio::io::AsyncWriteExt;
8282
/// ```
8383
#[derive(Debug, Clone, Default)]
8484
pub struct CsvSource {
85-
file_projection: Option<Vec<usize>>,
8685
pub(crate) has_header: bool,
8786
delimiter: u8,
8887
quote: u8,
@@ -161,14 +160,18 @@ impl CsvSource {
161160
reader: R,
162161
file_schema: SchemaRef,
163162
batch_size: Option<usize>,
163+
file_column_projection_indices: Option<Vec<usize>>,
164164
) -> Result<csv::Reader<R>> {
165-
Ok(self.builder(file_schema, batch_size).build(reader)?)
165+
Ok(self
166+
.builder(file_schema, batch_size, file_column_projection_indices)
167+
.build(reader)?)
166168
}
167169

168170
fn builder(
169171
&self,
170172
file_schema: SchemaRef,
171173
batch_size: Option<usize>,
174+
file_column_projection_indices: Option<Vec<usize>>,
172175
) -> csv::ReaderBuilder {
173176
let mut builder = csv::ReaderBuilder::new(Arc::clone(&file_schema))
174177
.with_delimiter(self.delimiter)
@@ -180,7 +183,7 @@ impl CsvSource {
180183
if let Some(terminator) = self.terminator {
181184
builder = builder.with_terminator(terminator);
182185
}
183-
if let Some(proj) = &self.file_projection {
186+
if let Some(proj) = &file_column_projection_indices {
184187
builder = builder.with_projection(proj.clone());
185188
}
186189
if let Some(escape) = self.escape {
@@ -199,8 +202,11 @@ pub struct CsvOpener {
199202
source: Arc<CsvSource>,
200203
file_compression_type: FileCompressionType,
201204
object_store: Arc<dyn ObjectStore>,
205+
206+
// matthew remove and use source.config
202207
file_schema: SchemaRef,
203208
batch_size: Option<usize>,
209+
file_column_projection_indices: Option<Vec<usize>>,
204210
}
205211

206212
impl CsvOpener {
@@ -213,13 +219,15 @@ impl CsvOpener {
213219
// matthew: delete this and use source.config
214220
file_schema: SchemaRef,
215221
batch_size: Option<usize>,
222+
file_column_projection_indices: Option<Vec<usize>>,
216223
) -> Self {
217224
Self {
218225
source,
219226
file_compression_type,
220227
object_store,
221228
file_schema,
222229
batch_size,
230+
file_column_projection_indices,
223231
}
224232
}
225233
}
@@ -243,19 +251,14 @@ impl FileSource for CsvSource {
243251
object_store,
244252
file_schema: base_config.file_schema.clone(),
245253
batch_size: base_config.batch_size,
254+
file_column_projection_indices: base_config.file_column_projection_indices(),
246255
})
247256
}
248257

249258
fn as_any(&self) -> &dyn Any {
250259
self
251260
}
252261

253-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
254-
let mut conf = self.clone();
255-
conf.file_projection = config.file_column_projection_indices();
256-
Arc::new(conf)
257-
}
258-
259262
fn metrics(&self) -> &ExecutionPlanMetricsSet {
260263
&self.metrics
261264
}
@@ -350,6 +353,7 @@ impl FileOpener for CsvOpener {
350353

351354
let file_schema = Arc::clone(&self.file_schema);
352355
let batch_size = self.batch_size;
356+
let file_column_projection_indices = self.file_column_projection_indices.clone();
353357

354358
Ok(Box::pin(async move {
355359
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
@@ -392,11 +396,14 @@ impl FileOpener for CsvOpener {
392396
decoder,
393397
file_schema,
394398
batch_size,
399+
file_column_projection_indices,
395400
)?)
396401
.boxed())
397402
}
398403
GetResultPayload::Stream(s) => {
399-
let decoder = source.builder(file_schema, batch_size).build_decoder();
404+
let decoder = source
405+
.builder(file_schema, batch_size, file_column_projection_indices)
406+
.build_decoder();
400407
let s = s.map_err(DataFusionError::from);
401408
let input = file_compression_type.convert_stream(s.boxed())?.fuse();
402409

datafusion/datasource-json/src/source.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -114,10 +114,6 @@ impl FileSource for JsonSource {
114114
self
115115
}
116116

117-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
118-
Arc::new(Self { ..self.clone() })
119-
}
120-
121117
fn metrics(&self) -> &ExecutionPlanMetricsSet {
122118
&self.metrics
123119
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -580,10 +580,6 @@ impl FileSource for ParquetSource {
580580
self
581581
}
582582

583-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
584-
Arc::new(Self { ..self.clone() })
585-
}
586-
587583
fn metrics(&self) -> &ExecutionPlanMetricsSet {
588584
&self.metrics
589585
}

datafusion/datasource/src/file.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -60,8 +60,6 @@ pub trait FileSource: Send + Sync {
6060
) -> Arc<dyn FileOpener>;
6161
/// Any
6262
fn as_any(&self) -> &dyn Any;
63-
/// Initialize new instance with projection information
64-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
6563
/// Return execution plan metrics
6664
fn metrics(&self) -> &ExecutionPlanMetricsSet;
6765
/// Return projected statistics

datafusion/datasource/src/file_scan_config.rs

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -516,11 +516,12 @@ impl DataSource for FileScanConfig {
516516
.with_batch_size(Some(batch_size))
517517
.build();
518518

519-
let source = self.file_source.with_projection(&config);
519+
let opener =
520+
self.file_source
521+
.create_file_opener(object_store, &config, partition);
520522

521-
let opener = source.create_file_opener(object_store, &config, partition);
522-
523-
let stream = FileStream::new(&config, partition, opener, source.metrics())?;
523+
let stream =
524+
FileStream::new(&config, partition, opener, self.file_source.metrics())?;
524525
Ok(Box::pin(cooperative(stream)))
525526
}
526527

datafusion/datasource/src/test_util.rs

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -49,10 +49,6 @@ impl FileSource for MockSource {
4949
self
5050
}
5151

52-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
53-
Arc::new(Self { ..self.clone() })
54-
}
55-
5652
fn metrics(&self) -> &ExecutionPlanMetricsSet {
5753
&self.metrics
5854
}

0 commit comments

Comments
 (0)