Skip to content

Commit

Permalink
feat(sql): Remove the ROW FORMAT keyword for CREATE statement of nati…
Browse files Browse the repository at this point in the history
…ve CDC connectors (#7133)

Remove the ROW FORMAT keyword for CREATE statement of native CDC connectors, which must be debezium json format.

```
create materialized source orders (
order_id int,
order_date timestamp,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) with (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
...
) row format debezium_json <-- removed
```

Approved-By: neverchanje
  • Loading branch information
StrikeW authored Dec 30, 2022
1 parent b8bdfdf commit 7d5d505
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 19 deletions.
8 changes: 4 additions & 4 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ create materialized source products ( id INT,
database.name = 'mydb',
table.name = 'products',
server.id = '5085'
) row format debezium_json;
);

statement ok
create materialized view products_cnt as select count(*) as cnt from products;
Expand All @@ -37,7 +37,7 @@ create materialized source orders (
database.name = 'mydb',
table.name = 'orders',
server.id = '5086'
) row format debezium_json;
);

statement ok
create materialized view orders_cnt as select count(*) as cnt from orders;
Expand All @@ -60,7 +60,7 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
) row format debezium_json;
);

statement ok
create materialized view shipments_cnt as select count(*) as cnt from shipments;
Expand All @@ -87,4 +87,4 @@ create materialized source mytable (
database.name = 'mydb',
table.name = 'mytable',
server.id = '5087'
) row format debezium_json;
);
14 changes: 7 additions & 7 deletions e2e_test/source/cdc/cdc.validate.mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ create materialized source products ( id INT,
database.name = 'mydb',
table.name = 'products',
server.id = '5085'
) row format debezium_json;
);

# invalid password
statement error
Expand All @@ -32,7 +32,7 @@ create materialized source products ( id INT,
database.name = 'mydb',
table.name = 'products',
server.id = '5085'
) row format debezium_json;
);

# invalid database name
statement error
Expand All @@ -49,7 +49,7 @@ create materialized source products ( id INT,
database.name = 'mdb',
table.name = 'products',
server.id = '5085'
) row format debezium_json;
);

# invalid table name
statement error
Expand All @@ -66,7 +66,7 @@ create materialized source products ( id INT,
database.name = 'mydb',
table.name = 'prdcts',
server.id = '5085'
) row format debezium_json;
);

# invalid primary key
statement error
Expand All @@ -87,7 +87,7 @@ create materialized source orders (
database.name = 'mydb',
table.name = 'orders',
server.id = '5086'
) row format debezium_json;
);

# column data type mismatch
statement error
Expand All @@ -108,7 +108,7 @@ create materialized source orders (
database.name = 'mydb',
table.name = 'orders',
server.id = '5086'
) row format debezium_json;
);

# column name mismatch
statement error
Expand All @@ -129,4 +129,4 @@ create materialized source orders (
database.name = 'mydb',
table.name = 'orders',
server.id = '5086'
) row format debezium_json;
);
12 changes: 6 additions & 6 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
) row format debezium_json;
);


# invalid password
Expand All @@ -41,7 +41,7 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
) row format debezium_json;
);

# invalid table name
statement error
Expand All @@ -62,7 +62,7 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipment',
slot.name = 'shipments'
) row format debezium_json;
);


# invalid primary key
Expand All @@ -84,7 +84,7 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
) row format debezium_json;
);


# column name mismatch
Expand All @@ -106,7 +106,7 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
) row format debezium_json;
);

# column data type mismatch
statement error
Expand All @@ -127,4 +127,4 @@ create materialized source shipments (
schema.name = 'public',
table.name = 'shipments',
slot.name = 'shipments'
) row format debezium_json;
);
14 changes: 12 additions & 2 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,18 @@ impl ParseTo for CreateSourceStatement {
let (columns, constraints) = p.parse_columns()?;

impl_parse_to!(with_properties: WithProperties, p);
impl_parse_to!([Keyword::ROW, Keyword::FORMAT], p);
impl_parse_to!(source_schema: SourceSchema, p);
let option = with_properties
.0
.iter()
.find(|&opt| opt.name.real_value() == "connector");
// row format for cdc source must be debezium json
let source_schema = if let Some(opt) = option && opt.value.to_string().contains("-cdc") {
SourceSchema::DebeziumJson
} else {
impl_parse_to!([Keyword::ROW, Keyword::FORMAT], p);
SourceSchema::parse_to(p)?
};

Ok(Self {
if_not_exists,
columns,
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#![cfg_attr(not(feature = "std"), no_std)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![expect(clippy::derive_partial_eq_without_eq)]
#![expect(clippy::doc_markdown)]
#![expect(clippy::upper_case_acronyms)]
Expand Down

0 comments on commit 7d5d505

Please sign in to comment.