Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix] Fix sqlserver monitor same table in other database #2335

Merged
merged 2 commits into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
SqlServerSourceConfig sqlserverSourceConfig = (SqlServerSourceConfig) sourceConfig;
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
return SqlServerConnectionUtils.listTables(
jdbcConnection, sqlserverSourceConfig.getTableFilters());
jdbcConnection,
sqlserverSourceConfig.getTableFilters(),
sqlserverSourceConfig.getDatabaseList());
} catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static SqlServerConnection createSqlServerConnection(
false);
}

public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilters tableFilters)
public static List<TableId> listTables(
JdbcConnection jdbc, RelationalTableFilters tableFilters, List<String> databaseList)
throws SQLException {
final List<TableId> capturedTableIds = new ArrayList<>();
// -------------------
Expand All @@ -68,7 +69,12 @@ public static List<TableId> listTables(JdbcConnection jdbc, RelationalTableFilte
"SELECT name, database_id, create_date \n" + "FROM sys.databases; ",
rs -> {
while (rs.next()) {
databaseNames.add(rs.getString(1));
// Because sqlserver table filter cannot filter by database name, we need to
// filter here
String databaseName = rs.getString(1);
if (databaseList.contains(databaseName)) {
databaseNames.add(databaseName);
}
}
});
LOG.info("\t list of available databases is: {}", databaseNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void before() {
public void testConsumingAllEvents()
throws SQLException, ExecutionException, InterruptedException {
initializeSqlServerTable("inventory");
initializeSqlServerTable("product");
String sourceDDL =
String.format(
"CREATE TABLE debezium_source ("
Expand Down Expand Up @@ -142,6 +143,9 @@ public void testConsumingAllEvents()
"UPDATE inventory.dbo.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
statement.execute("UPDATE inventory.dbo.products SET weight='5.17' WHERE id=111;");
statement.execute("DELETE FROM inventory.dbo.products WHERE id=111;");

statement.execute(
"INSERT INTO product.dbo.products (name,description,weight) VALUES ('scooter','Big 2-wheel scooter ',5.18);");
}

waitForSinkSize("sink", 20);
Expand Down
30 changes: 30 additions & 0 deletions flink-connector-sqlserver-cdc/src/test/resources/ddl/product.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: product
-- ----------------------------------------------------------------------------------------------------------------
-- Create the product database
CREATE DATABASE product;

USE product;
EXEC sys.sp_cdc_enable_db;

CREATE TABLE products (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
insert into products(name,description,weight)values ('scooter','Small 2-wheel scooter',99.8);
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'products', @role_name = NULL, @supports_net_changes = 0;