Skip to content

Commit

Permalink
Support REPLACE INTO for INSERT
Browse files Browse the repository at this point in the history
  • Loading branch information
fmeringdal committed Sep 21, 2024
1 parent aeca7ea commit 2291342
Show file tree
Hide file tree
Showing 17 changed files with 94 additions and 65 deletions.
3 changes: 2 additions & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
};
Expand Down Expand Up @@ -272,7 +273,7 @@ pub trait TableProvider: Sync + Send {
&self,
_state: &dyn Session,
_input: Arc<dyn ExecutionPlan>,
_overwrite: bool,
_insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Insert into not implemented for this table")
}
Expand Down
9 changes: 8 additions & 1 deletion datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
Expand Down Expand Up @@ -85,6 +86,7 @@ impl DataFrameWriteOptions {
partition_by: vec![],
}
}

/// Set the overwrite option to true or false
pub fn with_overwrite(mut self, overwrite: bool) -> Self {
self.overwrite = overwrite;
Expand Down Expand Up @@ -1299,11 +1301,16 @@ impl DataFrame {
write_options: DataFrameWriteOptions,
) -> Result<Vec<RecordBatch>, DataFusionError> {
let arrow_schema = Schema::from(self.schema());
let insert_op = if write_options.overwrite {
InsertOp::Overwrite
} else {
InsertOp::Append
};
let plan = LogicalPlanBuilder::insert_into(
self.plan,
table_name.to_owned(),
&arrow_schema,
write_options.overwrite,
insert_op,
)?
.build()?;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
Expand Down Expand Up @@ -181,7 +182,7 @@ impl FileFormat for ArrowFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for Arrow format");
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion_common::{
exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;

Expand Down Expand Up @@ -382,7 +383,7 @@ impl FileFormat for CsvFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for CSV");
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -252,7 +253,7 @@ impl FileFormat for JsonFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for Json");
}

Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use datafusion_common::{
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -403,7 +404,7 @@ impl FileFormat for ParquetFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
if conf.overwrite {
if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for Parquet");
}

Expand Down Expand Up @@ -2269,7 +2270,7 @@ mod tests {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: true,
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
Expand Down Expand Up @@ -2364,7 +2365,7 @@ mod tests {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
overwrite: true,
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
Expand Down Expand Up @@ -2447,7 +2448,7 @@ mod tests {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
overwrite: true,
insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
Expand Down Expand Up @@ -900,7 +901,7 @@ impl TableProvider for ListingTable {
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
if !self
Expand Down Expand Up @@ -947,7 +948,7 @@ impl TableProvider for ListingTable {
file_groups,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
overwrite,
insert_op,
keep_partition_by_columns,
};

Expand Down Expand Up @@ -1969,7 +1970,8 @@ mod tests {
// Therefore, we will have 8 partitions in the final plan.
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
.build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
Expand Down
10 changes: 6 additions & 4 deletions datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::metrics::MetricsSet;

use async_trait::async_trait;
Expand Down Expand Up @@ -262,7 +263,7 @@ impl TableProvider for MemTable {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// If we are inserting into the table, any sort order may be messed up so reset it here
*self.sort_order.lock() = vec![];
Expand All @@ -277,8 +278,8 @@ impl TableProvider for MemTable {
"Inserting query must have the same schema with the table."
);
}
if overwrite {
return not_impl_err!("Overwrite not implemented for MemoryTable yet");
if insert_op != InsertOp::Append {
return not_impl_err!("{insert_op} not implemented for MemoryTable yet");
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
Ok(Arc::new(DataSinkExec::new(
Expand Down Expand Up @@ -626,7 +627,8 @@ mod tests {
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
.build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
use datafusion_expr::dml::InsertOp;
pub use file_groups::FileGroupPartitioner;
pub use file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
Expand Down Expand Up @@ -84,7 +85,8 @@ pub struct FileSinkConfig {
/// representing the partitioning columns for the file
pub table_partition_cols: Vec<(String, DataType)>,
/// Controls whether existing data should be overwritten by this sink
pub overwrite: bool,
// TODO: doc above
pub insert_op: InsertOp,
/// Controls whether partition columns are kept for the file
pub keep_partition_by_columns: bool,
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use arrow_schema::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
Expand Down Expand Up @@ -349,7 +350,7 @@ impl TableProvider for StreamTable {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
_overwrite: bool,
_insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
let ordering = match self.0.order.first() {
Some(x) => {
Expand Down
24 changes: 4 additions & 20 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr::{
physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
Expand Down Expand Up @@ -528,7 +528,7 @@ impl DefaultPhysicalPlanner {
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols,
overwrite: false,
insert_op: InsertOp::Append,
keep_partition_by_columns,
};

Expand All @@ -541,31 +541,15 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Dml(DmlStatement {
table_name,
op: WriteOp::InsertInto,
op: WriteOp::Insert(insert_op),
..
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name.clone())?;
if let Some(provider) = schema.table(name).await? {
let input_exec = children.one()?;
provider
.insert_into(session_state, input_exec, false)
.await?
} else {
return exec_err!("Table '{table_name}' does not exist");
}
}
LogicalPlan::Dml(DmlStatement {
table_name,
op: WriteOp::InsertOverwrite,
..
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name.clone())?;
if let Some(provider) = schema.table(name).await? {
let input_exec = children.one()?;
provider
.insert_into(session_state, input_exec, true)
.insert_into(session_state, input_exec, *insert_op)
.await?
} else {
return exec_err!("Table '{table_name}' does not exist");
Expand Down
12 changes: 4 additions & 8 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use datafusion_common::{
TableReference, ToDFSchema, UnnestOptions,
};

use super::dml::InsertOp;

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

Expand Down Expand Up @@ -303,20 +305,14 @@ impl LogicalPlanBuilder {
input: LogicalPlan,
table_name: impl Into<TableReference>,
table_schema: &Schema,
overwrite: bool,
insert_op: InsertOp,
) -> Result<Self> {
let table_schema = table_schema.clone().to_dfschema_ref()?;

let op = if overwrite {
WriteOp::InsertOverwrite
} else {
WriteOp::InsertInto
};

Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
table_name.into(),
table_schema,
op,
WriteOp::Insert(insert_op),
Arc::new(input),
))))
}
Expand Down
30 changes: 26 additions & 4 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ impl DmlStatement {

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum WriteOp {
InsertOverwrite,
InsertInto,
Insert(InsertOp),
Delete,
Update,
Ctas,
Expand All @@ -125,8 +124,7 @@ impl WriteOp {
/// Return a descriptive name of this [`WriteOp`]
pub fn name(&self) -> &str {
match self {
WriteOp::InsertOverwrite => "Insert Overwrite",
WriteOp::InsertInto => "Insert Into",
WriteOp::Insert(insert) => insert.name(),
WriteOp::Delete => "Delete",
WriteOp::Update => "Update",
WriteOp::Ctas => "Ctas",
Expand All @@ -140,6 +138,30 @@ impl Display for WriteOp {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum InsertOp {
Append,
Overwrite,
Replace,
}

impl InsertOp {
/// Return a descriptive name of this [`InsertOp`]
pub fn name(&self) -> &str {
match self {
InsertOp::Append => "Insert Into",
InsertOp::Overwrite => "Insert Overwrite",
InsertOp::Replace => "Insert Replace",
}
}
}

impl Display for InsertOp {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name())
}
}

fn make_count_schema() -> DFSchemaRef {
Arc::new(
Schema::new(vec![Field::new("count", DataType::UInt64, false)])
Expand Down
Loading

0 comments on commit 2291342

Please sign in to comment.