Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/origin/dev/4.3.2' into leb…
Browse files Browse the repository at this point in the history
…ie_432_bugfix_0924_2
  • Loading branch information
MarkPotato777 committed Sep 24, 2024
2 parents 53af87a + 4b4d16b commit 4dbc8c5
Show file tree
Hide file tree
Showing 23 changed files with 237 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@
import com.oceanbase.odc.service.connection.logicaldatabase.model.PreviewSqlResp;
import com.oceanbase.odc.service.db.schema.model.DBObjectSyncStatus;
import com.oceanbase.odc.service.iam.ProjectPermissionValidator;
import com.oceanbase.odc.service.iam.UserService;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.model.User;
import com.oceanbase.odc.service.permission.DBResourcePermissionHelper;
import com.oceanbase.tools.dbbrowser.parser.SqlParser;
import com.oceanbase.tools.sqlparser.statement.Statement;
Expand Down Expand Up @@ -131,6 +133,9 @@ public class LogicalDatabaseService {
@Autowired
private DBResourcePermissionHelper permissionHelper;

@Autowired
private UserService userService;


@Transactional(rollbackFor = Exception.class)
public Database create(@Valid CreateLogicalDatabaseReq req) {
Expand Down Expand Up @@ -247,6 +252,19 @@ public boolean extractLogicalTablesSkipAuth(@NotNull Long logicalDatabaseId) {
return true;
}

public boolean extractLogicalTablesSkipAuth(@NotNull Long logicalDatabaseId, @NotNull Long creatorId) {
Database logicalDatabase =
databaseService.getBasicSkipPermissionCheck(logicalDatabaseId);
Verify.equals(logicalDatabase.getType(), DatabaseType.LOGICAL, "database type");
try {
syncManager.submitExtractLogicalTablesTask(logicalDatabase, new User(userService.nullSafeGet(creatorId)));
} catch (TaskRejectedException ex) {
log.warn("submit extract logical tables task rejected, logical database id={}", logicalDatabaseId);
return false;
}
return true;
}

protected void preCheck(CreateLogicalDatabaseReq req) {
projectPermissionValidator.checkProjectRole(req.getProjectId(),
Arrays.asList(ResourceRoleName.DBA, ResourceRoleName.OWNER));
Expand Down Expand Up @@ -293,9 +311,15 @@ protected void preCheck(CreateLogicalDatabaseReq req) {

public List<PreviewSqlResp> preview(@NonNull Long logicalDatabaseId, @NonNull PreviewSqlReq req) {
DetailLogicalDatabaseResp logicalDatabase = detail(logicalDatabaseId);
Set<DataNode> allDataNodes = logicalDatabase.getLogicalTables().stream()
.map(DetailLogicalTableResp::getAllPhysicalTables).flatMap(List::stream)
.collect(Collectors.toSet());
Set<DataNode> physicalDatabases = logicalDatabase.getPhysicalDatabases().stream()
.map(database -> new DataNode(database.getId(), database.getName())).collect(
Collectors.toSet());
Map<String, DataNode> databaseName2DataNodes = physicalDatabases.stream()
.collect(Collectors.toMap(dataNode -> dataNode.getSchemaName(), dataNode -> dataNode,
(value1, value2) -> value1));
Map<String, Set<DataNode>> logicalTableName2DataNodes = logicalDatabase.getLogicalTables().stream()
.collect(Collectors.toMap(DetailLogicalTableResp::getName,
resp -> resp.getAllPhysicalTables().stream().collect(Collectors.toSet())));
Map<Long, List<String>> databaseId2Sqls = new HashMap<>();
String delimiter = StringUtils.isEmpty(req.getDelimiter()) ? ";" : req.getDelimiter();
List<String> sqls =
Expand All @@ -306,23 +330,19 @@ public List<PreviewSqlResp> preview(@NonNull Long logicalDatabaseId, @NonNull Pr
Set<DataNode> dataNodesToExecute;
if (statement instanceof CreateTable) {
dataNodesToExecute = LogicalDatabaseUtils.getDataNodesFromCreateTable(sql,
logicalDatabase.getDialectType(), allDataNodes);
logicalDatabase.getDialectType(), databaseName2DataNodes);
} else {
dataNodesToExecute = LogicalDatabaseUtils.getDataNodesFromNotCreateTable(sql,
logicalDatabase.getDialectType(), logicalDatabase);
logicalDatabase.getDialectType(), logicalTableName2DataNodes, logicalDatabase.getName());
}
if (CollectionUtils.isEmpty(dataNodesToExecute)) {
continue;
}
RewriteResult rewriteResult = sqlRewriter.rewrite(
new RewriteContext(statement, logicalDatabase.getDialectType(), dataNodesToExecute));
for (Map.Entry<DataNode, String> result : rewriteResult.getSqls().entrySet()) {
Long databaseId = result.getKey().getDatabaseId();
if (databaseId == null) {
throw new BadRequestException(
"physical database not found, database name=" + result.getKey().getSchemaName());
}
databaseId2Sqls.computeIfAbsent(databaseId, k -> new ArrayList<>()).add(result.getValue());
databaseId2Sqls.computeIfAbsent(result.getKey().getDatabaseId(), k -> new ArrayList<>())
.add(result.getValue());
}
}
if (MapUtils.isEmpty(databaseId2Sqls)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.oceanbase.odc.service.connection.database.model.Database;
import com.oceanbase.odc.service.connection.table.TableService;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.iam.model.User;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -77,6 +78,13 @@ public void submitExtractLogicalTablesTask(@NotNull Database logicalDatabase) {
jdbcLockRegistry, authenticationFacade.currentUser(), tableService)));
}

public void submitExtractLogicalTablesTask(@NotNull Database logicalDatabase, @NotNull User creator) {
doExecute(() -> executor
.submit(new LogicalTableExtractTask(logicalDatabase, databaseRepository, dbRelationRepository,
databaseService, dbObjectRepository, tableRelationRepository, connectionService,
jdbcLockRegistry, creator, tableService)));
}

public void submitCheckConsistencyTask(@NotNull Long logicalTableId) {
doExecute(() -> executor
.submit(new LogicalTableCheckConsistencyTask(logicalTableId, tableRelationRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,16 @@
package com.oceanbase.odc.service.connection.logicaldatabase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.core.shared.constant.DialectType;
import com.oceanbase.odc.core.shared.exception.BadRequestException;
import com.oceanbase.odc.core.sql.execute.model.SqlTuple;
import com.oceanbase.odc.service.connection.logicaldatabase.core.model.DataNode;
import com.oceanbase.odc.service.connection.logicaldatabase.core.parser.LogicalTableExpressionParseUtils;
import com.oceanbase.odc.service.connection.logicaldatabase.model.DetailLogicalDatabaseResp;
import com.oceanbase.odc.service.connection.logicaldatabase.model.DetailLogicalTableResp;
import com.oceanbase.odc.service.session.util.DBSchemaExtractor;
import com.oceanbase.odc.service.session.util.DBSchemaExtractor.DBSchemaIdentity;
import com.oceanbase.tools.dbbrowser.parser.constant.SqlType;
Expand All @@ -40,10 +37,7 @@
*/
public class LogicalDatabaseUtils {
public static Set<DataNode> getDataNodesFromCreateTable(String sql, DialectType dialectType,
Set<DataNode> allDataNodes) {
Map<String, DataNode> databaseName2DataNodes = allDataNodes.stream()
.collect(Collectors.toMap(dataNode -> dataNode.getSchemaName(), dataNode -> dataNode,
(value1, value2) -> value1));
Map<String, DataNode> databaseName2DataNodes) {
Map<DBSchemaIdentity, Set<SqlType>> identity2SqlTypes = DBSchemaExtractor.listDBSchemasWithSqlTypes(
Arrays.asList(SqlTuple.newTuple(sql)), dialectType, null);
DBSchemaIdentity identity = identity2SqlTypes.keySet().iterator().next();
Expand All @@ -57,18 +51,23 @@ public static Set<DataNode> getDataNodesFromCreateTable(String sql, DialectType
Collectors.toSet());
dataNodesToExecute.forEach(dataNode -> dataNode.setDatabaseId(
databaseName2DataNodes.getOrDefault(dataNode.getSchemaName(), dataNode).getDatabaseId()));
dataNodesToExecute.forEach(dataNode -> {
if (!databaseName2DataNodes.containsKey(dataNode.getSchemaName())) {
throw new BadRequestException("physical database not found, database name=" + dataNode.getSchemaName());
}
dataNode.setDatabaseId(databaseName2DataNodes.get(dataNode.getSchemaName()).getDatabaseId());
});
return dataNodesToExecute;
}

public static Set<DataNode> getDataNodesFromNotCreateTable(String sql, DialectType dialectType,
DetailLogicalDatabaseResp detailLogicalDatabaseResp) {
List<DetailLogicalTableResp> logicalTables = detailLogicalDatabaseResp.getLogicalTables();
Map<String, Set<DataNode>> logicalTableName2DataNodes = logicalTables.stream()
.collect(Collectors.toMap(DetailLogicalTableResp::getName,
resp -> resp.getAllPhysicalTables().stream().collect(Collectors.toSet())));
Map<String, Set<DataNode>> logicalTableName2DataNodes, String logicalDatabaseName) {
Map<DBSchemaIdentity, Set<SqlType>> identity2SqlTypes = DBSchemaExtractor.listDBSchemasWithSqlTypes(
Arrays.asList(SqlTuple.newTuple(sql)), dialectType, detailLogicalDatabaseResp.getName());
Arrays.asList(SqlTuple.newTuple(sql)), dialectType, logicalDatabaseName);
DBSchemaIdentity identity = identity2SqlTypes.keySet().iterator().next();
return logicalTableName2DataNodes.getOrDefault(identity.getTable(), Collections.emptySet());
if (!logicalTableName2DataNodes.containsKey(identity.getTable())) {
throw new BadRequestException("logical table not found, logical table name=" + identity.getTable());
}
return logicalTableName2DataNodes.get(identity.getTable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public LogicalTableExtractTask(@NonNull Database logicalDatabase, @NonNull Datab
@Override
public void run() {
SecurityContextUtils.setCurrentUser(creator);
log.info("Start to extract logical tables for database id={}", logicalDatabase.getId());
databaseService.updateObjectLastSyncTimeAndStatus(logicalDatabase.getId(), DBObjectSyncStatus.SYNCING);
List<DatabaseMappingEntity> relations =
dbRelationRepository.findByLogicalDatabaseId(logicalDatabase.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import com.oceanbase.odc.service.connection.logicaldatabase.core.parser.LogicalTableExpressionParseUtils;
import com.oceanbase.odc.service.connection.logicaldatabase.model.DetailLogicalTableResp;
import com.oceanbase.odc.service.connection.logicaldatabase.model.LogicalTableTopologyResp;
import com.oceanbase.odc.service.connection.model.ConnectionConfig;
import com.oceanbase.odc.service.iam.ProjectPermissionValidator;
import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
import com.oceanbase.odc.service.permission.DBResourcePermissionHelper;
Expand Down Expand Up @@ -117,16 +116,6 @@ public List<DetailLogicalTableResp> list(@NotNull Long logicalDatabaseId) {

List<DBObjectEntity> logicalTables =
dbObjectRepository.findByDatabaseIdAndType(logicalDatabaseId, DBObjectType.LOGICAL_TABLE);
Set<Long> physicalDBIds =
databaseMappingRepository.findByLogicalDatabaseId(logicalDatabaseId).stream()
.map(DatabaseMappingEntity::getPhysicalDatabaseId).collect(Collectors.toSet());
List<Database> physicalDatabases = databaseService.listDatabasesDetailsByIds(physicalDBIds);
Map<Long, Database> id2Database =
physicalDatabases.stream().collect(Collectors.toMap(Database::getId, db -> db));
Map<Long, ConnectionConfig> id2Connections = connectionService.listForConnectionSkipPermissionCheck(
physicalDatabases.stream().map(db -> db.getDataSource().getId()).collect(Collectors.toList())).stream()
.collect(Collectors.toMap(ConnectionConfig::getId, c -> c));

Set<Long> logicalTableIds = logicalTables.stream().map(DBObjectEntity::getId).collect(Collectors.toSet());
if (CollectionUtils.isEmpty(logicalTableIds)) {
return Collections.emptyList();
Expand All @@ -149,8 +138,6 @@ public List<DetailLogicalTableResp> list(@NotNull Long logicalDatabaseId) {
dataNode.setDatabaseId(relation.getPhysicalDatabaseId());
dataNode.setSchemaName(relation.getPhysicalDatabaseName());
dataNode.setTableName(relation.getPhysicalTableName());
dataNode.setDataSourceConfig(
id2Connections.get(id2Database.get(relation.getPhysicalDatabaseId()).getDataSource().getId()));
inconsistentPhysicalTables.add(dataNode);
});
resp.setInconsistentPhysicalTables(inconsistentPhysicalTables);
Expand All @@ -159,8 +146,6 @@ public List<DetailLogicalTableResp> list(@NotNull Long logicalDatabaseId) {
dataNode.setDatabaseId(relation.getPhysicalDatabaseId());
dataNode.setSchemaName(relation.getPhysicalDatabaseName());
dataNode.setTableName(relation.getPhysicalTableName());
dataNode.setDataSourceConfig(
id2Connections.get(id2Database.get(relation.getPhysicalDatabaseId()).getDataSource().getId()));
return dataNode;
}).collect(Collectors.toList()));
return resp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,20 @@
public class ExecutionSubGroupUnit<Input, Result> {
private final String id;
private final Long order;
private final ExecutionHandler<Input, Result> callback;
private final ExecutionHandler<Input, Result> handler;
private final Input input;

public ExecutionSubGroupUnit(String id, Long order, ExecutionHandler<Input, Result> executionCallback,
Input input) {
this.id = id;
this.order = order;
this.callback = executionCallback;
this.handler = executionCallback;
this.input = input;
}

public void beforeExecute(ExecutionGroupContext<Input, Result> context) {
try {
context.setExecutionResult(id, (k, v) -> callback.beforeExecute(context));
context.setExecutionResult(id, (k, v) -> handler.beforeExecute(context));
} catch (Exception e) {
log.warn("ExecutionUnit execute failed, executionId={}, ex=", id, e);
}
Expand All @@ -51,46 +51,67 @@ public void execute(ExecutionGroupContext<Input, Result> context) {
context.setExecutionResult(id, (k, v) -> {
if (v.getStatus() != ExecutionStatus.PENDING) {
throw new IllegalStateException(
"ExecutionUnit is not in PENDING status, executionId=" + id + ", status=" + v.getStatus());
"Cannot execute because ExecutionUnit is not in PENDING status, executionId=" + id + ", status="
+ v.getStatus());
}
log.info("ExecutionUnit starts to execute, executionId={}", id);
v.setStatus(ExecutionStatus.RUNNING);
return v;
});
context.setExecutionResult(id, (k, v) -> {
if (context.getExecutionResult(id).getStatus() == ExecutionStatus.RUNNING) {
try {
if (v.getStatus() == ExecutionStatus.RUNNING) {
ExecutionResult<Result> result = callback.execute(context);
log.info("ExecutionUnit execute done, executionId={}", id);
ExecutionResult<Result> result = handler.execute(context);
log.info("ExecutionUnit execute done, executionId={}", id);
context.setExecutionResult(id, (k, v) -> {
if (v.getStatus() == ExecutionStatus.RUNNING) {
return result;
}
result.setStatus(v.getStatus());
return result;
}
log.warn("Abort to execute, as the ExecutionUnit({}) is not in RUNNING status, executionId={}",
v.getStatus(), id);
return v;
});
} catch (Exception e) {
log.warn("ExecutionUnit execute failed, executionId={}, ex=", id, e);
v.setStatus(ExecutionStatus.FAILED);
return v;
context.setExecutionResult(id, (k, v) -> {
if (v.getStatus() == ExecutionStatus.RUNNING) {
v.setStatus(ExecutionStatus.FAILED);
}
return v;
});
}
});
}
}

public void terminate(ExecutionGroupContext<Input, Result> context) {
context.setExecutionResult(id, (k, v) -> {
if (v.getStatus() == ExecutionStatus.RUNNING) {
try {
log.info("ExecutionUnit starts to terminate, executionId={}", id);
v.setStatus(ExecutionStatus.TERMINATING);
callback.terminate(context);
v.setStatus(ExecutionStatus.TERMINATED);
log.info("ExecutionUnit terminated, executionId={}", id);
} catch (Exception e) {
log.warn("ExecutionUnit terminate failed, executionId={}", id, e);
v.setStatus(ExecutionStatus.TERMINATE_FAILED);
}
if (v.getStatus() != ExecutionStatus.RUNNING) {
throw new IllegalStateException(
"Cannot terminate because ExecutionUnit is not in RUNNING status, executionId=" + id
+ ", status=" + v.getStatus());
}
log.info("ExecutionUnit starts to terminate, executionId={}", id);
v.setStatus(ExecutionStatus.TERMINATING);
return v;
});
if (context.getExecutionResult(id).getStatus() == ExecutionStatus.TERMINATING) {
try {
handler.terminate(context);
context.setExecutionResult(id, (k, v) -> {
if (v.getStatus() == ExecutionStatus.TERMINATING) {
log.info("ExecutionUnit terminate done, executionId={}", id);
v.setStatus(ExecutionStatus.TERMINATED);
}
return v;
});
} catch (Exception ex) {
log.warn("ExecutionUnit terminate failed, executionId={}, ex=", id, ex);
context.setExecutionResult(id, (k, v) -> {
if (v.getStatus() == ExecutionStatus.TERMINATING) {
v.setStatus(ExecutionStatus.TERMINATE_FAILED);
}
return v;
});
}
}
}

public void skip(ExecutionGroupContext<Input, Result> context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public ExecutionGroupContext<Input, Result> execute(List<ExecutionGroup<Input, R

public void terminate(String executionUnitId) {
try {
log.info("Terminate executionUnit, executionId={}", executionUnitId);
ExecutionSubGroupUnit<Input, Result> executionUnit = executionContext.getExecutionUnit(executionUnitId);
executionUnit.terminate(executionContext);
} catch (Exception ex) {
Expand Down
Loading

0 comments on commit 4dbc8c5

Please sign in to comment.