@@ -52,6 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
5252use datafusion_common:: {
5353 plan_err, Column , DFSchema , DataFusionError , ParamValues , SchemaError , UnnestOptions ,
5454} ;
55+ use datafusion_expr:: dml:: InsertOp ;
5556use datafusion_expr:: { case, is_null, lit, SortExpr } ;
5657use datafusion_expr:: {
5758 utils:: COUNT_STAR_EXPANSION , TableProviderFilterPushDown , UNNAMED_TABLE ,
@@ -66,8 +67,9 @@ use datafusion_catalog::Session;
6667/// Contains options that control how data is
6768/// written out from a DataFrame
6869pub struct DataFrameWriteOptions {
69- /// Controls if existing data should be overwritten
70- overwrite : bool ,
70+ /// Controls how new data should be written to the table, determining whether
71+ /// to append, overwrite, or replace existing data.
72+ insert_op : InsertOp ,
7173 /// Controls if all partitions should be coalesced into a single output file
7274 /// Generally will have slower performance when set to true.
7375 single_file_output : bool ,
@@ -80,14 +82,15 @@ impl DataFrameWriteOptions {
8082 /// Create a new DataFrameWriteOptions with default values
8183 pub fn new ( ) -> Self {
8284 DataFrameWriteOptions {
83- overwrite : false ,
85+ insert_op : InsertOp :: Append ,
8486 single_file_output : false ,
8587 partition_by : vec ! [ ] ,
8688 }
8789 }
88- /// Set the overwrite option to true or false
89- pub fn with_overwrite ( mut self , overwrite : bool ) -> Self {
90- self . overwrite = overwrite;
90+
91+ /// Set the insert operation
92+ pub fn with_insert_operation ( mut self , insert_op : InsertOp ) -> Self {
93+ self . insert_op = insert_op;
9194 self
9295 }
9396
@@ -1525,7 +1528,7 @@ impl DataFrame {
15251528 self . plan ,
15261529 table_name. to_owned ( ) ,
15271530 & arrow_schema,
1528- write_options. overwrite ,
1531+ write_options. insert_op ,
15291532 ) ?
15301533 . build ( ) ?;
15311534
@@ -1566,10 +1569,11 @@ impl DataFrame {
15661569 options : DataFrameWriteOptions ,
15671570 writer_options : Option < CsvOptions > ,
15681571 ) -> Result < Vec < RecordBatch > , DataFusionError > {
1569- if options. overwrite {
1570- return Err ( DataFusionError :: NotImplemented (
1571- "Overwrites are not implemented for DataFrame::write_csv." . to_owned ( ) ,
1572- ) ) ;
1572+ if options. insert_op != InsertOp :: Append {
1573+ return Err ( DataFusionError :: NotImplemented ( format ! (
1574+ "{} is not implemented for DataFrame::write_csv." ,
1575+ options. insert_op
1576+ ) ) ) ;
15731577 }
15741578
15751579 let format = if let Some ( csv_opts) = writer_options {
@@ -1626,10 +1630,11 @@ impl DataFrame {
16261630 options : DataFrameWriteOptions ,
16271631 writer_options : Option < JsonOptions > ,
16281632 ) -> Result < Vec < RecordBatch > , DataFusionError > {
1629- if options. overwrite {
1630- return Err ( DataFusionError :: NotImplemented (
1631- "Overwrites are not implemented for DataFrame::write_json." . to_owned ( ) ,
1632- ) ) ;
1633+ if options. insert_op != InsertOp :: Append {
1634+ return Err ( DataFusionError :: NotImplemented ( format ! (
1635+ "{} is not implemented for DataFrame::write_json." ,
1636+ options. insert_op
1637+ ) ) ) ;
16331638 }
16341639
16351640 let format = if let Some ( json_opts) = writer_options {
0 commit comments