18 changes: 14 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
@@ -25,7 +25,9 @@ use crate::arrow::util::pretty;
use crate::datasource::file_format::csv::CsvFormatFactory;
use crate::datasource::file_format::format_as_file_type;
use crate::datasource::file_format::json::JsonFormatFactory;
use crate::datasource::{provider_as_source, MemTable, TableProvider};
use crate::datasource::{
provider_as_source, DefaultTableSource, MemTable, TableProvider,
};
use crate::error::Result;
use crate::execution::context::{SessionState, TaskContext};
use crate::execution::FunctionRegistry;
@@ -62,6 +64,7 @@ use datafusion_functions_aggregate::expr_fn::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_sql::TableReference;

/// Contains options that control how data is
/// written out from a DataFrame
@@ -1526,8 +1529,6 @@ impl DataFrame {
table_name: &str,
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let arrow_schema = Schema::from(self.schema());

let plan = if write_options.sort_by.is_empty() {
self.plan
} else {
@@ -1536,10 +1537,19 @@
.build()?
};

let table_ref: TableReference = table_name.into();
let table_schema = self.session_state.schema_for_ref(table_ref.clone())?;
let target = match table_schema.table(table_ref.table()).await? {
Some(ref provider) => Ok(Arc::clone(provider)),
_ => plan_err!("No table named '{table_name}'"),
}?;

let target = Arc::new(DefaultTableSource::new(target));

let plan = LogicalPlanBuilder::insert_into(
plan,
table_name.to_owned(),
&arrow_schema,
target,
write_options.insert_op,
)?
.build()?;
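For context, a minimal caller-side sketch of `DataFrame::write_table` after this change (the file name and table name are illustrative assumptions; the public API surface itself is unchanged):

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

// Assumes "data.csv" exists and a table named "t" is already registered on `ctx`.
async fn append_csv_to_table(ctx: &SessionContext) -> Result<()> {
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;
    // The target table is now resolved while the insert plan is built, so writing to
    // an unregistered table fails here with "No table named 't'" rather than surfacing
    // later during physical planning.
    df.write_table("t", DataFrameWriteOptions::new()).await?;
    Ok(())
}
```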
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -1195,7 +1195,7 @@ mod tests {
use crate::datasource::file_format::json::JsonFormat;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{provider_as_source, MemTable};
use crate::datasource::{provider_as_source, DefaultTableSource, MemTable};
use crate::execution::options::ArrowReadOptions;
use crate::prelude::*;
use crate::{
@@ -2065,6 +2065,8 @@
session_ctx.register_table("source", source_table.clone())?;
// Convert the source table into a provider so that it can be used in a query
let source = provider_as_source(source_table);
let target = session_ctx.table_provider("t").await?;
let target = Arc::new(DefaultTableSource::new(target));
// Create a table scan logical plan to read from the source table
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
.filter(filter_predicate)?
@@ -2073,7 +2075,7 @@
// Therefore, we will have 8 partitions in the final plan.
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
.build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/memory.rs
@@ -390,7 +390,7 @@ impl DataSink for MemSink {
mod tests {

use super::*;
use crate::datasource::provider_as_source;
use crate::datasource::{provider_as_source, DefaultTableSource};
use crate::physical_plan::collect;
use crate::prelude::SessionContext;

@@ -640,6 +640,7 @@ mod tests {
// Create and register the initial table with the provided schema and data
let initial_table = Arc::new(MemTable::try_new(schema.clone(), initial_data)?);
session_ctx.register_table("t", initial_table.clone())?;
let target = Arc::new(DefaultTableSource::new(initial_table.clone()));
// Create and register the source table with the provided schema and inserted data
let source_table = Arc::new(MemTable::try_new(schema.clone(), inserted_data)?);
session_ctx.register_table("source", source_table.clone())?;
@@ -649,7 +650,7 @@
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
.build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
15 changes: 9 additions & 6 deletions datafusion/core/src/physical_planner.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
use crate::datasource::file_format::file_type_to_format;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::source_as_provider;
use crate::datasource::{source_as_provider, DefaultTableSource};
use crate::error::{DataFusionError, Result};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
@@ -541,19 +541,22 @@ impl DefaultPhysicalPlanner {
.await?
}
LogicalPlan::Dml(DmlStatement {
table_name,
target,
op: WriteOp::Insert(insert_op),
..
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name.clone())?;
if let Some(provider) = schema.table(name).await? {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
let input_exec = children.one()?;
provider
.table_provider
.insert_into(session_state, input_exec, *insert_op)
.await?
} else {
return exec_err!("Table '{table_name}' does not exist");
return exec_err!(
"Table source can't be downcasted to DefaultTableSource"
);
}
}
LogicalPlan::Window(Window { window_expr, .. }) => {
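A hedged illustration of the new failure mode: an INSERT plan can still be built against a `TableSource` that is not a `DefaultTableSource` (for example a plan-only `LogicalTableSource`), but physical planning now rejects it with the downcast error above. The table names, schema, and exact import paths below are assumptions for illustration:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::logical_expr::logical_plan::builder::LogicalTableSource;
use datafusion::logical_expr::{dml::InsertOp, LogicalPlanBuilder};
use datafusion::prelude::*;

// Assumes a table named "source" is registered on `ctx`.
async fn insert_with_plan_only_target(ctx: &SessionContext) -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    // A TableSource that is not backed by a TableProvider.
    let target = Arc::new(LogicalTableSource::new(schema));
    let scan = ctx.table("source").await?.into_unoptimized_plan();
    let plan = LogicalPlanBuilder::insert_into(scan, "t", target, InsertOp::Append)?.build()?;
    // Expected to fail with "Table source can't be downcasted to DefaultTableSource".
    let result = ctx.state().create_physical_plan(&plan).await;
    assert!(result.is_err());
    Ok(())
}
```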
6 changes: 2 additions & 4 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -384,14 +384,12 @@ impl LogicalPlanBuilder {
pub fn insert_into(
input: LogicalPlan,
table_name: impl Into<TableReference>,
table_schema: &Schema,
target: Arc<dyn TableSource>,
insert_op: InsertOp,
) -> Result<Self> {
let table_schema = table_schema.clone().to_dfschema_ref()?;

Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
table_name.into(),
table_schema,
target,
WriteOp::Insert(insert_op),
Arc::new(input),
))))
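For downstream callers of `LogicalPlanBuilder::insert_into`, the target is now an `Arc<dyn TableSource>` instead of an Arrow `&Schema`. A minimal migration sketch, assuming two `MemTable`s with compatible schemas (exact import paths may vary between DataFusion versions):

```rust
use std::sync::Arc;

use datafusion::datasource::{provider_as_source, DefaultTableSource, MemTable};
use datafusion::error::Result;
use datafusion::logical_expr::{dml::InsertOp, LogicalPlan, LogicalPlanBuilder};

fn build_insert_plan(source: Arc<MemTable>, target: Arc<MemTable>) -> Result<LogicalPlan> {
    let scan =
        LogicalPlanBuilder::scan("source", provider_as_source(source), None)?.build()?;
    // Before: LogicalPlanBuilder::insert_into(scan, "t", &arrow_schema, InsertOp::Append)
    // After: the resolved target table is passed as a TableSource, not its Arrow schema.
    let target = Arc::new(DefaultTableSource::new(target));
    LogicalPlanBuilder::insert_into(scan, "t", target, InsertOp::Append)?.build()
}
```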
45 changes: 39 additions & 6 deletions datafusion/expr/src/logical_plan/dml.rs
@@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{DFSchemaRef, TableReference};

use crate::LogicalPlan;
use crate::{LogicalPlan, TableSource};

/// Operator that copies the contents of a database to file(s)
#[derive(Clone)]
@@ -91,31 +91,64 @@

/// The operator that modifies the content of a database (adapted from
/// substrait WriteRel)
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Clone)]
pub struct DmlStatement {
/// The table name
pub table_name: TableReference,
/// The schema of the table (must align with Rel input)
pub table_schema: DFSchemaRef,
/// The target table to insert into
pub target: Arc<dyn TableSource>,
/// The type of operation to perform
pub op: WriteOp,
/// The relation that determines the tuples to add/remove/modify; its schema must match the target table schema
pub input: Arc<LogicalPlan>,
/// The schema of the output relation
pub output_schema: DFSchemaRef,
}
impl Eq for DmlStatement {}
impl Hash for DmlStatement {
fn hash<H: Hasher>(&self, state: &mut H) {
self.table_name.hash(state);
self.target.schema().hash(state);
self.op.hash(state);
self.input.hash(state);
self.output_schema.hash(state);
}
}

impl PartialEq for DmlStatement {
fn eq(&self, other: &Self) -> bool {
self.table_name == other.table_name
&& self.target.schema() == other.target.schema()
&& self.op == other.op
&& self.input == other.input
&& self.output_schema == other.output_schema
}
}

impl Debug for DmlStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("DmlStatement")
.field("table_name", &self.table_name)
.field("target", &"...")
.field("target_schema", &self.target.schema())
.field("op", &self.op)
.field("input", &self.input)
.field("output_schema", &self.output_schema)
.finish()
}
}

impl DmlStatement {
/// Creates a new DML statement with the output schema set to a single `count` column.
pub fn new(
table_name: TableReference,
table_schema: DFSchemaRef,
target: Arc<dyn TableSource>,
op: WriteOp,
input: Arc<LogicalPlan>,
) -> Self {
Self {
table_name,
table_schema,
target,
op,
input,

4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -784,15 +784,15 @@ impl LogicalPlan {
}
LogicalPlan::Dml(DmlStatement {
table_name,
table_schema,
target,
op,
..
}) => {
self.assert_no_expressions(expr)?;
let input = self.only_input(inputs)?;
Ok(LogicalPlan::Dml(DmlStatement::new(
table_name.clone(),
Arc::clone(table_schema),
Arc::clone(target),
op.clone(),
Arc::new(input),
)))
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/tree_node.rs
@@ -228,14 +228,14 @@ impl TreeNode for LogicalPlan {
}),
LogicalPlan::Dml(DmlStatement {
table_name,
table_schema,
target,
op,
input,
output_schema,
}) => input.map_elements(f)?.update_data(|input| {
LogicalPlan::Dml(DmlStatement {
table_name,
table_schema,
target,
op,
input,
output_schema,
2 changes: 1 addition & 1 deletion datafusion/proto/proto/datafusion.proto
@@ -278,7 +278,7 @@ message DmlNode{
Type dml_type = 1;
LogicalPlanNode input = 2;
TableReference table_name = 3;
datafusion_common.DfSchema schema = 4;
LogicalPlanNode target = 5;
}

message UnnestNode {
24 changes: 12 additions & 12 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default.
