diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/csv_json_opener.rs index 1a2c2cbff418..bc87f0e73f84 100644 --- a/datafusion-examples/examples/csv_json_opener.rs +++ b/datafusion-examples/examples/csv_json_opener.rs @@ -68,10 +68,9 @@ async fn csv_opener() -> Result<()> { let config = CsvSource::new(true, b',', b'"') .with_comment(Some(b'#')) .with_schema(schema) - .with_batch_size(8192) .with_projection(&scan_config); - let opener = config.create_file_opener(object_store, &scan_config, 0); + let opener = config.create_file_opener(object_store, &scan_config, 0, 8192); let mut result = vec![]; let mut stream = diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index d0af96329b5f..4fb48c030632 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -57,6 +57,7 @@ impl FileSource for ArrowSource { object_store: Arc, base_config: &FileScanConfig, _partition: usize, + _batch_size: usize, ) -> Arc { Arc::new(ArrowOpener { object_store, @@ -68,10 +69,6 @@ impl FileSource for ArrowSource { self } - fn with_batch_size(&self, _batch_size: usize) -> Arc { - Arc::new(Self { ..self.clone() }) - } - fn with_schema(&self, _schema: SchemaRef) -> Arc { Arc::new(Self { ..self.clone() }) } diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index acb2b808ef8f..85b29bceaa2f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -133,6 +133,7 @@ impl FileSource for TestSource { _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, + _batch_size: usize, ) -> Arc { Arc::new(TestOpener { batches: self.batches.clone(), @@ -146,13 +147,6 @@ impl FileSource for TestSource { todo!("should not be called") } - fn with_batch_size(&self, batch_size: usize) -> Arc { - Arc::new(TestSource { - batch_size: Some(batch_size), - ..self.clone() - }) - } - fn with_schema(&self, schema: SchemaRef) -> Arc { Arc::new(TestSource { schema: Some(schema), diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs index 948049f5a747..25b14ae324b0 100644 --- a/datafusion/datasource-avro/src/source.rs +++ b/datafusion/datasource-avro/src/source.rs @@ -38,7 +38,6 @@ use object_store::ObjectStore; #[derive(Clone, Default)] pub struct AvroSource { schema: Option, - batch_size: Option, projection: Option>, metrics: ExecutionPlanMetricsSet, projected_statistics: Option, @@ -51,11 +50,15 @@ impl AvroSource { Self::default() } - fn open(&self, reader: R) -> Result> { + fn open( + &self, + reader: R, + batch_size: usize, + ) -> Result> { AvroReader::try_new( reader, Arc::clone(self.schema.as_ref().expect("Schema must set before open")), - self.batch_size.expect("Batch size must set before open"), + batch_size, self.projection.clone(), ) } @@ -67,10 +70,12 @@ impl FileSource for AvroSource { object_store: Arc, _base_config: &FileScanConfig, _partition: usize, + batch_size: usize, ) -> Arc { Arc::new(private::AvroOpener { config: Arc::new(self.clone()), object_store, + batch_size, }) } @@ -78,12 +83,6 @@ impl FileSource for AvroSource { self } - fn with_batch_size(&self, batch_size: usize) -> Arc { - let mut conf = self.clone(); - conf.batch_size = Some(batch_size); - Arc::new(conf) - } - fn with_schema(&self, schema: SchemaRef) -> Arc { let mut conf = self.clone(); conf.schema = Some(schema); @@ -154,6 +153,7 @@ mod private { pub struct AvroOpener { pub config: Arc, pub object_store: Arc, + pub batch_size: usize, } impl FileOpener for AvroOpener { @@ -164,16 +164,19 @@ mod private { ) -> Result { let config = Arc::clone(&self.config); let object_store = Arc::clone(&self.object_store); + + let batch_size = self.batch_size; + Ok(Box::pin(async move { let r = object_store.get(file_meta.location()).await?; match r.payload { GetResultPayload::File(file, _) => { - let reader = config.open(file)?; + let reader = config.open(file, batch_size)?; Ok(futures::stream::iter(reader).boxed()) } GetResultPayload::Stream(_) => { let bytes = r.bytes().await?; - let reader = config.open(bytes.reader())?; + let reader = config.open(bytes.reader(), batch_size)?; Ok(futures::stream::iter(reader).boxed()) } } diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 6c994af940d1..90489e6fc983 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -82,7 +82,6 @@ use tokio::io::AsyncWriteExt; /// ``` #[derive(Debug, Clone, Default)] pub struct CsvSource { - batch_size: Option, file_schema: Option, file_projection: Option>, pub(crate) has_header: bool, @@ -159,8 +158,8 @@ impl CsvSource { } impl CsvSource { - fn open(&self, reader: R) -> Result> { - Ok(self.builder().build(reader)?) + fn open(&self, reader: R, batch_size: usize) -> Result> { + Ok(self.builder().with_batch_size(batch_size).build(reader)?) } fn builder(&self) -> csv::ReaderBuilder { @@ -170,10 +169,6 @@ impl CsvSource { .expect("Schema must be set before initializing builder"), )) .with_delimiter(self.delimiter) - .with_batch_size( - self.batch_size - .expect("Batch size must be set before initializing builder"), - ) .with_header(self.has_header) .with_quote(self.quote); if let Some(terminator) = self.terminator { @@ -198,6 +193,7 @@ pub struct CsvOpener { config: Arc, file_compression_type: FileCompressionType, object_store: Arc, + batch_size: usize, } impl CsvOpener { @@ -206,11 +202,13 @@ impl CsvOpener { config: Arc, file_compression_type: FileCompressionType, object_store: Arc, + batch_size: usize, ) -> Self { Self { config, file_compression_type, object_store, + batch_size, } } } @@ -227,11 +225,13 @@ impl FileSource for CsvSource { object_store: Arc, base_config: &FileScanConfig, _partition: usize, + batch_size: usize, ) -> Arc { Arc::new(CsvOpener { config: Arc::new(self.clone()), file_compression_type: base_config.file_compression_type, object_store, + batch_size, }) } @@ -239,12 +239,6 @@ impl FileSource for CsvSource { self } - fn with_batch_size(&self, batch_size: usize) -> Arc { - let mut conf = self.clone(); - conf.batch_size = Some(batch_size); - Arc::new(conf) - } - fn with_schema(&self, schema: SchemaRef) -> Arc { let mut conf = self.clone(); conf.file_schema = Some(schema); @@ -354,6 +348,7 @@ impl FileOpener for CsvOpener { let store = Arc::clone(&self.object_store); let terminator = self.config.terminator; + let batch_size = self.batch_size; Ok(Box::pin(async move { // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries) @@ -392,7 +387,7 @@ impl FileOpener for CsvOpener { )? }; - Ok(futures::stream::iter(config.open(decoder)?).boxed()) + Ok(futures::stream::iter(config.open(decoder, batch_size)?).boxed()) } GetResultPayload::Stream(s) => { let decoder = config.builder().build_decoder(); diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index d318928e5c6b..e976315f8cdf 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -77,7 +77,6 @@ impl JsonOpener { /// JsonSource holds the extra configuration that is necessary for [`JsonOpener`] #[derive(Clone, Default)] pub struct JsonSource { - batch_size: Option, metrics: ExecutionPlanMetricsSet, projected_statistics: Option, schema_adapter_factory: Option>, @@ -102,11 +101,10 @@ impl FileSource for JsonSource { object_store: Arc, base_config: &FileScanConfig, _partition: usize, + batch_size: usize, ) -> Arc { Arc::new(JsonOpener { - batch_size: self - .batch_size - .expect("Batch size must set before creating opener"), + batch_size, projected_schema: base_config.projected_file_schema(), file_compression_type: base_config.file_compression_type, object_store, @@ -117,12 +115,6 @@ impl FileSource for JsonSource { self } - fn with_batch_size(&self, batch_size: usize) -> Arc { - let mut conf = self.clone(); - conf.batch_size = Some(batch_size); - Arc::new(conf) - } - fn with_schema(&self, _schema: SchemaRef) -> Arc { Arc::new(Self { ..self.clone() }) } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 430cb5ce54af..150d20b97ef8 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -277,8 +277,6 @@ pub struct ParquetSource { pub(crate) parquet_file_reader_factory: Option>, /// Optional user defined schema adapter pub(crate) schema_adapter_factory: Option>, - /// Batch size configuration - pub(crate) batch_size: Option, /// Optional hint for the size of the parquet metadata pub(crate) metadata_size_hint: Option, pub(crate) projected_statistics: Option, @@ -464,6 +462,7 @@ impl FileSource for ParquetSource { object_store: Arc, base_config: &FileScanConfig, partition: usize, + batch_size: usize, ) -> Arc { let projection = base_config .file_column_projection_indices() @@ -527,9 +526,7 @@ impl FileSource for ParquetSource { Arc::new(ParquetOpener { partition_index: partition, projection: Arc::from(projection), - batch_size: self - .batch_size - .expect("Batch size must set before creating ParquetOpener"), + batch_size, limit: base_config.limit, predicate: self.predicate.clone(), logical_file_schema: Arc::clone(&base_config.file_schema), @@ -553,12 +550,6 @@ impl FileSource for ParquetSource { self } - fn with_batch_size(&self, batch_size: usize) -> Arc { - let mut conf = self.clone(); - conf.batch_size = Some(batch_size); - Arc::new(conf) - } - fn with_schema(&self, schema: SchemaRef) -> Arc { Arc::new(Self { file_schema: Some(schema), diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 29fa38a8ee36..3263193e62c1 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -58,11 +58,10 @@ pub trait FileSource: Send + Sync { object_store: Arc, base_config: &FileScanConfig, partition: usize, + batch_size: usize, ) -> Arc; /// Any fn as_any(&self) -> &dyn Any; - /// Initialize new type with batch size configuration - fn with_batch_size(&self, batch_size: usize) -> Arc; /// Initialize new instance with a new schema fn with_schema(&self, schema: SchemaRef) -> Arc; /// Initialize new instance with projection information diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 95cc9e24b645..7a47e9c8cf9e 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -105,9 +105,8 @@ use log::{debug, warn}; /// # schema_adapter_factory: Option> /// # }; /// # impl FileSource for ParquetSource { -/// # fn create_file_opener(&self, _: Arc, _: &FileScanConfig, _: usize) -> Arc { unimplemented!() } +/// # fn create_file_opener(&self, _: Arc, _: &FileScanConfig, _: usize, _: usize) -> Arc { unimplemented!() } /// # fn as_any(&self) -> &dyn Any { self } -/// # fn with_batch_size(&self, _: usize) -> Arc { unimplemented!() } /// # fn with_schema(&self, _: SchemaRef) -> Arc { Arc::new(self.clone()) as Arc } /// # fn with_projection(&self, _: &FileScanConfig) -> Arc { unimplemented!() } /// # fn with_statistics(&self, statistics: Statistics) -> Arc { Arc::new(Self {projected_statistics: Some(statistics), schema_adapter_factory: self.schema_adapter_factory.clone()} ) } @@ -504,12 +503,9 @@ impl DataSource for FileScanConfig { .batch_size .unwrap_or_else(|| context.session_config().batch_size()); - let source = self - .file_source - .with_batch_size(batch_size) - .with_projection(self); + let source = self.file_source.with_projection(self); - let opener = source.create_file_opener(object_store, self, partition); + let opener = source.create_file_opener(object_store, self, partition, batch_size); let stream = FileStream::new(self, partition, opener, source.metrics())?; Ok(Box::pin(cooperative(stream))) diff --git a/datafusion/datasource/src/test_util.rs b/datafusion/datasource/src/test_util.rs index e4a5114aa073..b4b8cbc1e2ea 100644 --- a/datafusion/datasource/src/test_util.rs +++ b/datafusion/datasource/src/test_util.rs @@ -42,6 +42,7 @@ impl FileSource for MockSource { _object_store: Arc, _base_config: &FileScanConfig, _partition: usize, + _batch_size: usize, ) -> Arc { unimplemented!() } @@ -50,10 +51,6 @@ impl FileSource for MockSource { self } - fn with_batch_size(&self, _batch_size: usize) -> Arc { - Arc::new(Self { ..self.clone() }) - } - fn with_schema(&self, _schema: SchemaRef) -> Arc { Arc::new(Self { ..self.clone() }) }