Skip to content

Commit

Permalink
Move PartitionStream to physical_plan (#6756)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jun 25, 2023
1 parent d9d9328 commit 878fec1
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
7 changes: 5 additions & 2 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ use arrow::{
record_batch::RecordBatch,
};

use crate::config::{ConfigEntry, ConfigOptions};
use crate::datasource::streaming::{PartitionStream, StreamingTable};
use crate::datasource::streaming::StreamingTable;
use crate::datasource::TableProvider;
use crate::execution::context::TaskContext;
use crate::logical_expr::TableType;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;
use crate::{
config::{ConfigEntry, ConfigOptions},
physical_plan::streaming::PartitionStream,
};

use super::{schema::SchemaProvider, CatalogList};

Expand Down
15 changes: 3 additions & 12 deletions datafusion/core/src/datasource/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,9 @@ use datafusion_expr::{Expr, TableType};
use log::debug;

use crate::datasource::TableProvider;
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::streaming::StreamingTableExec;
use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};

/// A partition that can be converted into a [`SendableRecordBatchStream`]
pub trait PartitionStream: Send + Sync {
/// Returns the schema of this partition
fn schema(&self) -> &SchemaRef;

/// Returns a stream yielding this partitions values
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}
use crate::execution::context::SessionState;
use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec};
use crate::physical_plan::ExecutionPlan;

/// A [`TableProvider`] that streams a set of [`PartitionStream`]
pub struct StreamingTable {
Expand Down
10 changes: 9 additions & 1 deletion datafusion/core/src/physical_plan/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,19 @@ use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_physical_expr::PhysicalSortExpr;
use log::debug;

use crate::datasource::streaming::PartitionStream;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
use datafusion_execution::TaskContext;

/// A partition that can be converted into a [`SendableRecordBatchStream`]
pub trait PartitionStream: Send + Sync {
/// Returns the schema of this partition
fn schema(&self) -> &SchemaRef;

/// Returns a stream yielding this partitions values
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
}

/// An [`ExecutionPlan`] for [`PartitionStream`]
pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/memory_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::streaming::PartitionStream;
use futures::StreamExt;
use std::sync::Arc;

use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
Expand Down

0 comments on commit 878fec1

Please sign in to comment.