Skip to content

Commit

Permalink
Rename FileReader to FileOpener (#2990) (#2991)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Aug 1, 2022
1 parent 833f588 commit 193fc3b
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 24 deletions.
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl ExecutionPlan for AvroExec {
mod private {
use super::*;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{FormatReader, ReaderFuture};
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectMeta, ObjectStore};
Expand Down Expand Up @@ -183,13 +183,13 @@ mod private {
pub config: Arc<AvroConfig>,
}

impl FormatReader for AvroOpener {
impl FileOpener for AvroOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::physical_plan::{
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use arrow::csv;
Expand Down Expand Up @@ -197,13 +197,13 @@ struct CsvOpener {
config: Arc<CsvConfig>,
}

impl FormatReader for CsvOpener {
impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ use crate::physical_plan::metrics::BaselineMetrics;
use crate::physical_plan::RecordBatchStream;

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type ReaderFuture =
pub type FileOpenFuture =
BoxFuture<'static, Result<BoxStream<'static, ArrowResult<RecordBatch>>>>;

pub trait FormatReader: Unpin {
pub trait FileOpener: Unpin {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
range: Option<FileRange>,
) -> ReaderFuture;
) -> FileOpenFuture;
}

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream<F: FormatReader> {
pub struct FileStream<F: FileOpener> {
/// An iterator over input files.
file_iter: VecDeque<PartitionedFile>,
/// The stream schema (file schema including partition columns and after
Expand Down Expand Up @@ -85,12 +85,12 @@ enum FileStreamState {
/// Currently performing asynchronous IO to obtain a stream of RecordBatch
/// for a given parquet file
Open {
/// A [`ReaderFuture`] returned by [`FormatReader::open`]
future: ReaderFuture,
/// A [`FileOpenFuture`] returned by [`FormatReader::open`]
future: FileOpenFuture,
/// The partition values for this file
partition_values: Vec<ScalarValue>,
},
/// Scanning the [`BoxStream`] returned by the completion of a [`ReaderFuture`]
/// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
/// returned by [`FormatReader::open`]
Scan {
/// Partitioning column values for the current batch_iter
Expand All @@ -104,7 +104,7 @@ enum FileStreamState {
Limit,
}

impl<F: FormatReader> FileStream<F> {
impl<F: FileOpener> FileStream<F> {
pub fn new(
config: &FileScanConfig,
partition: usize,
Expand Down Expand Up @@ -212,7 +212,7 @@ impl<F: FormatReader> FileStream<F> {
}
}

impl<F: FormatReader> Stream for FileStream<F> {
impl<F: FileOpener> Stream for FileStream<F> {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
Expand All @@ -227,7 +227,7 @@ impl<F: FormatReader> Stream for FileStream<F> {
}
}

impl<F: FormatReader> RecordBatchStream for FileStream<F> {
impl<F: FileOpener> RecordBatchStream for FileStream<F> {
fn schema(&self) -> SchemaRef {
self.projected_schema.clone()
}
Expand All @@ -250,13 +250,13 @@ mod tests {
records: Vec<RecordBatch>,
}

impl FormatReader for TestOpener {
impl FileOpener for TestOpener {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
futures::future::ready(Ok(stream)).boxed()
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use crate::physical_plan::{
Expand Down Expand Up @@ -159,13 +159,13 @@ struct JsonOpener {
file_schema: SchemaRef,
}

impl FormatReader for JsonOpener {
impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let options = self.options.clone();
let schema = self.file_schema.clone();
Box::pin(async move {
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use datafusion_expr::Expr;
use crate::datasource::file_format::parquet::fetch_parquet_metadata;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileStream, FormatReader, ReaderFuture,
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::metrics::BaselineMetrics;
use crate::{
Expand Down Expand Up @@ -287,13 +287,13 @@ struct ParquetOpener {
metrics: ExecutionPlanMetricsSet,
}

impl FormatReader for ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
range: Option<FileRange>,
) -> ReaderFuture {
) -> FileOpenFuture {
let metrics = ParquetFileMetrics::new(
self.partition_index,
meta.location.as_ref(),
Expand Down

0 comments on commit 193fc3b

Please sign in to comment.