Skip to content

Commit 533a749

Browse files
Pass batch size directly to file opener
1 parent d376a32 commit 533a749

File tree

10 files changed

+37
-73
lines changed

10 files changed

+37
-73
lines changed

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,9 @@ async fn csv_opener() -> Result<()> {
6868
let config = CsvSource::new(true, b',', b'"')
6969
.with_comment(Some(b'#'))
7070
.with_schema(schema)
71-
.with_batch_size(8192)
7271
.with_projection(&scan_config);
7372

74-
let opener = config.create_file_opener(object_store, &scan_config, 0);
73+
let opener = config.create_file_opener(object_store, &scan_config, 0, 8192);
7574

7675
let mut result = vec![];
7776
let mut stream =

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl FileSource for ArrowSource {
5757
object_store: Arc<dyn ObjectStore>,
5858
base_config: &FileScanConfig,
5959
_partition: usize,
60+
_batch_size: usize,
6061
) -> Arc<dyn FileOpener> {
6162
Arc::new(ArrowOpener {
6263
object_store,
@@ -68,10 +69,6 @@ impl FileSource for ArrowSource {
6869
self
6970
}
7071

71-
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
72-
Arc::new(Self { ..self.clone() })
73-
}
74-
7572
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
7673
Arc::new(Self { ..self.clone() })
7774
}

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ impl FileSource for TestSource {
133133
_object_store: Arc<dyn ObjectStore>,
134134
_base_config: &FileScanConfig,
135135
_partition: usize,
136+
_batch_size: usize,
136137
) -> Arc<dyn FileOpener> {
137138
Arc::new(TestOpener {
138139
batches: self.batches.clone(),
@@ -146,13 +147,6 @@ impl FileSource for TestSource {
146147
todo!("should not be called")
147148
}
148149

149-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
150-
Arc::new(TestSource {
151-
batch_size: Some(batch_size),
152-
..self.clone()
153-
})
154-
}
155-
156150
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
157151
Arc::new(TestSource {
158152
schema: Some(schema),

datafusion/datasource-avro/src/source.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use object_store::ObjectStore;
3838
#[derive(Clone, Default)]
3939
pub struct AvroSource {
4040
schema: Option<SchemaRef>,
41-
batch_size: Option<usize>,
4241
projection: Option<Vec<String>>,
4342
metrics: ExecutionPlanMetricsSet,
4443
projected_statistics: Option<Statistics>,
@@ -51,11 +50,15 @@ impl AvroSource {
5150
Self::default()
5251
}
5352

54-
fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
53+
fn open<R: std::io::Read>(
54+
&self,
55+
reader: R,
56+
batch_size: usize,
57+
) -> Result<AvroReader<'static, R>> {
5558
AvroReader::try_new(
5659
reader,
5760
Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
58-
self.batch_size.expect("Batch size must set before open"),
61+
batch_size,
5962
self.projection.clone(),
6063
)
6164
}
@@ -67,23 +70,19 @@ impl FileSource for AvroSource {
6770
object_store: Arc<dyn ObjectStore>,
6871
_base_config: &FileScanConfig,
6972
_partition: usize,
73+
batch_size: usize,
7074
) -> Arc<dyn FileOpener> {
7175
Arc::new(private::AvroOpener {
7276
config: Arc::new(self.clone()),
7377
object_store,
78+
batch_size,
7479
})
7580
}
7681

7782
fn as_any(&self) -> &dyn Any {
7883
self
7984
}
8085

81-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
82-
let mut conf = self.clone();
83-
conf.batch_size = Some(batch_size);
84-
Arc::new(conf)
85-
}
86-
8786
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
8887
let mut conf = self.clone();
8988
conf.schema = Some(schema);
@@ -154,6 +153,7 @@ mod private {
154153
pub struct AvroOpener {
155154
pub config: Arc<AvroSource>,
156155
pub object_store: Arc<dyn ObjectStore>,
156+
pub batch_size: usize,
157157
}
158158

159159
impl FileOpener for AvroOpener {
@@ -164,16 +164,19 @@ mod private {
164164
) -> Result<FileOpenFuture> {
165165
let config = Arc::clone(&self.config);
166166
let object_store = Arc::clone(&self.object_store);
167+
168+
let batch_size = self.batch_size;
169+
167170
Ok(Box::pin(async move {
168171
let r = object_store.get(file_meta.location()).await?;
169172
match r.payload {
170173
GetResultPayload::File(file, _) => {
171-
let reader = config.open(file)?;
174+
let reader = config.open(file, batch_size)?;
172175
Ok(futures::stream::iter(reader).boxed())
173176
}
174177
GetResultPayload::Stream(_) => {
175178
let bytes = r.bytes().await?;
176-
let reader = config.open(bytes.reader())?;
179+
let reader = config.open(bytes.reader(), batch_size)?;
177180
Ok(futures::stream::iter(reader).boxed())
178181
}
179182
}

datafusion/datasource-csv/src/source.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,6 @@ use tokio::io::AsyncWriteExt;
8282
/// ```
8383
#[derive(Debug, Clone, Default)]
8484
pub struct CsvSource {
85-
batch_size: Option<usize>,
8685
file_schema: Option<SchemaRef>,
8786
file_projection: Option<Vec<usize>>,
8887
pub(crate) has_header: bool,
@@ -159,8 +158,8 @@ impl CsvSource {
159158
}
160159

161160
impl CsvSource {
162-
fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
163-
Ok(self.builder().build(reader)?)
161+
fn open<R: Read>(&self, reader: R, batch_size: usize) -> Result<csv::Reader<R>> {
162+
Ok(self.builder().with_batch_size(batch_size).build(reader)?)
164163
}
165164

166165
fn builder(&self) -> csv::ReaderBuilder {
@@ -170,10 +169,6 @@ impl CsvSource {
170169
.expect("Schema must be set before initializing builder"),
171170
))
172171
.with_delimiter(self.delimiter)
173-
.with_batch_size(
174-
self.batch_size
175-
.expect("Batch size must be set before initializing builder"),
176-
)
177172
.with_header(self.has_header)
178173
.with_quote(self.quote);
179174
if let Some(terminator) = self.terminator {
@@ -198,6 +193,7 @@ pub struct CsvOpener {
198193
config: Arc<CsvSource>,
199194
file_compression_type: FileCompressionType,
200195
object_store: Arc<dyn ObjectStore>,
196+
batch_size: usize,
201197
}
202198

203199
impl CsvOpener {
@@ -206,11 +202,13 @@ impl CsvOpener {
206202
config: Arc<CsvSource>,
207203
file_compression_type: FileCompressionType,
208204
object_store: Arc<dyn ObjectStore>,
205+
batch_size: usize,
209206
) -> Self {
210207
Self {
211208
config,
212209
file_compression_type,
213210
object_store,
211+
batch_size,
214212
}
215213
}
216214
}
@@ -227,24 +225,20 @@ impl FileSource for CsvSource {
227225
object_store: Arc<dyn ObjectStore>,
228226
base_config: &FileScanConfig,
229227
_partition: usize,
228+
batch_size: usize,
230229
) -> Arc<dyn FileOpener> {
231230
Arc::new(CsvOpener {
232231
config: Arc::new(self.clone()),
233232
file_compression_type: base_config.file_compression_type,
234233
object_store,
234+
batch_size,
235235
})
236236
}
237237

238238
fn as_any(&self) -> &dyn Any {
239239
self
240240
}
241241

242-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
243-
let mut conf = self.clone();
244-
conf.batch_size = Some(batch_size);
245-
Arc::new(conf)
246-
}
247-
248242
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
249243
let mut conf = self.clone();
250244
conf.file_schema = Some(schema);
@@ -354,6 +348,7 @@ impl FileOpener for CsvOpener {
354348

355349
let store = Arc::clone(&self.object_store);
356350
let terminator = self.config.terminator;
351+
let batch_size = self.batch_size;
357352

358353
Ok(Box::pin(async move {
359354
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
@@ -392,7 +387,7 @@ impl FileOpener for CsvOpener {
392387
)?
393388
};
394389

395-
Ok(futures::stream::iter(config.open(decoder)?).boxed())
390+
Ok(futures::stream::iter(config.open(decoder, batch_size)?).boxed())
396391
}
397392
GetResultPayload::Stream(s) => {
398393
let decoder = config.builder().build_decoder();

datafusion/datasource-json/src/source.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ impl JsonOpener {
7777
/// JsonSource holds the extra configuration that is necessary for [`JsonOpener`]
7878
#[derive(Clone, Default)]
7979
pub struct JsonSource {
80-
batch_size: Option<usize>,
8180
metrics: ExecutionPlanMetricsSet,
8281
projected_statistics: Option<Statistics>,
8382
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
@@ -102,11 +101,10 @@ impl FileSource for JsonSource {
102101
object_store: Arc<dyn ObjectStore>,
103102
base_config: &FileScanConfig,
104103
_partition: usize,
104+
batch_size: usize,
105105
) -> Arc<dyn FileOpener> {
106106
Arc::new(JsonOpener {
107-
batch_size: self
108-
.batch_size
109-
.expect("Batch size must set before creating opener"),
107+
batch_size,
110108
projected_schema: base_config.projected_file_schema(),
111109
file_compression_type: base_config.file_compression_type,
112110
object_store,
@@ -117,12 +115,6 @@ impl FileSource for JsonSource {
117115
self
118116
}
119117

120-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
121-
let mut conf = self.clone();
122-
conf.batch_size = Some(batch_size);
123-
Arc::new(conf)
124-
}
125-
126118
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
127119
Arc::new(Self { ..self.clone() })
128120
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,6 @@ pub struct ParquetSource {
277277
pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
278278
/// Optional user defined schema adapter
279279
pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
280-
/// Batch size configuration
281-
pub(crate) batch_size: Option<usize>,
282280
/// Optional hint for the size of the parquet metadata
283281
pub(crate) metadata_size_hint: Option<usize>,
284282
pub(crate) projected_statistics: Option<Statistics>,
@@ -464,6 +462,7 @@ impl FileSource for ParquetSource {
464462
object_store: Arc<dyn ObjectStore>,
465463
base_config: &FileScanConfig,
466464
partition: usize,
465+
batch_size: usize,
467466
) -> Arc<dyn FileOpener> {
468467
let projection = base_config
469468
.file_column_projection_indices()
@@ -527,9 +526,7 @@ impl FileSource for ParquetSource {
527526
Arc::new(ParquetOpener {
528527
partition_index: partition,
529528
projection: Arc::from(projection),
530-
batch_size: self
531-
.batch_size
532-
.expect("Batch size must set before creating ParquetOpener"),
529+
batch_size,
533530
limit: base_config.limit,
534531
predicate: self.predicate.clone(),
535532
logical_file_schema: Arc::clone(&base_config.file_schema),
@@ -553,12 +550,6 @@ impl FileSource for ParquetSource {
553550
self
554551
}
555552

556-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
557-
let mut conf = self.clone();
558-
conf.batch_size = Some(batch_size);
559-
Arc::new(conf)
560-
}
561-
562553
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
563554
Arc::new(Self {
564555
file_schema: Some(schema),

datafusion/datasource/src/file.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,10 @@ pub trait FileSource: Send + Sync {
5858
object_store: Arc<dyn ObjectStore>,
5959
base_config: &FileScanConfig,
6060
partition: usize,
61+
batch_size: usize,
6162
) -> Arc<dyn FileOpener>;
6263
/// Any
6364
fn as_any(&self) -> &dyn Any;
64-
/// Initialize new type with batch size configuration
65-
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
6665
/// Initialize new instance with a new schema
6766
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
6867
/// Initialize new instance with projection information

datafusion/datasource/src/file_scan_config.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,8 @@ use log::{debug, warn};
105105
/// # schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
106106
/// # };
107107
/// # impl FileSource for ParquetSource {
108-
/// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
108+
/// # fn create_file_opener(&self, _: Arc<dyn ObjectStore>, _: &FileScanConfig, _: usize, _: usize) -> Arc<dyn FileOpener> { unimplemented!() }
109109
/// # fn as_any(&self) -> &dyn Any { self }
110-
/// # fn with_batch_size(&self, _: usize) -> Arc<dyn FileSource> { unimplemented!() }
111110
/// # fn with_schema(&self, _: SchemaRef) -> Arc<dyn FileSource> { Arc::new(self.clone()) as Arc<dyn FileSource> }
112111
/// # fn with_projection(&self, _: &FileScanConfig) -> Arc<dyn FileSource> { unimplemented!() }
113112
/// # fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) }
@@ -504,14 +503,12 @@ impl DataSource for FileScanConfig {
504503
.batch_size
505504
.unwrap_or_else(|| context.session_config().batch_size());
506505

507-
let source = self
508-
.file_source
509-
.with_batch_size(batch_size)
510-
.with_projection(self);
506+
let source = self.file_source.with_projection(self);
511507

512-
let opener = source.create_file_opener(object_store, self, partition);
508+
let opener = source.create_file_opener(object_store, self, partition, batch_size);
513509

514-
let stream = FileStream::new(self, partition, opener, source.metrics())?;
510+
let stream =
511+
FileStream::new(self, partition, opener, self.file_source.metrics())?;
515512
Ok(Box::pin(cooperative(stream)))
516513
}
517514

datafusion/datasource/src/test_util.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ impl FileSource for MockSource {
4242
_object_store: Arc<dyn ObjectStore>,
4343
_base_config: &FileScanConfig,
4444
_partition: usize,
45+
_batch_size: usize,
4546
) -> Arc<dyn FileOpener> {
4647
unimplemented!()
4748
}
@@ -50,10 +51,6 @@ impl FileSource for MockSource {
5051
self
5152
}
5253

53-
fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
54-
Arc::new(Self { ..self.clone() })
55-
}
56-
5754
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
5855
Arc::new(Self { ..self.clone() })
5956
}

0 commit comments

Comments
 (0)