Skip to content

Commit

Permalink
feat(dlm): supports sharding using unique indexes (#1327)
Browse files Browse the repository at this point in the history
* DLM supports sharding using unique indexes

* upgrade sdk to snapshot version.

* fix shard key validator

* upgrade sdk to 1.0.9
  • Loading branch information
guowl3 authored Jan 3, 2024
1 parent f68af75 commit 625a707
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
<mina.version>2.1.6</mina.version>

<!-- data-lifecycle-manager version -->
<data-lifecycle-manager.version>1.0.8</data-lifecycle-manager.version>
<data-lifecycle-manager.version>1.0.9</data-lifecycle-manager.version>

<!-- plugin version -->
<formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ public static DataSourceInfo build(ConnectionConfig connectionConfig) {
dataSourceInfo.setIp(connectionConfig.getHost());
dataSourceInfo.setPort(connectionConfig.getPort());
dataSourceInfo.setFullUserName(connectionConfig.getUsername());
dataSourceInfo.setDbType(DataBaseType.MYSQL.name());
dataSourceInfo.setDatabaseType(DataBaseType.MYSQL);
break;
}
case OB_MYSQL: {
dataSourceInfo
.setObProxy(String.format("%s:%s", connectionConfig.getHost(), connectionConfig.getPort()));
dataSourceInfo
.setFullUserName(OBConsoleDataSourceFactory.getUsername(connectionConfig));
dataSourceInfo.setDbType(DataBaseType.OCEANBASEV10.name());
dataSourceInfo.setDatabaseType(DataBaseType.OCEANBASEV10);
dataSourceInfo.setSysUser(connectionConfig.getSysTenantUsername());
dataSourceInfo.setClusterName(connectionConfig.getClusterName());
if (StringUtils.isNotEmpty(connectionConfig.getSysTenantPassword())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public ScheduleEntity buildScheduleEntity(CreateFlowInstanceReq req) {
public void checkTableAndCondition(ConnectionSession connectionSession, Database sourceDb,
List<DataArchiveTableConfig> tables,
List<OffsetConfig> variables) {
checkPrimaryKey(connectionSession, sourceDb.getName(), tables);
checkShardKey(connectionSession, sourceDb.getName(), tables);
Map<DataArchiveTableConfig, String> sqlMap = getDataArchiveSqls(sourceDb, tables, variables);
checkDataArchiveSql(connectionSession, sqlMap);
}
Expand All @@ -89,14 +89,14 @@ public void checkDatasource(ConnectionConfig datasource) {
}
}

private void checkPrimaryKey(ConnectionSession connectionSession, String databaseName,
private void checkShardKey(ConnectionSession connectionSession, String databaseName,
List<DataArchiveTableConfig> tables) {
SyncJdbcExecutor syncJdbcExecutor = connectionSession.getSyncJdbcExecutor(
ConnectionSessionConstants.CONSOLE_DS_KEY);
SqlBuilder sqlBuilder = new MySQLSqlBuilder();
sqlBuilder.append("select table_name from information_schema.COLUMNS where ");
sqlBuilder.append(String.format("table_schema='%s' ", databaseName));
sqlBuilder.append("and column_key = 'PRI' group by table_name;");
sqlBuilder.append(
"SELECT TABLE_NAME from INFORMATION_SCHEMA.STATISTICS where NON_UNIQUE = 0 AND NULLABLE != 'YES' ");
sqlBuilder.append(String.format("AND TABLE_SCHEMA='%s' GROUP BY TABLE_NAME", databaseName));
HashSet<String> tableNames =
new HashSet<>(syncJdbcExecutor.query(sqlBuilder.toString(), (rs, num) -> rs.getString(1)));
tables.forEach(tableConfig -> {
Expand Down

0 comments on commit 625a707

Please sign in to comment.