13 changes: 7 additions & 6 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -30,7 +30,7 @@ use datafusion::common::{
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{
-    FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
+    FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
};
use datafusion::datasource::TableProvider;
use datafusion::execution::object_store::ObjectStoreUrl;
@@ -555,15 +555,16 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
-        file_meta: FileMeta,
+        partitioned_file: PartitionedFile,
metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
// for this example we ignore the partition index and metrics
// but in a real system you would likely use them to report details on
// the performance of the reader.
-        let filename = file_meta
-            .location()
+        let filename = partitioned_file
+            .object_meta
+            .location
.parts()
.last()
.expect("No path in location")
@@ -572,8 +573,8 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {

let object_store = Arc::clone(&self.object_store);
let mut inner =
-            ParquetObjectReader::new(object_store, file_meta.object_meta.location)
-                .with_file_size(file_meta.object_meta.size);
+            ParquetObjectReader::new(object_store, partitioned_file.object_meta.location)
+                .with_file_size(partitioned_file.object_meta.size);

if let Some(hint) = metadata_size_hint {
inner = inner.with_footer_size_hint(hint)
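Under the new signature, everything a reader needs hangs off the single `PartitionedFile` argument: the path and known file size via `object_meta`, plus the optional byte range and user extensions. A minimal sketch of a custom factory against the new API follows; `PassThroughReaderFactory` is a hypothetical name, and the imports assume the same crates used in the example file above.

```rust
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::ParquetFileReaderFactory;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};

/// Hypothetical factory that forwards every request to `ParquetObjectReader`.
#[derive(Debug)]
struct PassThroughReaderFactory {
    object_store: Arc<dyn ObjectStore>,
}

impl ParquetFileReaderFactory for PassThroughReaderFactory {
    fn create_reader(
        &self,
        _partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        // `object_meta` carries both the path and the known file size,
        // so no separate `FileMeta` lookup is needed anymore.
        let mut reader = ParquetObjectReader::new(
            Arc::clone(&self.object_store),
            partitioned_file.object_meta.location,
        )
        .with_file_size(partitioned_file.object_meta.size);
        if let Some(hint) = metadata_size_hint {
            reader = reader.with_footer_size_hint(hint);
        }
        Ok(Box::new(reader))
    }
}
```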
3 changes: 1 addition & 2 deletions datafusion/core/src/dataframe/parquet.rs
@@ -102,7 +102,6 @@ impl DataFrame {

#[cfg(test)]
mod tests {
-    use rstest::rstest;
use std::collections::HashMap;
use std::sync::Arc;

@@ -248,7 +247,7 @@ mod tests {
Ok(())
}

-    #[rstest]
+    #[rstest::rstest]
#[cfg(feature = "parquet_encryption")]
#[tokio::test]
async fn roundtrip_parquet_with_encryption(
25 changes: 13 additions & 12 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -18,7 +18,7 @@
use std::any::Any;
use std::sync::Arc;

-use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
+use crate::datasource::physical_plan::{FileOpenFuture, FileOpener};
use crate::error::Result;
use datafusion_datasource::as_file_source;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
@@ -122,18 +122,16 @@ pub struct ArrowOpener {
}

impl FileOpener for ArrowOpener {
-    fn open(
-        &self,
-        file_meta: FileMeta,
-        _file: PartitionedFile,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
Contributor review comment on this hunk: this is a nice improvement I agree

let object_store = Arc::clone(&self.object_store);
let projection = self.projection.clone();
Ok(Box::pin(async move {
-            let range = file_meta.range.clone();
+            let range = partitioned_file.range.clone();
match range {
None => {
-                    let r = object_store.get(file_meta.location()).await?;
+                    let r = object_store
+                        .get(&partitioned_file.object_meta.location)
+                        .await?;
match r.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, _) => {
@@ -164,7 +162,7 @@ impl FileOpener for ArrowOpener {
..Default::default()
};
let get_result = object_store
-                        .get_opts(file_meta.location(), get_option)
+                        .get_opts(&partitioned_file.object_meta.location, get_option)
.await?;
let footer_len_buf = get_result.bytes().await?;
let footer_len = arrow_ipc::reader::read_footer_length(
@@ -176,7 +174,7 @@
..Default::default()
};
let get_result = object_store
-                        .get_opts(file_meta.location(), get_option)
+                        .get_opts(&partitioned_file.object_meta.location, get_option)
.await?;
let footer_buf = get_result.bytes().await?;
let footer = arrow_ipc::root_as_footer(
@@ -204,7 +202,7 @@
})
.collect_vec();
let dict_results = object_store
-                    .get_ranges(file_meta.location(), &dict_ranges)
+                    .get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
.await?;
for (dict_block, dict_result) in
footer.dictionaries().iter().flatten().zip(dict_results)
@@ -237,7 +235,7 @@
.collect_vec();

let recordbatch_results = object_store
-                    .get_ranges(file_meta.location(), &recordbatch_ranges)
+                    .get_ranges(
+                        &partitioned_file.object_meta.location,
+                        &recordbatch_ranges,
+                    )
.await?;

Ok(futures::stream::iter(
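For context on the `open` signature change above: `FileOpener::open` previously received a `FileMeta` plus an unused `PartitionedFile`; it now takes the `PartitionedFile` alone, and openers reach the path through `object_meta.location`. A minimal sketch of an opener under the new trait shape — `LocationOnlyOpener` is a hypothetical name and the decode step is elided:

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::PartitionedFile;
use futures::StreamExt;
use object_store::ObjectStore;

/// Hypothetical opener that fetches the whole object, then yields no batches.
struct LocationOnlyOpener {
    object_store: Arc<dyn ObjectStore>,
}

impl FileOpener for LocationOnlyOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let store = Arc::clone(&self.object_store);
        Ok(Box::pin(async move {
            // The single argument now carries the path, size, optional byte
            // range, partition values, and user extensions.
            let _data = store
                .get(&partitioned_file.object_meta.location)
                .await?
                .bytes()
                .await?;
            // ...decode `_data` into record batches here; empty for the sketch.
            Ok(futures::stream::iter(Vec::new().into_iter().map(Ok)).boxed())
        }))
    }
}
```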
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
@@ -43,7 +43,6 @@ pub use csv::{CsvOpener, CsvSource};
pub use datafusion_datasource::file::FileSource;
pub use datafusion_datasource::file_groups::FileGroup;
pub use datafusion_datasource::file_groups::FileGroupPartitioner;
-pub use datafusion_datasource::file_meta::FileMeta;
pub use datafusion_datasource::file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
FileScanConfigBuilder,
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -50,7 +50,6 @@ mod tests {
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{assert_contains, Result, ScalarValue};
use datafusion_datasource::file_format::FileFormat;
-    use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::source::DataSourceExec;

@@ -2207,7 +2206,7 @@
fn create_reader(
&self,
partition_index: usize,
-        file_meta: FileMeta,
+        partitioned_file: PartitionedFile,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader + Send>>
@@ -2218,7 +2217,7 @@
.push(metadata_size_hint);
self.inner.create_reader(
partition_index,
-            file_meta,
+            partitioned_file,
metadata_size_hint,
metrics,
)
10 changes: 5 additions & 5 deletions datafusion/core/tests/parquet/custom_reader.rs
@@ -26,7 +26,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
-    FileMeta, ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource,
+    ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource,
};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -119,11 +119,11 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
fn create_reader(
&self,
partition_index: usize,
-        file_meta: FileMeta,
+        partitioned_file: PartitionedFile,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
-        let metadata = file_meta
+        let metadata = partitioned_file
.extensions
.as_ref()
.expect("has user defined metadata");
@@ -135,13 +135,13 @@

let parquet_file_metrics = ParquetFileMetrics::new(
partition_index,
-            file_meta.location().as_ref(),
+            partitioned_file.object_meta.location.as_ref(),
metrics,
);

Ok(Box::new(ParquetFileReader {
store: Arc::clone(&self.0),
-            meta: file_meta.object_meta,
+            meta: partitioned_file.object_meta,
metrics: parquet_file_metrics,
metadata_size_hint,
}))
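One thing this test exercises: user-defined per-file data now rides on `PartitionedFile::extensions` rather than `FileMeta::extensions`. A sketch of the producer side, assuming the `with_extensions` builder that `PartitionedFile` provides for this; `PreloadedFooter` is a hypothetical payload type:

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion::datasource::listing::PartitionedFile;

/// Hypothetical per-file payload for a custom reader factory.
#[derive(Debug)]
struct PreloadedFooter {
    footer_bytes: Vec<u8>,
}

/// Attach the payload when building the scan's file list; the factory later
/// recovers it via `partitioned_file.extensions.as_ref()` plus a downcast,
/// as in the test above.
fn file_with_payload(path: &str, size: u64, footer_bytes: Vec<u8>) -> PartitionedFile {
    let payload: Arc<dyn Any + Send + Sync> = Arc::new(PreloadedFooter { footer_bytes });
    PartitionedFile::new(path, size).with_extensions(payload)
}
```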
@@ -20,7 +20,7 @@ use arrow::{array::RecordBatch, compute::concat_batches};
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
use datafusion_datasource::{
-    file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
+    file::FileSource, file_scan_config::FileScanConfig,
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
@@ -58,11 +58,7 @@ pub struct TestOpener {
}

impl FileOpener for TestOpener {
-    fn open(
-        &self,
-        _file_meta: FileMeta,
-        _file: PartitionedFile,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let mut batches = self.batches.clone();
if let Some(batch_size) = self.batch_size {
let batch = concat_batches(&batches[0].schema(), &batches)?;
14 changes: 5 additions & 9 deletions datafusion/datasource-avro/src/source.rs
@@ -145,9 +145,7 @@ mod private {
use super::*;

use bytes::Buf;
-    use datafusion_datasource::{
-        file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
-    };
+    use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile};
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectStore};

@@ -157,15 +155,13 @@
}

impl FileOpener for AvroOpener {
-        fn open(
-            &self,
-            file_meta: FileMeta,
-            _file: PartitionedFile,
-        ) -> Result<FileOpenFuture> {
+        fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let config = Arc::clone(&self.config);
let object_store = Arc::clone(&self.object_store);
Ok(Box::pin(async move {
-                let r = object_store.get(file_meta.location()).await?;
+                let r = object_store
+                    .get(&partitioned_file.object_meta.location)
+                    .await?;
match r.payload {
GetResultPayload::File(file, _) => {
let reader = config.open(file)?;
19 changes: 8 additions & 11 deletions datafusion/datasource-csv/src/source.rs
@@ -26,7 +26,6 @@ use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
@@ -337,16 +336,12 @@ impl FileOpener for CsvOpener {
/// A,1,2,3,4,5,6,7,8,9\n
/// A},1,2,3,4,5,6,7,8,9\n
/// The lines read would be: [1, 2]
-    fn open(
-        &self,
-        file_meta: FileMeta,
-        _file: PartitionedFile,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
// `self.config.has_header` controls whether to skip reading the 1st line header
// If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
// partition, then don't skip first line
let mut csv_has_header = self.config.has_header;
-        if let Some(FileRange { start, .. }) = file_meta.range {
+        if let Some(FileRange { start, .. }) = partitioned_file.range {
if start != 0 {
csv_has_header = false;
}
@@ -360,7 +355,7 @@

let file_compression_type = self.file_compression_type.to_owned();

-        if file_meta.range.is_some() {
+        if partitioned_file.range.is_some() {
assert!(
!file_compression_type.is_compressed(),
"Reading compressed .csv in parallel is not supported"
@@ -374,7 +369,7 @@
// Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)

let calculated_range =
-                calculate_range(&file_meta, &store, terminator).await?;
+                calculate_range(&partitioned_file, &store, terminator).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
@@ -391,12 +386,14 @@
..Default::default()
};

-            let result = store.get_opts(file_meta.location(), options).await?;
+            let result = store
+                .get_opts(&partitioned_file.object_meta.location, options)
+                .await?;

match result.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
-                    let is_whole_file_scanned = file_meta.range.is_none();
+                    let is_whole_file_scanned = partitioned_file.range.is_none();
let decoder = if is_whole_file_scanned {
// Don't seek if no range as breaks FIFO files
file_compression_type.convert_read(file)?
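The header handling above is the subtle part of parallel CSV reading: only the partition whose byte range starts at offset 0 can contain the header row, so every other partition must keep its first line as data. The rule, isolated as a standalone sketch using the same `FileRange` and `PartitionedFile` fields as the diff:

```rust
use datafusion_datasource::{FileRange, PartitionedFile};

/// Should this partition skip a header row? Only the partition that begins
/// at byte 0 of the file can actually contain the header.
fn partition_has_header(partitioned_file: &PartitionedFile, config_has_header: bool) -> bool {
    match partitioned_file.range {
        // A range starting past byte 0 is a middle/tail partition: its first
        // line is ordinary data (possibly a continuation), never the header.
        Some(FileRange { start, .. }) if start != 0 => false,
        // Whole-file scan, or the partition containing byte 0.
        _ => config_has_header,
    }
}
```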
16 changes: 7 additions & 9 deletions datafusion/datasource-json/src/source.rs
@@ -28,7 +28,6 @@ use datafusion_common::error::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
@@ -176,18 +175,15 @@ impl FileOpener for JsonOpener {
/// are applied to determine which lines to read:
/// 1. The first line of the partition is the line in which the index of the first character >= `start`.
/// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
-    fn open(
-        &self,
-        file_meta: FileMeta,
-        _file: PartitionedFile,
-    ) -> Result<FileOpenFuture> {
+    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let store = Arc::clone(&self.object_store);
let schema = Arc::clone(&self.projected_schema);
let batch_size = self.batch_size;
let file_compression_type = self.file_compression_type.to_owned();

Ok(Box::pin(async move {
-            let calculated_range = calculate_range(&file_meta, &store, None).await?;
+            let calculated_range =
+                calculate_range(&partitioned_file, &store, None).await?;

let range = match calculated_range {
RangeCalculation::Range(None) => None,
Expand All @@ -204,12 +200,14 @@ impl FileOpener for JsonOpener {
..Default::default()
};

-            let result = store.get_opts(file_meta.location(), options).await?;
+            let result = store
+                .get_opts(&partitioned_file.object_meta.location, options)
+                .await?;

match result.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
-                    let bytes = match file_meta.range {
+                    let bytes = match partitioned_file.range {
None => file_compression_type.convert_read(file)?,
Some(_) => {
file.seek(SeekFrom::Start(result.range.start as _))?;
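The two partition rules quoted in the doc comment above let each opener decide line ownership without any coordination: every line belongs to exactly one byte range. A self-contained sketch of the same rules applied to an in-memory buffer (illustrative only; DataFusion applies them via `calculate_range` against the object store):

```rust
/// Which (0-based) lines belong to the partition [start, end)?
/// Rule 1: the first owned line is the first one whose starting byte >= start.
/// Rule 2: the last owned line is the one containing byte end - 1.
/// Together the rules reduce to: a line is owned iff its starting byte
/// falls inside [start, end).
fn lines_for_partition(data: &[u8], start: usize, end: usize) -> Vec<usize> {
    let mut owned = Vec::new();
    let mut line_start = 0;
    for (line_no, line) in data.split(|&b| b == b'\n').enumerate() {
        if line_start >= start && line_start < end {
            owned.push(line_no);
        }
        line_start += line.len() + 1; // advance past the trailing '\n'
    }
    owned
}

// For b"ab\ncd\nef\n" the line starts are 0, 3, and 6, so the ranges
// [0, 4) and [4, 9) own lines [0, 1] and [2] respectively: every line
// is read exactly once even though byte 3 splits no line boundary.
```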