Skip to content

Commit 24ea516

Browse files
authored
Merge pull request #7719 from lichuang/idempotent-copy
feat: idempotent-copy file
2 parents d4b68a5 + 5525e87 commit 24ea516

File tree

18 files changed

+430
-20
lines changed

18 files changed

+430
-20
lines changed

src/query/ast/src/ast/statements/copy.rs

+2
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub struct CopyStmt<'a> {
3939
pub validation_mode: String,
4040
pub size_limit: usize,
4141
pub purge: bool,
42+
pub force: bool,
4243
}
4344

4445
impl Display for CopyStmt<'_> {
@@ -74,6 +75,7 @@ impl Display for CopyStmt<'_> {
7475
}
7576

7677
write!(f, " PURGE = {}", self.purge)?;
78+
write!(f, " FORCE = {}", self.force)?;
7779
Ok(())
7880
}
7981
}

src/query/ast/src/parser/statement.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -742,8 +742,22 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
742742
~ ( VALIDATION_MODE ~ "=" ~ #literal_string)?
743743
~ ( SIZE_LIMIT ~ "=" ~ #literal_u64)?
744744
~ ( PURGE ~ "=" ~ #literal_bool)?
745+
~ ( FORCE ~ "=" ~ #literal_bool)?
745746
},
746-
|(_, _, dst, _, src, files, pattern, file_format, validation_mode, size_limit, purge)| {
747+
|(
748+
_,
749+
_,
750+
dst,
751+
_,
752+
src,
753+
files,
754+
pattern,
755+
file_format,
756+
validation_mode,
757+
size_limit,
758+
purge,
759+
force,
760+
)| {
747761
Statement::Copy(CopyStmt {
748762
src,
749763
dst,
@@ -753,6 +767,7 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
753767
validation_mode: validation_mode.map(|v| v.2).unwrap_or_default(),
754768
size_limit: size_limit.map(|v| v.2).unwrap_or_default() as usize,
755769
purge: purge.map(|v| v.2).unwrap_or_default(),
770+
force: force.map(|v| v.2).unwrap_or_default(),
756771
})
757772
},
758773
);

src/query/ast/src/parser/token.rs

+2
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,8 @@ pub enum TokenKind {
403403
FLOAT64,
404404
#[token("FOR", ignore(ascii_case))]
405405
FOR,
406+
#[token("FORCE", ignore(ascii_case))]
407+
FORCE,
406408
#[token("FORMAT", ignore(ascii_case))]
407409
FORMAT,
408410
#[token("FRAGMENTS", ignore(ascii_case))]

src/query/ast/tests/it/parser.rs

+9
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,15 @@ fn test_statement() {
242242
skip_header = 1
243243
)
244244
size_limit=10;"#,
245+
r#"COPY INTO mytable
246+
FROM @external_stage/path/to/file.csv
247+
FILE_FORMAT = (
248+
type = 'CSV'
249+
field_delimiter = ','
250+
record_delimiter = '\n'
251+
skip_header = 1
252+
)
253+
force=true;"#,
245254
// We used to support COPY FROM a quoted at string
246255
// r#"COPY INTO mytable
247256
// FROM '@external_stage/path/to/file.csv'

src/query/ast/tests/it/testdata/statement-error.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ error:
279279
--> SQL:1:38
280280
|
281281
1 | COPY INTO mytable FROM 's3://bucket' CREDENTIAL = ();
282-
| ^^^^^^^^^^ expected `CONNECTION`, `CREDENTIALS`, `ENCRYPTION`, `FILES`, `PATTERN`, `FILE_FORMAT`, or 5 more ...
282+
| ^^^^^^^^^^ expected `CONNECTION`, `CREDENTIALS`, `ENCRYPTION`, `FILES`, `PATTERN`, `FILE_FORMAT`, or 6 more ...
283283

284284

285285
---------- Input ----------
@@ -289,7 +289,7 @@ error:
289289
--> SQL:1:33
290290
|
291291
1 | COPY INTO mytable FROM @mystage CREDENTIALS = ();
292-
| ^^^^^^^^^^^ expected `FILES`, `PATTERN`, `FILE_FORMAT`, `VALIDATION_MODE`, `SIZE_LIMIT`, `PURGE`, or 2 more ...
292+
| ^^^^^^^^^^^ expected `FILES`, `PATTERN`, `FILE_FORMAT`, `VALIDATION_MODE`, `SIZE_LIMIT`, `PURGE`, or 3 more ...
293293

294294

295295
---------- Input ----------

src/query/ast/tests/it/testdata/statement.txt

+59-7
Original file line numberDiff line numberDiff line change
@@ -5509,7 +5509,7 @@ COPY INTO mytable
55095509
size_limit=10;
55105510
---------- Output ---------
55115511
COPY INTO mytable FROM 's3://mybucket/data.csv' FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5512-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5512+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
55135513
---------- AST ------------
55145514
Copy(
55155515
CopyStmt {
@@ -5541,6 +5541,7 @@ Copy(
55415541
validation_mode: "",
55425542
size_limit: 10,
55435543
purge: false,
5544+
force: false,
55445545
},
55455546
)
55465547

@@ -5560,7 +5561,7 @@ COPY INTO mytable
55605561
size_limit=10;
55615562
---------- Output ---------
55625563
COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( endpoint_url='http://127.0.0.1:9900' ) FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5563-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5564+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
55645565
---------- AST ------------
55655566
Copy(
55665567
CopyStmt {
@@ -5594,6 +5595,7 @@ Copy(
55945595
validation_mode: "",
55955596
size_limit: 10,
55965597
purge: false,
5598+
force: false,
55975599
},
55985600
)
55995601

@@ -5610,7 +5612,7 @@ COPY INTO mytable
56105612
size_limit=10;
56115613
---------- Output ---------
56125614
COPY INTO mytable FROM @my_stage/ FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5613-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5615+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
56145616
---------- AST ------------
56155617
Copy(
56165618
CopyStmt {
@@ -5638,6 +5640,7 @@ Copy(
56385640
validation_mode: "",
56395641
size_limit: 10,
56405642
purge: false,
5643+
force: false,
56415644
},
56425645
)
56435646

@@ -5654,7 +5657,7 @@ COPY INTO 's3://mybucket/data.csv'
56545657
size_limit=10;
56555658
---------- Output ---------
56565659
COPY INTO 's3://mybucket/data.csv' FROM mytable FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5657-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5660+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
56585661
---------- AST ------------
56595662
Copy(
56605663
CopyStmt {
@@ -5686,6 +5689,7 @@ Copy(
56865689
validation_mode: "",
56875690
size_limit: 10,
56885691
purge: false,
5692+
force: false,
56895693
},
56905694
)
56915695

@@ -5702,7 +5706,7 @@ COPY INTO @my_stage
57025706
size_limit=10;
57035707
---------- Output ---------
57045708
COPY INTO @my_stage/ FROM mytable FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5705-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5709+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
57065710
---------- AST ------------
57075711
Copy(
57085712
CopyStmt {
@@ -5730,6 +5734,7 @@ Copy(
57305734
validation_mode: "",
57315735
size_limit: 10,
57325736
purge: false,
5737+
force: false,
57335738
},
57345739
)
57355740

@@ -5753,7 +5758,7 @@ COPY INTO mytable
57535758
size_limit=10;
57545759
---------- Output ---------
57555760
COPY INTO mytable FROM 's3://mybucket/data.csv' CONNECTION = ( aws_key_id='access_key' aws_secret_key='secret_key' master_key='master_key' ) FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5756-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5761+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
57575762
---------- AST ------------
57585763
Copy(
57595764
CopyStmt {
@@ -5789,6 +5794,7 @@ Copy(
57895794
validation_mode: "",
57905795
size_limit: 10,
57915796
purge: false,
5797+
force: false,
57925798
},
57935799
)
57945800

@@ -5805,7 +5811,7 @@ COPY INTO mytable
58055811
size_limit=10;
58065812
---------- Output ---------
58075813
COPY INTO mytable FROM @external_stage/path/to/file.csv FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5808-
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false
5814+
' skip_header = '1' type = 'CSV' ) SIZE_LIMIT = 10 PURGE = false FORCE = false
58095815
---------- AST ------------
58105816
Copy(
58115817
CopyStmt {
@@ -5833,6 +5839,52 @@ Copy(
58335839
validation_mode: "",
58345840
size_limit: 10,
58355841
purge: false,
5842+
force: false,
5843+
},
5844+
)
5845+
5846+
5847+
---------- Input ----------
5848+
COPY INTO mytable
5849+
FROM @external_stage/path/to/file.csv
5850+
FILE_FORMAT = (
5851+
type = 'CSV'
5852+
field_delimiter = ','
5853+
record_delimiter = '\n'
5854+
skip_header = 1
5855+
)
5856+
force=true;
5857+
---------- Output ---------
5858+
COPY INTO mytable FROM @external_stage/path/to/file.csv FILE_FORMAT = ( field_delimiter = ',' record_delimiter = '
5859+
' skip_header = '1' type = 'CSV' ) PURGE = false FORCE = true
5860+
---------- AST ------------
5861+
Copy(
5862+
CopyStmt {
5863+
src: StageLocation {
5864+
name: "external_stage",
5865+
path: "/path/to/file.csv",
5866+
},
5867+
dst: Table {
5868+
catalog: None,
5869+
database: None,
5870+
table: Identifier {
5871+
name: "mytable",
5872+
quote: None,
5873+
span: Ident(10..17),
5874+
},
5875+
},
5876+
files: [],
5877+
pattern: "",
5878+
file_format: {
5879+
"field_delimiter": ",",
5880+
"record_delimiter": "\n",
5881+
"skip_header": "1",
5882+
"type": "CSV",
5883+
},
5884+
validation_mode: "",
5885+
size_limit: 0,
5886+
purge: false,
5887+
force: true,
58365888
},
58375889
)
58385890

src/query/catalog/src/catalog.rs

+18
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,25 @@ use common_meta_app::schema::CreateTableReq;
2626
use common_meta_app::schema::DropDatabaseReq;
2727
use common_meta_app::schema::DropTableReply;
2828
use common_meta_app::schema::DropTableReq;
29+
use common_meta_app::schema::GetTableCopiedFileReply;
30+
use common_meta_app::schema::GetTableCopiedFileReq;
2931
use common_meta_app::schema::RenameDatabaseReply;
3032
use common_meta_app::schema::RenameDatabaseReq;
3133
use common_meta_app::schema::RenameTableReply;
3234
use common_meta_app::schema::RenameTableReq;
3335
use common_meta_app::schema::TableIdent;
3436
use common_meta_app::schema::TableInfo;
3537
use common_meta_app::schema::TableMeta;
38+
use common_meta_app::schema::TruncateTableReply;
39+
use common_meta_app::schema::TruncateTableReq;
3640
use common_meta_app::schema::UndropDatabaseReply;
3741
use common_meta_app::schema::UndropDatabaseReq;
3842
use common_meta_app::schema::UndropTableReply;
3943
use common_meta_app::schema::UndropTableReq;
4044
use common_meta_app::schema::UpdateTableMetaReply;
4145
use common_meta_app::schema::UpdateTableMetaReq;
46+
use common_meta_app::schema::UpsertTableCopiedFileReply;
47+
use common_meta_app::schema::UpsertTableCopiedFileReq;
4248
use common_meta_app::schema::UpsertTableOptionReply;
4349
use common_meta_app::schema::UpsertTableOptionReq;
4450
use common_meta_types::MetaId;
@@ -152,6 +158,18 @@ pub trait Catalog: DynClone + Send + Sync {
152158

153159
async fn count_tables(&self, req: CountTablesReq) -> Result<CountTablesReply>;
154160

161+
async fn get_table_copied_file_info(
162+
&self,
163+
req: GetTableCopiedFileReq,
164+
) -> Result<GetTableCopiedFileReply>;
165+
166+
async fn upsert_table_copied_file_info(
167+
&self,
168+
req: UpsertTableCopiedFileReq,
169+
) -> Result<UpsertTableCopiedFileReply>;
170+
171+
async fn truncate_table(&self, req: TruncateTableReq) -> Result<TruncateTableReply>;
172+
155173
/// Table function
156174
157175
// Get function by name.

src/query/service/src/catalogs/default/database_catalog.rs

+26
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,25 @@ use common_meta_app::schema::CreateTableReq;
2525
use common_meta_app::schema::DropDatabaseReq;
2626
use common_meta_app::schema::DropTableReply;
2727
use common_meta_app::schema::DropTableReq;
28+
use common_meta_app::schema::GetTableCopiedFileReply;
29+
use common_meta_app::schema::GetTableCopiedFileReq;
2830
use common_meta_app::schema::RenameDatabaseReply;
2931
use common_meta_app::schema::RenameDatabaseReq;
3032
use common_meta_app::schema::RenameTableReply;
3133
use common_meta_app::schema::RenameTableReq;
3234
use common_meta_app::schema::TableIdent;
3335
use common_meta_app::schema::TableInfo;
3436
use common_meta_app::schema::TableMeta;
37+
use common_meta_app::schema::TruncateTableReply;
38+
use common_meta_app::schema::TruncateTableReq;
3539
use common_meta_app::schema::UndropDatabaseReply;
3640
use common_meta_app::schema::UndropDatabaseReq;
3741
use common_meta_app::schema::UndropTableReply;
3842
use common_meta_app::schema::UndropTableReq;
3943
use common_meta_app::schema::UpdateTableMetaReply;
4044
use common_meta_app::schema::UpdateTableMetaReq;
45+
use common_meta_app::schema::UpsertTableCopiedFileReply;
46+
use common_meta_app::schema::UpsertTableCopiedFileReq;
4147
use common_meta_app::schema::UpsertTableOptionReply;
4248
use common_meta_app::schema::UpsertTableOptionReq;
4349
use common_meta_types::MetaId;
@@ -437,6 +443,26 @@ impl Catalog for DatabaseCatalog {
437443
Ok(res)
438444
}
439445

446+
async fn get_table_copied_file_info(
447+
&self,
448+
req: GetTableCopiedFileReq,
449+
) -> Result<GetTableCopiedFileReply> {
450+
self.mutable_catalog.get_table_copied_file_info(req).await
451+
}
452+
453+
async fn upsert_table_copied_file_info(
454+
&self,
455+
req: UpsertTableCopiedFileReq,
456+
) -> Result<UpsertTableCopiedFileReply> {
457+
self.mutable_catalog
458+
.upsert_table_copied_file_info(req)
459+
.await
460+
}
461+
462+
async fn truncate_table(&self, req: TruncateTableReq) -> Result<TruncateTableReply> {
463+
self.mutable_catalog.truncate_table(req).await
464+
}
465+
440466
async fn upsert_table_option(
441467
&self,
442468
req: UpsertTableOptionReq,

0 commit comments

Comments
 (0)