Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ObjectStore from FileStream (#4533) #4601

Merged
merged 2 commits into the base branch from the contributor's branch
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,20 @@ impl ExecutionPlan for AvroExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
use super::file_stream::FileStream;
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;

let config = Arc::new(private::AvroConfig {
schema: Arc::clone(&self.base_config.file_schema),
batch_size: context.session_config().batch_size(),
projection: self.base_config.projected_file_column_names(),
object_store,
});
let opener = private::AvroOpener { config };

let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream))
}

Expand Down Expand Up @@ -157,6 +157,7 @@ mod private {
pub schema: SchemaRef,
pub batch_size: usize,
pub projection: Option<Vec<String>>,
pub object_store: Arc<dyn ObjectStore>,
}

impl AvroConfig {
Expand All @@ -178,14 +179,10 @@ mod private {
}

impl FileOpener for AvroOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
Expand Down
23 changes: 10 additions & 13 deletions datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,25 +131,25 @@ impl ExecutionPlan for CsvExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;

let config = Arc::new(CsvConfig {
batch_size: context.session_config().batch_size(),
file_schema: Arc::clone(&self.base_config.file_schema),
file_projection: self.base_config.file_column_projection_indices(),
has_header: self.has_header,
delimiter: self.delimiter,
object_store,
});

let opener = CsvOpener {
config,
file_compression_type: self.file_compression_type.to_owned(),
};
let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;
Ok(Box::pin(stream) as SendableRecordBatchStream)
}

Expand Down Expand Up @@ -184,6 +184,7 @@ struct CsvConfig {
file_projection: Option<Vec<usize>>,
has_header: bool,
delimiter: u8,
object_store: Arc<dyn ObjectStore>,
}

impl CsvConfig {
Expand All @@ -208,15 +209,11 @@ struct CsvOpener {
}

impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let config = self.config.clone();
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
match config.object_store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let decoder = file_compression_type.convert_read(file)?;
Ok(futures::stream::iter(config.open(decoder, true)).boxed())
Expand Down
38 changes: 6 additions & 32 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,18 @@

use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use arrow::datatypes::SchemaRef;
use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use datafusion_common::ScalarValue;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
use object_store::ObjectStore;

use datafusion_common::ScalarValue;

use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
Expand All @@ -56,11 +52,7 @@ pub type FileOpenFuture =
pub trait FileOpener: Unpin {
/// Asynchronously open the specified file and return a stream
/// of [`RecordBatch`]
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture>;
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}

/// A stream that iterates record batch by record batch, file over file.
Expand All @@ -79,8 +71,6 @@ pub struct FileStream<F: FileOpener> {
file_reader: F,
/// The partition column projector
pc_projector: PartitionColumnProjector,
/// the store from which to source the files.
object_store: Arc<dyn ObjectStore>,
/// The stream state
state: FileStreamState,
/// File stream specific metrics
Expand Down Expand Up @@ -175,7 +165,6 @@ impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
context: Arc<TaskContext>,
file_reader: F,
metrics: ExecutionPlanMetricsSet,
) -> Result<Self> {
Expand All @@ -191,17 +180,12 @@ impl<F: FileOpener> FileStream<F> {

let files = config.file_groups[partition].clone();

let object_store = context
.runtime_env()
.object_store(&config.object_store_url)?;

Ok(Self {
file_iter: files.into(),
projected_schema,
remain: config.limit,
file_reader,
pc_projector,
object_store,
state: FileStreamState::Idle,
file_stream_metrics: FileStreamMetrics::new(&metrics, partition),
baseline_metrics: BaselineMetrics::new(&metrics, partition),
Expand All @@ -228,7 +212,7 @@ impl<F: FileOpener> FileStream<F> {

self.file_stream_metrics.time_opening.start();

match self.file_reader.open(self.object_store.clone(), file_meta) {
match self.file_reader.open(file_meta) {
Ok(future) => {
self.state = FileStreamState::Open {
future,
Expand Down Expand Up @@ -339,11 +323,7 @@ mod tests {
}

impl FileOpener for TestOpener {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
Ok(futures::future::ready(Ok(stream)).boxed())
Expand Down Expand Up @@ -375,14 +355,8 @@ mod tests {
output_ordering: None,
};

let file_stream = FileStream::new(
&config,
0,
ctx.task_ctx(),
reader,
ExecutionPlanMetricsSet::new(),
)
.unwrap();
let file_stream =
FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap();

file_stream
.map(|b| b.expect("No error expected in stream"))
Expand Down
21 changes: 9 additions & 12 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,18 @@ impl ExecutionPlan for NdJsonExec {
options
};

let object_store = context
.runtime_env()
.object_store(&self.base_config.object_store_url)?;
let opener = JsonOpener {
file_schema,
options,
file_compression_type: self.file_compression_type.to_owned(),
object_store,
};

let stream = FileStream::new(
&self.base_config,
partition,
context,
opener,
self.metrics.clone(),
)?;
let stream =
FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?;

Ok(Box::pin(stream) as SendableRecordBatchStream)
}
Expand Down Expand Up @@ -162,16 +161,14 @@ struct JsonOpener {
options: DecoderOptions,
file_schema: SchemaRef,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
}

impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
let store = self.object_store.clone();
let file_compression_type = self.file_compression_type.to_owned();
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
Expand Down
7 changes: 1 addition & 6 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ impl ExecutionPlan for ParquetExec {
let stream = FileStream::new(
&self.base_config,
partition_index,
ctx,
opener,
self.metrics.clone(),
)?;
Expand Down Expand Up @@ -406,11 +405,7 @@ struct ParquetOpener {
}

impl FileOpener for ParquetOpener {
fn open(
&self,
_: Arc<dyn ObjectStore>,
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();

let file_metrics = ParquetFileMetrics::new(
Expand Down