fix(cdc): fix the filter expression of CdcFilter executor #18868

Merged · 9 commits · Oct 16, 2024
Changes from 5 commits
36 changes: 18 additions & 18 deletions e2e_test/source/cdc_inline/sql_server_cdc/sql_server_cdc.slt
@@ -171,7 +171,7 @@ CREATE TABLE shared_orders (
) from mssql_source table 'orders';

# invalid table name
-statement error Sql Server table 'dbo'.'wrong_orders' doesn't exist
+statement error Sql Server table 'UpperDB'.'dbo.wrong_orders' doesn't exist
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
@@ -180,10 +180,10 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
-) from mssql_source table 'dbo.wrong_orders';
+) from mssql_source table 'UpperDB.dbo.wrong_orders';

# invalid schema name
-statement error Sql Server table 'wrong_dbo'.'orders' doesn't exist
+statement error Sql Server table 'UpperDB'.'wrong_dbo.orders' doesn't exist
CREATE TABLE shared_orders (
order_id INT,
order_date BIGINT,
@@ -192,7 +192,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
-) from mssql_source table 'wrong_dbo.orders';
+) from mssql_source table 'UpperDB.wrong_dbo.orders';

# invalid primary key
statement error INVALID_ARGUMENT: Primary key mismatch
@@ -204,7 +204,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_date)
-) from mssql_source table 'dbo.orders';
+) from mssql_source table 'UpperDB.dbo.orders';

# column name mismatch
statement error INVALID_ARGUMENT: Column 'wrong_order_date' not found in the upstream database
@@ -216,7 +216,7 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
-) from mssql_source table 'dbo.orders';
+) from mssql_source table 'UpperDB.dbo.orders';

# column data type mismatch
statement error INVALID_ARGUMENT: Incompatible data type of column order_date
@@ -228,10 +228,10 @@ CREATE TABLE shared_orders (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
-) from mssql_source table 'dbo.orders';
+) from mssql_source table 'UpperDB.dbo.orders';

# table without enabling cdc
-statement error INVALID_ARGUMENT: Table 'dbo.orders_without_cdc' has not enabled CDC.
+statement error INVALID_ARGUMENT: Table 'UpperDB.dbo.orders_without_cdc' has not enabled CDC.
CREATE TABLE shared_orders_without_cdc (
order_id INT,
order_date BIGINT,
@@ -240,7 +240,7 @@ CREATE TABLE shared_orders_without_cdc (
product_id INT,
order_status SMALLINT,
PRIMARY KEY (order_id)
-) from mssql_source table 'dbo.orders_without_cdc';
+) from mssql_source table 'UpperDB.dbo.orders_without_cdc';

# use batch_size = 1 here to ensure not all the data is backfilled in one batch.
statement ok
@@ -257,12 +257,12 @@ CREATE TABLE shared_orders (
) from mssql_source table 'dbo.orders';

statement ok
-CREATE TABLE shared_single_type (*) from mssql_source table 'dbo.single_type';
+CREATE TABLE shared_single_type (*) from mssql_source table 'UpperDB.dbo.single_type';

statement ok
CREATE TABLE shared_sqlserver_all_data_types (
*
-) from mssql_source table 'dbo.sqlserver_all_data_types';
+) from mssql_source table 'UpperDB.dbo.sqlserver_all_data_types';

sleep 5s

@@ -293,28 +293,28 @@ CREATE TABLE shared_sqlserver_all_data_types (
c_datetimeoffset TIMESTAMPTZ,
c_xml varchar,
PRIMARY KEY (id)
-) from mssql_source table 'dbo.sqlserver_all_data_types';
+) from mssql_source table 'UpperDB.dbo.sqlserver_all_data_types';

-statement error Sql Server table 'UpperSchema'.'UpperTable' doesn't exist in 'mydb'
+statement error Sql Server table 'UpperDB'.'UpperSchema.UpperTable' doesn't exist in 'mydb'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
-) from mssql_source table 'UpperSchema.UpperTable';
+) from mssql_source table 'UpperDB.UpperSchema.UpperTable';

statement error Column 'name' not found in the upstream database
CREATE TABLE upper_table (
"ID" INT,
name VARCHAR,
PRIMARY KEY ("ID")
-) from upper_mssql_source table 'UpperSchema.UpperTable';
+) from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';

-statement error Sql Server table 'upperSchema'.'upperTable' doesn't exist in 'UpperDB'
+statement error Sql Server table 'UpperDB'.'upperSchema.upperTable' doesn't exist in 'UpperDB'
CREATE TABLE upper_table (
"ID" INT,
"Name" VARCHAR,
PRIMARY KEY ("ID")
-) from upper_mssql_source table 'upperSchema.upperTable';
+) from upper_mssql_source table 'UpperDB.upperSchema.upperTable';

statement ok
CREATE TABLE upper_table (
@@ -326,7 +326,7 @@ INCLUDE TIMESTAMP AS commit_ts
INCLUDE DATABASE_NAME as database_name
INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
-from upper_mssql_source table 'UpperSchema.UpperTable';
+from upper_mssql_source table 'UpperDB.UpperSchema.UpperTable';

statement ok
create materialized view shared_orders_cnt as select count(*) as cnt from shared_orders;
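
The expectations above rely on SQL Server CDC tables now being addressed by a fully qualified three-part name, databaseName.schemaName.tableName, which is why the error messages quote the database part ('UpperDB') separately. A minimal standalone sketch of that convention (the helper name split_mssql_name is illustrative, not RisingWave's API):

// Illustrative only: split a fully qualified SQL Server table name into
// (database, schema, table) parts.
fn split_mssql_name(qualified: &str) -> Option<(&str, &str, &str)> {
    let mut parts = qualified.splitn(3, '.');
    Some((parts.next()?, parts.next()?, parts.next()?))
}

fn main() {
    assert_eq!(
        split_mssql_name("UpperDB.dbo.orders"),
        Some(("UpperDB", "dbo", "orders"))
    );
    // A two-part name carries no database, which is exactly the ambiguity
    // the fully qualified form removes.
    assert_eq!(split_mssql_name("dbo.orders"), None);
}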
@@ -235,6 +235,7 @@ var record = event.value();
// - PG: topicPrefix.schemaName.tableName
Contributor:
Doesn't PG have the same issue?

Contributor (Author):
As you can see in the comment, the PG connector's topic is not prefixed with the database name:
PG: topicPrefix.schemaName.tableName

// - MySQL: topicPrefix.databaseName.tableName
// - Mongo: topicPrefix.databaseName.collectionName
// - SQL Server: topicPrefix.databaseName.schemaName.tableName
// We can extract the full table name from the topic
var fullTableName =
record.topic().substring(record.topic().indexOf('.') + 1);
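
As a standalone illustration of the extraction above (written in Rust rather than the connector's Java; the "rw" prefix and topic strings are made-up examples), stripping everything up to the first '.' leaves the connector-specific qualified name, which for SQL Server now starts with the database:

// Sketch of the topic-name convention quoted in the comment above.
fn full_table_name(topic: &str) -> &str {
    // Debezium topics look like `topicPrefix.<qualified name>`; drop the prefix.
    match topic.find('.') {
        Some(i) => &topic[i + 1..],
        None => topic,
    }
}

fn main() {
    // SQL Server: topicPrefix.databaseName.schemaName.tableName
    assert_eq!(full_table_name("rw.UpperDB.dbo.orders"), "UpperDB.dbo.orders");
    // PG: topicPrefix.schemaName.tableName (no database prefix)
    assert_eq!(full_table_name("rw.public.t1"), "public.t1");
}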
46 changes: 16 additions & 30 deletions src/frontend/src/optimizer/plan_node/stream_cdc_table_scan.rs
@@ -168,16 +168,9 @@ impl StreamCdcTableScan {
// We need to pass the id of upstream source job here
let upstream_source_id = self.core.cdc_table_desc.source_id.table_id;

-// split the table name from the qualified table name, e.g. `database_name.table_name`
-let (_, cdc_table_name) = self
-    .core
-    .cdc_table_desc
-    .external_table_name
-    .split_once('.')
-    .unwrap();
-
-// jsonb filter expr: payload->'source'->>'table' = <cdc_table_name>
-let filter_expr = Self::build_cdc_filter_expr(cdc_table_name);
+// filter upstream source chunk by the value of `_rw_table_name` column
+let filter_expr =
+    Self::build_cdc_filter_expr(self.core.cdc_table_desc.external_table_name.as_str());

let filter_operator_id = self.core.ctx.next_plan_node_id();
// The filter node receive chunks in `(payload, _rw_offset, _rw_table_name)` schema
@@ -273,28 +266,13 @@ impl StreamCdcTableScan {
})
}

// The filter node receive input chunks in `(payload, _rw_offset, _rw_table_name)` schema
pub fn build_cdc_filter_expr(cdc_table_name: &str) -> ExprImpl {
-// jsonb filter expr: payload->'source'->>'table' = <cdc_table_name>
+// filter by the `_rw_table_name` column
FunctionCall::new(
ExprType::Equal,
vec![
-FunctionCall::new(
-    ExprType::JsonbAccessStr,
-    vec![
-        FunctionCall::new(
-            ExprType::JsonbAccess,
-            vec![
-                InputRef::new(0, DataType::Jsonb).into(),
-                ExprImpl::literal_varchar("source".into()),
-            ],
-        )
-        .unwrap()
-        .into(),
-        ExprImpl::literal_varchar("table".into()),
-    ],
-)
-.unwrap()
-.into(),
+InputRef::new(2, DataType::Varchar).into(),
ExprImpl::literal_varchar(cdc_table_name.into()),
],
)
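
In effect, the predicate changes from a jsonb path on column 0 (payload->'source'->>'table' = '<table>') to a plain varchar equality on column 2 (_rw_table_name) against the fully qualified external table name. A minimal sketch of the new semantics outside the planner's expression framework (function name illustrative):

// Illustrative stand-in for the expression built above: keep a row iff its
// `_rw_table_name` value equals the fully qualified external table name.
fn cdc_filter(rw_table_name: &str, external_table_name: &str) -> bool {
    rw_table_name == external_table_name
}

fn main() {
    // With the database prefix included, a same-named table in another
    // database no longer slips through the filter.
    assert!(cdc_filter("UpperDB.dbo.orders", "UpperDB.dbo.orders"));
    assert!(!cdc_filter("OtherDB.dbo.orders", "UpperDB.dbo.orders"));
}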
@@ -334,22 +312,27 @@ mod tests {
async fn test_cdc_filter_expr() {
let t1_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 111, "v2": 222.2 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t1", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 774, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();
let t2_json = JsonbVal::from_str(r#"{ "before": null, "after": { "v": 333, "v2": 666.6 }, "source": { "version": "2.2.0.Alpha3", "connector": "mysql", "name": "dbserver1", "ts_ms": 1678428689000, "snapshot": "false", "db": "inventory", "sequence": null, "table": "t2", "server_id": 223344, "gtid": null, "file": "mysql-bin.000003", "pos": 884, "row": 0, "thread": 8, "query": null }, "op": "c", "ts_ms": 1678428689389, "transaction": null }"#).unwrap();

+// NOTES: transaction metadata column expects to be filtered out before going to cdc filter
let trx_json = JsonbVal::from_str(r#"{"data_collections": null, "event_count": null, "id": "35319:3962662584", "status": "BEGIN", "ts_ms": 1704263537068}"#).unwrap();
let row1 = OwnedRow::new(vec![
Some(t1_json.into()),
Some(r#"{"file": "1.binlog", "pos": 100}"#.into()),
Some("public.t2".into()),
]);
let row2 = OwnedRow::new(vec![
Some(t2_json.into()),
Some(r#"{"file": "2.binlog", "pos": 100}"#.into()),
Some("abs.t2".into()),
]);

let row3 = OwnedRow::new(vec![
Some(trx_json.into()),
Some(r#"{"file": "3.binlog", "pos": 100}"#.into()),
Some("public.t2".into()),
]);

let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("t1");
let filter_expr = StreamCdcTableScan::build_cdc_filter_expr("public.t2");
assert_eq!(
filter_expr.eval_row(&row1).await.unwrap(),
Some(ScalarImpl::Bool(true))
@@ -358,6 +341,9 @@
filter_expr.eval_row(&row2).await.unwrap(),
Some(ScalarImpl::Bool(false))
);
-assert_eq!(filter_expr.eval_row(&row3).await.unwrap(), None)
+assert_eq!(
+    filter_expr.eval_row(&row3).await.unwrap(),
+    Some(ScalarImpl::Bool(true))
+)
}
}
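
The changed expectation for row3 follows directly from the new input column: a transaction-metadata payload has no source field, so the old jsonb path payload->'source'->>'table' evaluated to NULL (hence None), while the new filter reads only _rw_table_name, which is 'public.t2' and therefore matches. A standalone illustration, assuming serde_json as a stand-in for the jsonb machinery:

// Illustrative only; not the planner's expression types.
use serde_json::json;

fn main() {
    // Transaction-metadata events carry no `source` object.
    let trx = json!({ "id": "35319:3962662584", "status": "BEGIN" });
    // Old behavior: payload->'source'->>'table' finds no `source` key, so NULL.
    assert!(trx.get("source").and_then(|s| s.get("table")).is_none());
    // New behavior: the `_rw_table_name` column is compared directly.
    let rw_table_name = "public.t2";
    assert!(rw_table_name == "public.t2");
}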