Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: fix the error of active refresh failure of cross-database table metadata #6759

Merged
merged 12 commits into from
Sep 3, 2024
Merged
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6781](https://github.com/apache/incubator-seata/pull/6781)] the issue where the TC occasionally fails to go offline from the NamingServer
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] fall back to any of available cluster address when query cluster address is empty
- [[#6800](https://github.com/apache/incubator-seata/pull/6800)] make exception message generic for all database drivers
- [[#6759](https://github.com/apache/incubator-seata/pull/6759)] fix the error of active refresh failure of cross-database table metadata


### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- [[#6797](https://github.com/apache/incubator-seata/pull/6797)] 当查询的集群地址为空时,获取可用的任意集群地址
- [[#6800](https://github.com/apache/incubator-seata/pull/6800)] 使异常消息对所有数据库驱动程序通用

- [[#6759](https://github.com/apache/incubator-seata/pull/6759)] 修复跨库表主动刷新`tableMeta`的异常问题

### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ public TableMeta getTableMeta(final Connection connection, final String tableNam
public void refresh(final Connection connection, String resourceId) {
ConcurrentMap<String, TableMeta> tableMetaMap = TABLE_META_CACHE.asMap();
for (Map.Entry<String, TableMeta> entry : tableMetaMap.entrySet()) {
String key = getCacheKey(connection, entry.getValue().getTableName(), resourceId);
String key = getCacheKey(connection, entry.getValue().getOriginalTableName(), resourceId);
if (entry.getKey().equals(key)) {
try {
TableMeta tableMeta = fetchSchema(connection, entry.getValue().getTableName());
String freshTableName = StringUtils.isBlank(entry.getValue().getOriginalTableName()) ?
entry.getValue().getTableName() : entry.getValue().getOriginalTableName();
TableMeta tableMeta = fetchSchema(connection, freshTableName);
if (!tableMeta.equals(entry.getValue())) {
TABLE_META_CACHE.put(entry.getKey(), tableMeta);
LOGGER.info("table meta change was found, update table meta cache automatically.");
Expand All @@ -99,6 +101,7 @@ public void refresh(final Connection connection, String resourceId) {
}
}


/**
* generate cache key
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ protected TableMeta resultSetMetaToSchema(DatabaseMetaData dbmd, String tableNam

TableNameMeta tableNameMeta = toTableNameMeta(tableName, dbmd.getConnection().getSchema());
result.setTableName(tableNameMeta.getTableName());
result.setOriginalTableName(tableName);
try (ResultSet rsColumns = dbmd.getColumns("", tableNameMeta.getSchema(), tableNameMeta.getTableName(), "%");
ResultSet rsIndex = dbmd.getIndexInfo(null, tableNameMeta.getSchema(), tableNameMeta.getTableName(), false, true);
ResultSet rsPrimary = dbmd.getPrimaryKeys(null, tableNameMeta.getSchema(), tableNameMeta.getTableName())) {
Expand All @@ -67,7 +68,6 @@ protected TableMeta resultSetMetaToSchema(DatabaseMetaData dbmd, String tableNam
processIndexes(result, rsIndex);

processPrimaries(result, rsPrimary);

if (result.getAllIndexes().isEmpty()) {
throw new ShouldNeverHappenException(String.format("Could not found any index in the table: %s", tableName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MARIADB) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public class MysqlTableMetaCache extends AbstractTableMetaCache {
protected String getCacheKey(Connection connection, String tableName, String resourceId) {
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");
//remove single quote and separate it to catalogName and tableName
String[] tableNameWithCatalog = tableName.replace("`", "").split("\\.");
String defaultTableName = tableNameWithCatalog.length > 1 ? tableNameWithCatalog[1] : tableNameWithCatalog[0];
//original: remove single quote and separate it to catalogName and tableName
//now: Use the original table name to avoid cache errors of tables with the same name across databases
String defaultTableName = ColumnUtils.delEscape(tableName, JdbcConstants.MYSQL);

DatabaseMetaData databaseMetaData = null;
DatabaseMetaData databaseMetaData;
try {
databaseMetaData = connection.getMetaData();
} catch (SQLException e) {
Expand Down Expand Up @@ -80,15 +80,15 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.MYSQL) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
throw new SQLException(String.format("Failed to fetch schema of %s", tableName), e);
}
}

protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd)
protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaData dbmd, String originalTableName)
throws SQLException {
//always "" for mysql
String schemaName = rsmd.getSchemaName(1);
Expand All @@ -110,6 +110,10 @@ protected TableMeta resultSetMetaToSchema(ResultSetMetaData rsmd, DatabaseMetaDa
// May be not consistent with lower_case_table_names
tm.setCaseSensitive(true);

// Save the original table name information for active cache refresh
// to avoid refresh failure caused by missing catalog information
tm.setOriginalTableName(originalTableName);

/*
* here has two different type to get the data
* make sure the table name was right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,14 @@ protected String getCacheKey(Connection connection, String tableName, String res
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");

//separate it to schemaName and tableName
String[] tableNameWithSchema = tableName.split("\\.");
String defaultTableName = tableNameWithSchema.length > 1 ? tableNameWithSchema[1] : tableNameWithSchema[0];

//original: separate it to schemaName and tableName
//now: Use the original table name to avoid cache errors of tables with the same name across databases
//oracle does not implement supportsMixedCaseIdentifiers in DatabaseMetadata
if (defaultTableName.contains("\"")) {
cacheKey.append(defaultTableName.replace("\"", ""));
if (tableName.contains("\"")) {
cacheKey.append(tableName.replace("\"", ""));
} else {
// oracle default store in upper case
cacheKey.append(defaultTableName.toUpperCase());
cacheKey.append(tableName.toUpperCase());
}

return cacheKey.toString();
Expand All @@ -75,6 +73,9 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws

protected TableMeta resultSetMetaToSchema(DatabaseMetaData dbmd, String tableName) throws SQLException {
TableMeta tm = new TableMeta();
// Save the original table name information for active cache refresh
// to avoid refresh failure caused by missing catalog information
tm.setOriginalTableName(tableName);
String[] schemaTable = tableName.split("\\.");
String schemaName = schemaTable.length > 1 ? schemaTable[0] : dbmd.getUserName();
tableName = schemaTable.length > 1 ? schemaTable[1] : tableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
String sql = "SELECT * FROM " + ColumnUtils.addEscape(tableName, JdbcConstants.POLARDBX) + " LIMIT 1";
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData());
return resultSetMetaToSchema(rs.getMetaData(), connection.getMetaData(), tableName);
} catch (SQLException sqlEx) {
throw sqlEx;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@ protected String getCacheKey(Connection connection, String tableName, String res
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");

//separate it to schemaName and tableName
String[] tableNameWithSchema = tableName.split("\\.");
String defaultTableName = tableNameWithSchema.length > 1 ? tableNameWithSchema[1] : tableNameWithSchema[0];

//original: separate it to schemaName and tableName
//now: Use the original table name to avoid cache errors of tables with the same name across databases
//postgres does not implement supportsMixedCaseIdentifiers in DatabaseMetadata
if (defaultTableName.contains("\"")) {
cacheKey.append(defaultTableName.replace("\"", ""));
if (tableName.contains("\"")) {
cacheKey.append(tableName.replace("\"", ""));
} else {
//postgres default store in lower case
cacheKey.append(defaultTableName.toLowerCase());
cacheKey.append(tableName.toLowerCase());
}

return cacheKey.toString();
Expand All @@ -73,6 +71,7 @@ private TableMeta resultSetMetaToSchema(Connection connection, String tableName)
DatabaseMetaData dbmd = connection.getMetaData();
TableMeta tm = new TableMeta();
tm.setTableName(tableName);
tm.setOriginalTableName(tableName);
String[] schemaTable = tableName.split("\\.");
String schemaName = schemaTable.length > 1 ? schemaTable[0] : null;
tableName = schemaTable.length > 1 ? schemaTable[1] : tableName;
Expand Down Expand Up @@ -185,8 +184,8 @@ private TableMeta resultSetMetaToSchema(Connection connection, String tableName)
}

while (rsTable.next()) {
String rsTableName = rsTable.getString("TABLE_NAME");
String rsTableSchema = rsTable.getString("TABLE_SCHEM");
String rsTableName = rsTable.getString("TABLE_NAME");
//set origin tableName with schema if necessary
if ("public".equalsIgnoreCase(rsTableSchema)) {
//for compatibility reasons, old clients generally do not have the 'public' default schema by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,24 @@ protected String getCacheKey(Connection connection, String tableName, String res
StringBuilder cacheKey = new StringBuilder(resourceId);
cacheKey.append(".");

//separate it to schemaName and tableName
String[] tableNameWithSchema = tableName.split("\\.");
String defaultTableName = tableNameWithSchema[tableNameWithSchema.length - 1];

DatabaseMetaData databaseMetaData;
try {
databaseMetaData = connection.getMetaData();
} catch (SQLException e) {
LOGGER.error("Could not get connection, use default cache key {}", e.getMessage(), e);
return cacheKey.append(defaultTableName).toString();
return cacheKey.append(tableName).toString();
}

try {
//prevent duplicated cache key
if (databaseMetaData.supportsMixedCaseIdentifiers()) {
cacheKey.append(defaultTableName);
cacheKey.append(tableName);
} else {
cacheKey.append(defaultTableName.toUpperCase());
cacheKey.append(tableName.toUpperCase());
}
} catch (SQLException e) {
LOGGER.error("Could not get supportsMixedCaseIdentifiers in connection metadata, use default cache key {}", e.getMessage(), e);
return cacheKey.append(defaultTableName).toString();
return cacheKey.append(tableName).toString();
}

return cacheKey.toString();
Expand All @@ -88,6 +84,7 @@ protected TableMeta fetchSchema(Connection connection, String tableName) throws
private TableMeta resultSetMetaToSchema(Connection connection, String tableName) throws SQLException {
TableMeta tm = new TableMeta();
tm.setTableName(tableName);
tm.setOriginalTableName(tableName);

tableName = ColumnUtils.delEscape(tableName, JdbcConstants.SQLSERVER);
String[] schemaTable = tableName.split("\\.");
Expand Down Expand Up @@ -189,8 +186,8 @@ private TableMeta resultSetMetaToSchema(Connection connection, String tableName)
}

while (rsTable.next()) {
String rsTableName = rsTable.getString("TABLE_NAME");
String rsTableSchema = rsTable.getString("TABLE_SCHEM");
String rsTableName = rsTable.getString("TABLE_NAME");
//set origin tableName with schema if necessary
if ("dbo".equalsIgnoreCase(rsTableSchema)) {
//for compatibility reasons, old clients generally do not have the 'dbo' default schema by default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public void init() throws SQLException {
new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setUrl("jdbc:mock:xxx2");
dataSource.setDriver(mockDriver);

DataSourceProxy newDataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void init() throws SQLException {
new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setDriver(mockDriver);
Expand All @@ -159,6 +159,7 @@ public void init() throws SQLException {

@Test
public void testBeforeAndAfterImage() throws SQLException {
System.out.println(newStatementProxy);
String sql = "insert into table_insert_executor_test(id, user_id, name, sex) values (1, 1, 'will', 1)";
List<SQLStatement> asts = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL);
MySQLInsertRecognizer recognizer = new MySQLInsertRecognizer(sql, asts.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ public void init() throws SQLException {
new Object[]{0, "update_time", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setUrl("jdbc:mock:xxx1");
dataSource.setDriver(mockDriver);

DataSourceProxy newDataSourceProxy = DataSourceProxyTest.getDataSourceProxy(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void init() {
new Object[]{0, "updated", Types.INTEGER, "INTEGER", 64, 10, 0, 0}
};

MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue);
MockDriver mockDriver = new MockDriver(returnValueColumnLabels, returnValue, columnMetas, indexMetas, null, onUpdateColumnsReturnValue, new Object[][]{});
DruidDataSource dataSource = new DruidDataSource();
dataSource.setUrl("jdbc:mock:xxx");
dataSource.setDriver(mockDriver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ public class MockDatabaseMetaData implements DatabaseMetaData {
);

private static List<String> tableMetaColumnLabels = Arrays.asList(
"TABLE_NAME",
"TABLE_SCHEM"
"TABLE_CAT",
"TABLE_SCHEM",
"TABLE_NAME"
);

private Object[][] columnsMetasReturnValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,19 @@ public MockDriver() {
}

public MockDriver(Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue) {
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{});
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{}, new Object[][]{});
}

public MockDriver(Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue, Object[][] mockPkMetasReturnValue) {
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, mockPkMetasReturnValue);
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, mockPkMetasReturnValue, new Object[][]{});
}

public MockDriver(Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue, Object[][] mockPkMetasReturnValue, Object[][] mockTableMetasReturnValue) {
this(Lists.newArrayList(), new Object[][]{}, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, mockPkMetasReturnValue, mockTableMetasReturnValue);
}

public MockDriver(List<String> mockReturnValueColumnLabels, Object[][] mockReturnValue, Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue) {
this(mockReturnValueColumnLabels, mockReturnValue, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{});
this(mockReturnValueColumnLabels, mockReturnValue, mockColumnsMetasReturnValue, mockIndexMetasReturnValue, new Object[][]{}, new Object[][]{});
}

public MockDriver(List<String> mockReturnValueColumnLabels, Object[][] mockReturnValue, Object[][] mockColumnsMetasReturnValue, Object[][] mockIndexMetasReturnValue, Object[][] mockPkMetasReturnValue) {
Expand Down
Loading
Loading