Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename FileReader to FileOpener (#2990) #2991

Merged
merged 1 commit into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl ExecutionPlan for AvroExec {
mod private {
use super::*;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture};
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -183,13 +183,13 @@ mod private {
pub config: Arc<AvroConfig>,
}

impl FormatReader for AvroOpener {
impl FileOpener for AvroOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::physical_plan::{
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use arrow::csv;
Expand Down Expand Up @@ -197,13 +197,13 @@ struct CsvOpener {
config: Arc<CsvConfig>,
}

impl FormatReader for CsvOpener {
impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::RecordBatchStream;

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type ReaderFuture =
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;

pub trait FormatReader: Unpin {
pub trait FileOpener: Unpin {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
range: Option<FileRange>,
) -> ReaderFuture;
) -> FileOpenFuture;
}

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream<F: FormatReader> {
pub struct FileStream<F: FileOpener> {
/// An iterator over input files.
file_iter: VecDeque<PartitionedFile>,
/// The stream schema (file schema including partition columns and after
Expand Down Expand Up @@ -85,12 +85,12 @@ enum FileStreamState {
/// Currently performing asynchronous IO to obtain a stream of RecordBatch
/// for a given parquet file
Open {
/// A [`ReaderFuture`] returned by [`FormatReader::open`]
future: ReaderFuture,
/// A [`FileOpenFuture`] returned by [`FormatReader::open`]
future: FileOpenFuture,
/// The partition values for this file
partition_values: Vec<ScalarValue>,
},
/// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`]
/// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
/// returned by [`FormatReader::open`]
Scan {
/// Partitioning column values for the current batch_iter
Expand All @@ -104,7 +104,7 @@ enum FileStreamState {
Limit,
}

impl<F: FormatReader> FileStream<F> {
impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
Expand Down Expand Up @@ -212,7 +212,7 @@ impl<F: FormatReader> FileStream<F> {
}
}

impl<F: FormatReader> Stream for FileStream<F> {
impl<F: FileOpener> Stream for FileStream<F> {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
Expand All @@ -227,7 +227,7 @@ impl<F: FormatReader> Stream for FileStream<F> {
}
}

impl<F: FormatReader> RecordBatchStream for FileStream<F> {
impl<F: FileOpener> RecordBatchStream for FileStream<F> {
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}
Expand All @@ -250,13 +250,13 @@ mod tests {
records: Vec<RecordBatch>,
}

impl FormatReader for TestOpener {
impl FileOpener for TestOpener {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
futures::future::ready(Ok(stream)).boxed()
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
Expand Down Expand Up @@ -159,13 +159,13 @@ struct JsonOpener {
file_schema: SchemaRef,
}

impl FormatReader for JsonOpener {
impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let options = self.options.clone();
let schema = self.file_schema.clone();
Box::pin(async move {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_expr::Expr;
use crate::datasource::file_format::parquet::fetch_parquet_metadata;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::{
Expand Down Expand Up @@ -287,13 +287,13 @@ struct ParquetOpener {
metrics: ExecutionPlanMetricsSet,
}

impl FormatReader for ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let metrics = ParquetFileMetrics::new(
self.partition_index,
meta.location.as_ref(),
Expand Down