diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepositoryTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepositoryTest.java new file mode 100644 index 0000000000..484684fe93 --- /dev/null +++ b/server/integration-test/src/test/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepositoryTest.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.metadb.databasechange; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import com.oceanbase.odc.ServiceTestEnv; + +/** + * @author: zijia.cj + * @date: 2024/4/24 + */ +public class DatabaseChangeChangingOrderTemplateRepositoryTest extends ServiceTestEnv { + + private static final Long PROJECT_ID = 1L; + private static final Long CURRENT_USER_ID = 1L; + + private static final Long ORGANIZATION_ID = 1L; + private static final String TEMPLATE_NAME = "template"; + + @Autowired + private DatabaseChangeChangingOrderTemplateRepository templateRepository; + + @Before + public void setUp() { + templateRepository.deleteAll(); + } + + @After + public void clear() { + templateRepository.deleteAll(); + } + + + @Test + public void existsByNameAndProjectId_checkTemplateExist_succeed() { + create(); + Boolean result = + templateRepository.existsByNameAndProjectId(TEMPLATE_NAME, PROJECT_ID); + assertTrue(result); + } + + @Test + public void findByNameAndProjectId_getTemplate_succeed() { + DatabaseChangeChangingOrderTemplateEntity databaseChangeChangingOrderTemplateEntity = create(); + Optional result = + templateRepository.findByNameAndProjectId(TEMPLATE_NAME, PROJECT_ID); + DatabaseChangeChangingOrderTemplateEntity templateEntity = result.get(); + assertNotNull(templateEntity); + assertEquals(databaseChangeChangingOrderTemplateEntity, templateEntity); + } + + @Test + public void findByIdAndProjectId_getTemplate_succeed() { + DatabaseChangeChangingOrderTemplateEntity databaseChangeChangingOrderTemplateEntity = create(); + Optional result = + templateRepository.findByIdAndProjectId(databaseChangeChangingOrderTemplateEntity.getId(), PROJECT_ID); + DatabaseChangeChangingOrderTemplateEntity templateEntity = result.get(); + assertNotNull(templateEntity); + assertEquals(databaseChangeChangingOrderTemplateEntity, templateEntity); + } + + @Test + public void updateEnabledByIds_updateTemplate_succeed() { + DatabaseChangeChangingOrderTemplateEntity databaseChangeChangingOrderTemplateEntity = create(); + templateRepository.updateEnabledByIds( + Collections.singletonList(databaseChangeChangingOrderTemplateEntity.getId())); + Optional result = templateRepository.findById( + databaseChangeChangingOrderTemplateEntity.getId()); + DatabaseChangeChangingOrderTemplateEntity templateEntity = result.get(); + assertEquals(false, templateEntity.getEnabled()); + } + + private DatabaseChangeChangingOrderTemplateEntity create() { + DatabaseChangeChangingOrderTemplateEntity databaseChangeChangingOrderTemplateEntity = + new DatabaseChangeChangingOrderTemplateEntity(); + databaseChangeChangingOrderTemplateEntity.setName(TEMPLATE_NAME); + databaseChangeChangingOrderTemplateEntity.setProjectId(PROJECT_ID); + databaseChangeChangingOrderTemplateEntity.setOrganizationId(ORGANIZATION_ID); + databaseChangeChangingOrderTemplateEntity.setCreatorId(CURRENT_USER_ID); + List> orders = new ArrayList<>(); + orders.add(Arrays.asList(1L, 2L)); + orders.add(Arrays.asList(3L, 4L)); + databaseChangeChangingOrderTemplateEntity.setDatabaseSequences(orders); + databaseChangeChangingOrderTemplateEntity.setEnabled(true); + return templateRepository.save( + databaseChangeChangingOrderTemplateEntity); + } +} + diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateServiceTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateServiceTest.java new file mode 100644 index 0000000000..daf2e9c8ed --- /dev/null +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateServiceTest.java @@ -0,0 +1,248 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; + +import com.oceanbase.odc.ServiceTestEnv; +import com.oceanbase.odc.core.shared.exception.BadArgumentException; +import com.oceanbase.odc.core.shared.exception.BadRequestException; +import com.oceanbase.odc.core.shared.exception.NotFoundException; +import com.oceanbase.odc.metadb.collaboration.ProjectRepository; +import com.oceanbase.odc.metadb.connection.DatabaseEntity; +import com.oceanbase.odc.metadb.connection.DatabaseRepository; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateEntity; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateRepository; +import com.oceanbase.odc.service.databasechange.model.CreateDatabaseChangeChangingOrderTemplateReq; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeChangingOrderTemplateResp; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangingOrderTemplateExists; +import com.oceanbase.odc.service.databasechange.model.QueryDatabaseChangeChangingOrderParams; +import com.oceanbase.odc.service.databasechange.model.UpdateDatabaseChangeChangingOrderReq; +import com.oceanbase.odc.service.iam.ProjectPermissionValidator; +import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; + +/** + * @author: zijia.cj + * @date: 2024/4/23 + */ +public class DatabaseChangeChangingOrderTemplateServiceTest extends ServiceTestEnv { + + private static final Long PROJECT_ID = 1L; + private static final Long CURRENT_USER_ID = 1L; + + private static final Long ORGANIZATION_ID = 1L; + private static final String TEMPLATE_NAME = "template"; + private static final String TEMPLATE_RENAME = "template_rename"; + + @Autowired + private DatabaseChangeChangingOrderTemplateService templateService; + @Autowired + private DatabaseChangeChangingOrderTemplateRepository templateRepository; + @MockBean + private AuthenticationFacade authenticationFacade; + @MockBean + private DatabaseRepository databaseRepository; + @MockBean + private ProjectRepository projectRepository; + @MockBean + private ProjectPermissionValidator projectPermissionValidator; + + @Before + public void setUp() { + templateRepository.deleteAll(); + when(authenticationFacade.currentUserId()).thenReturn(CURRENT_USER_ID); + when(authenticationFacade.currentOrganizationId()).thenReturn(ORGANIZATION_ID); + when(projectRepository.existsById(any())).thenReturn(true); + when(projectPermissionValidator.hasProjectRole(anyLong(), any())).thenReturn(true); + } + + @After + public void clear() { + templateRepository.deleteAll(); + } + + @Test + public void createDatabaseChangingOrderTemplate_saveEntity_succeed() { + CreateDatabaseChangeChangingOrderTemplateReq req = new CreateDatabaseChangeChangingOrderTemplateReq(); + req.setProjectId(PROJECT_ID); + req.setName(TEMPLATE_NAME); + List> orders = new ArrayList<>(); + orders.add(Arrays.asList(1L, 2L)); + orders.add(Arrays.asList(3L, 4L)); + req.setOrders(orders); + DatabaseChangeChangingOrderTemplateResp templateResp = templateService.create( + req); + int size = templateRepository.findAll().size(); + Assert.assertEquals(TEMPLATE_NAME, templateResp.getName()); + Assert.assertEquals(PROJECT_ID, templateResp.getProjectId()); + Assert.assertEquals(1, size); + } + + @Test(expected = BadRequestException.class) + public void createDatabaseChangingOrderTemplate_tempaltNameIsDuplicate_throwIllegalArgumentException() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + } + + + @Test(expected = NotFoundException.class) + public void createDatabaseChangingOrderTemplate_projectIsNotExist_throwIllegalArgumentException() { + CreateDatabaseChangeChangingOrderTemplateReq req = new CreateDatabaseChangeChangingOrderTemplateReq(); + req.setProjectId(PROJECT_ID); + req.setName(TEMPLATE_NAME); + List> orders = new ArrayList<>(); + List list = Arrays.asList(1L, 2L); + orders.add(list); + req.setOrders(orders); + when(projectRepository.existsById(any())).thenReturn(false); + templateService.create(req); + } + + @Test(expected = BadArgumentException.class) + public void createDatabaseChangingOrderTemplate_databaseNotBelongToProject_throwIllegalArgumentException() { + CreateDatabaseChangeChangingOrderTemplateReq req = new CreateDatabaseChangeChangingOrderTemplateReq(); + req.setProjectId(PROJECT_ID); + req.setName(TEMPLATE_NAME); + List> orders = new ArrayList<>(); + List list = Arrays.asList(1L, 2L); + orders.add(list); + req.setOrders(orders); + List databases = new ArrayList<>(); + DatabaseEntity database = new DatabaseEntity(); + database.setProjectId(2L); + databases.add(database); + when(projectRepository.existsById(any())).thenReturn(true); + when(databaseRepository.findByIdIn(any())).thenReturn(databases); + templateService.create(req); + } + + @Test + public void modifyDatabaseChangingOrderTemplate_modifyTemplate_succeed() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + DatabaseChangeChangingOrderTemplateEntity byNameAndProjectId = + templateRepository.findByNameAndProjectId(TEMPLATE_NAME, PROJECT_ID).get(); + UpdateDatabaseChangeChangingOrderReq req = new UpdateDatabaseChangeChangingOrderReq(); + req.setProjectId(PROJECT_ID); + req.setName(TEMPLATE_RENAME); + List> orders = new ArrayList<>(); + orders.add(Arrays.asList(1L, 2L)); + orders.add(Arrays.asList(3L, 4L)); + DatabaseChangeChangingOrderTemplateResp update = templateService + .update(byNameAndProjectId.getId(), req); + assertEquals(TEMPLATE_RENAME, update.getName()); + Optional byId = + templateRepository.findById(byNameAndProjectId.getId()); + assertEquals(TEMPLATE_RENAME, byId.get().getName()); + + } + + @Test(expected = NotFoundException.class) + public void modifyDatabaseChangingOrderTemplate_notFoundTemplate_throwIllegalArgumentException() { + UpdateDatabaseChangeChangingOrderReq req = new UpdateDatabaseChangeChangingOrderReq(); + req.setProjectId(PROJECT_ID); + req.setName(TEMPLATE_RENAME); + List> orders = new ArrayList<>(); + List list = Arrays.asList(1L, 2L); + orders.add(list); + when(authenticationFacade.currentUserId()).thenReturn(1L); + when(authenticationFacade.currentOrganizationId()).thenReturn(1L); + templateService.update(1L, req); + } + + @Test + public void modifyDatabaseChangingOrderTemplate_projectNotExists_throwNotFoundException() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + DatabaseChangeChangingOrderTemplateEntity byNameAndProjectId = + templateRepository.findByNameAndProjectId(TEMPLATE_NAME, PROJECT_ID).get(); + UpdateDatabaseChangeChangingOrderReq req = new UpdateDatabaseChangeChangingOrderReq(); + req.setName(TEMPLATE_RENAME); + req.setProjectId(2L); + when(projectRepository.existsById(2L)).thenReturn(false); + assertThrows(NotFoundException.class, () -> { + templateService.update(byNameAndProjectId.getId(), + req); + }); + } + + @Test + public void queryDatabaseChangingOrderTemplateById_findExistingTemplate_succeed() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + DatabaseChangeChangingOrderTemplateEntity byNameAndProjectId = + templateRepository.findByNameAndProjectId(TEMPLATE_NAME, PROJECT_ID).get(); + DatabaseEntity database = new DatabaseEntity(); + database.setProjectId(PROJECT_ID); + when(databaseRepository.findByIdIn(anyList())).thenReturn(Arrays.asList(database)); + DatabaseChangeChangingOrderTemplateResp databaseChangeChangingOrderTemplateResp = + templateService.detail( + byNameAndProjectId.getId()); + assertEquals(PROJECT_ID, databaseChangeChangingOrderTemplateResp.getProjectId()); + assertEquals(TEMPLATE_NAME, databaseChangeChangingOrderTemplateResp.getName()); + assertEquals(PROJECT_ID, databaseChangeChangingOrderTemplateResp.getProjectId()); + + } + + @Test + public void listDatabaseChangingOrderTemplates_useQueryCondition_succeed() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + Pageable pageable = Pageable.unpaged(); + QueryDatabaseChangeChangingOrderParams params = QueryDatabaseChangeChangingOrderParams.builder() + .projectId(PROJECT_ID).creatorId(CURRENT_USER_ID).name(TEMPLATE_NAME).build(); + Page result = + templateService.listTemplates(pageable, params); + Assert.assertNotNull(result); + Assert.assertEquals(1, result.getContent().size()); + } + + + @Test + public void deleteDatabaseChangingOrderTemplateById_deleteExistingTemplate_succeed() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + DatabaseChangeChangingOrderTemplateEntity databaseChangeChangingOrderTemplateEntity = + templateRepository.findByNameAndProjectId(TEMPLATE_NAME, PROJECT_ID).get(); + DatabaseChangeChangingOrderTemplateResp delete = templateService.delete( + databaseChangeChangingOrderTemplateEntity.getId()); + int size = templateRepository.findAll().size(); + assertEquals(0, size); + } + + @Test + public void exists_checkTemplateExist_true() { + createDatabaseChangingOrderTemplate_saveEntity_succeed(); + DatabaseChangingOrderTemplateExists exists = templateService.exists(TEMPLATE_NAME, PROJECT_ID); + assertEquals(true, exists.getExists()); + } +} + + diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ResourceType.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ResourceType.java index 6178b731b5..51d5e8248e 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ResourceType.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/ResourceType.java @@ -93,6 +93,8 @@ public enum ResourceType implements Translatable { ODC_NOTIFICATION_POLICY, ODC_NOTIFICATION_MESSAGE, + ODC_DATABASE_CHANGE_ORDER_TEMPLATE, + /** * OB Resources, with 'OB_' prefix diff --git a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/TaskType.java b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/TaskType.java index d5b63b762f..4323eb6059 100644 --- a/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/TaskType.java +++ b/server/odc-core/src/main/java/com/oceanbase/odc/core/shared/constant/TaskType.java @@ -26,6 +26,10 @@ * @date 2021/3/15 */ public enum TaskType implements Translatable { + /** + * Multiple database change + */ + MULTIPLE_ASYNC, /** * Database change */ @@ -107,7 +111,8 @@ public String getLocalizedMessage() { } public boolean needsPreCheck() { - return this == ASYNC || this == ONLINE_SCHEMA_CHANGE || this == ALTER_SCHEDULE || this == EXPORT_RESULT_SET; + return this == ASYNC || this == ONLINE_SCHEMA_CHANGE || this == ALTER_SCHEDULE || this == EXPORT_RESULT_SET + || this == MULTIPLE_ASYNC; } public boolean needForExecutionStrategy() { diff --git a/server/odc-core/src/main/resources/i18n/BusinessMessages.properties b/server/odc-core/src/main/resources/i18n/BusinessMessages.properties index 006a7fa4dc..3d50b21784 100644 --- a/server/odc-core/src/main/resources/i18n/BusinessMessages.properties +++ b/server/odc-core/src/main/resources/i18n/BusinessMessages.properties @@ -80,6 +80,7 @@ com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_CHANNEL=Notification Channel com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_POLICY=Notification Policy com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_MESSAGE=Notification Message com.oceanbase.odc.ResourceType.ODC_STRUCTURE_COMPARISON_TASK=Structure Comparison Task +com.oceanbase.odc.ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE=Database change order template # # Batch Import @@ -308,6 +309,7 @@ com.oceanbase.odc.AuditEventType.RESOURCE_GROUP_MANAGEMENT=Resource group manage com.oceanbase.odc.AuditEventType.MEMBER_MANAGEMENT=Member management com.oceanbase.odc.AuditEventType.AUDIT_EVENT=Audit event com.oceanbase.odc.AuditEventType.FLOW_CONFIG=Flow config +com.oceanbase.odc.AuditEventType.MULTIPLE_ASYNC=Multiple Database Change com.oceanbase.odc.AuditEventType.ASYNC=Database Change com.oceanbase.odc.AuditEventType.MOCKDATA=Mock data com.oceanbase.odc.AuditEventType.IMPORT=Import @@ -793,6 +795,7 @@ com.oceanbase.odc.OBInstanceType.ORACLE_TENANT=Oracle Tenant # # TaskType # +com.oceanbase.odc.TaskType.MULTIPLE_ASYNC=Multiple Database Change com.oceanbase.odc.TaskType.ASYNC=Database Change com.oceanbase.odc.TaskType.IMPORT=Import com.oceanbase.odc.TaskType.EXPORT=Export diff --git a/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_CN.properties b/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_CN.properties index 90b9b0b0c6..856afe1ba8 100644 --- a/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_CN.properties +++ b/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_CN.properties @@ -80,6 +80,7 @@ com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_CHANNEL=通知通道 com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_POLICY=通知规则 com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_MESSAGE=通知消息 com.oceanbase.odc.ResourceType.ODC_STRUCTURE_COMPARISON_TASK=结构对比任务 +com.oceanbase.odc.ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE=数据库变更顺序模板 # # Batch Import @@ -310,6 +311,7 @@ com.oceanbase.odc.AuditEventType.RESOURCE_GROUP_MANAGEMENT=资源组管理 com.oceanbase.odc.AuditEventType.MEMBER_MANAGEMENT=人员管理 com.oceanbase.odc.AuditEventType.AUDIT_EVENT=审计事件 com.oceanbase.odc.AuditEventType.FLOW_CONFIG=流程配置 +com.oceanbase.odc.AuditEventType.MULTIPLE_ASYNC=多库变更 com.oceanbase.odc.AuditEventType.ASYNC=数据库变更 com.oceanbase.odc.AuditEventType.MOCKDATA=模拟数据 com.oceanbase.odc.AuditEventType.IMPORT=导入 @@ -715,8 +717,8 @@ com.oceanbase.odc.builtin-resource.regulation.risklevel.default-risk.name=默认 com.oceanbase.odc.builtin-resource.regulation.risklevel.default-risk.description=默认风险 com.oceanbase.odc.builtin-resource.regulation.risklevel.low-risk.name=低风险 com.oceanbase.odc.builtin-resource.regulation.risklevel.low-risk.description=低风险 -com.oceanbase.odc.builtin-resource.regulation.risklevel.mid-risk.name=中风险 -com.oceanbase.odc.builtin-resource.regulation.risklevel.mid-risk.description=中风险 +com.oceanbase.odc.builtin-resource.regulation.risklevel.moderate-risk.name=中风险 +com.oceanbase.odc.builtin-resource.regulation.risklevel.moderate-risk.description=中风险 com.oceanbase.odc.builtin-resource.regulation.risklevel.high-risk.name=高风险 com.oceanbase.odc.builtin-resource.regulation.risklevel.high-risk.description=高风险 # @@ -728,6 +730,7 @@ com.oceanbase.odc.OBInstanceType.ORACLE_TENANT=Oracle 租户实例 # # TaskType # +com.oceanbase.odc.TaskType.MULTIPLE_ASYNC=多库变更 com.oceanbase.odc.TaskType.ASYNC=数据库变更 com.oceanbase.odc.TaskType.IMPORT=导入 com.oceanbase.odc.TaskType.EXPORT=导出 diff --git a/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_TW.properties b/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_TW.properties index a10926cace..3dd687b893 100644 --- a/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_TW.properties +++ b/server/odc-core/src/main/resources/i18n/BusinessMessages_zh_TW.properties @@ -80,6 +80,7 @@ com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_CHANNEL=通知通道 com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_POLICY=通知規則 com.oceanbase.odc.ResourceType.ODC_NOTIFICATION_MESSAGE=通知消息 com.oceanbase.odc.ResourceType.ODC_STRUCTURE_COMPARISON_TASK=結構對比任務 +com.oceanbase.odc.ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE=數據庫變更順序模板 # # Batch Import @@ -311,6 +312,7 @@ com.oceanbase.odc.AuditEventType.RESOURCE_GROUP_MANAGEMENT=資源組管理 com.oceanbase.odc.AuditEventType.MEMBER_MANAGEMENT=人員管理 com.oceanbase.odc.AuditEventType.AUDIT_EVENT=審計事件 com.oceanbase.odc.AuditEventType.FLOW_CONFIG=流程配置 +com.oceanbase.odc.AuditEventType.MULTIPLE_ASYNC=多庫變更 com.oceanbase.odc.AuditEventType.ASYNC=數據庫變更 com.oceanbase.odc.AuditEventType.MOCKDATA=模擬數據 com.oceanbase.odc.AuditEventType.IMPORT=導入 @@ -781,8 +783,8 @@ com.oceanbase.odc.builtin-resource.regulation.risklevel.default-risk.name= 默 com.oceanbase.odc.builtin-resource.regulation.risklevel.default-risk.description=默認風險 com.oceanbase.odc.builtin-resource.regulation.risklevel.low-risk.name=低風險 com.oceanbase.odc.builtin-resource.regulation.risklevel.low-risk.description=低風險 -com.oceanbase.odc.builtin-resource.regulation.risklevel.mid-risk.name=中風險 -com.oceanbase.odc.builtin-resource.regulation.risklevel.mid-risk.description=中風險 +com.oceanbase.odc.builtin-resource.regulation.risklevel.moderate-risk.name=中風險 +com.oceanbase.odc.builtin-resource.regulation.risklevel.moderate-risk.description=中風險 com.oceanbase.odc.builtin-resource.regulation.risklevel.high-risk.name=高風險 com.oceanbase.odc.builtin-resource.regulation.risklevel.high-risk.description=高風險 @@ -796,6 +798,7 @@ com.oceanbase.odc.OBInstanceType.ORACLE_TENANT=Oracle 租戶實例 # # TaskType # +com.oceanbase.odc.TaskType.MULTIPLE_ASYNC=多庫變更 com.oceanbase.odc.TaskType.ASYNC=數據庫變更 com.oceanbase.odc.TaskType.IMPORT=導入 com.oceanbase.odc.TaskType.EXPORT=導出 diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_3__add_databasechange_changingorder_template.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_3__add_databasechange_changingorder_template.sql index a569a9d2bd..80b9313fce 100644 --- a/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_3__add_databasechange_changingorder_template.sql +++ b/server/odc-migrate/src/main/resources/migrate/common/V_4_3_0_3__add_databasechange_changingorder_template.sql @@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS `databasechange_changingorder_template`( `project_id` bigint(20) NOT NULL COMMENT 'Reference collaboration_project(id)', `organization_id` bigint(20) NOT NULL COMMENT 'Reference iam_user_organization(id)', `database_sequences` varchar(1024) NOT NULL COMMENT 'Database Execution sequence', + `is_enabled` tinyint(1) NOT NULL DEFAULT '1', CONSTRAINT `pk_databasechange_changingorder_template_id` PRIMARY KEY(`id`), UNIQUE KEY `uk_databasechange_changingorder_template_project_id_name` (`project_id`,`name`) ); \ No newline at end of file diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java index e633fb6b00..257684ad50 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/ConnectSessionController.java @@ -39,7 +39,6 @@ import org.springframework.web.multipart.MultipartFile; import com.oceanbase.odc.core.session.ConnectionSession; -import com.oceanbase.odc.core.shared.exception.NotImplementedException; import com.oceanbase.odc.service.common.response.ListResponse; import com.oceanbase.odc.service.common.response.Responses; import com.oceanbase.odc.service.common.response.SuccessResponse; @@ -65,6 +64,7 @@ import com.oceanbase.odc.service.sqlcheck.SqlCheckService; import com.oceanbase.odc.service.sqlcheck.model.CheckResult; import com.oceanbase.odc.service.sqlcheck.model.MultipleSqlCheckReq; +import com.oceanbase.odc.service.sqlcheck.model.MultipleSqlCheckResult; import com.oceanbase.odc.service.sqlcheck.model.SqlCheckReq; import com.oceanbase.odc.service.state.model.StateName; import com.oceanbase.odc.service.state.model.StatefulRoute; @@ -164,15 +164,15 @@ public ListResponse check(@PathVariable String sessionId, @RequestB } /** - * 对多个数据库进行sql检查 todo 待完善 - * + * check the contents of multiple sql scripts + * * @param req * @return */ @ApiOperation(value = "sqlCheck", notes = "statically check the contents of multiple sql scripts") - @PostMapping("sessions/sqlCheck") - public ListResponse multipleCheck(@RequestBody MultipleSqlCheckReq req) { - throw new NotImplementedException("Unsupported now"); + @PostMapping("/sessions/sqlCheck") + public ListResponse multipleCheck(@RequestBody MultipleSqlCheckReq req) { + return Responses.list(this.sqlCheckService.multipleCheck(req)); } /** diff --git a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/DatabaseChangeController.java b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/DatabaseChangeController.java index c1d41d24d0..f7c83fc37e 100644 --- a/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/DatabaseChangeController.java +++ b/server/odc-server/src/main/java/com/oceanbase/odc/server/web/controller/v2/DatabaseChangeController.java @@ -26,14 +26,19 @@ import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.oceanbase.odc.service.common.response.PaginatedResponse; import com.oceanbase.odc.service.common.response.Responses; import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.databasechange.DatabaseChangeChangingOrderTemplateService; -import com.oceanbase.odc.service.databasechange.model.CreateDatabaseChangeChangingOrderReq; -import com.oceanbase.odc.service.databasechange.model.QueryDatabaseChangeChangingOrderResp; +import com.oceanbase.odc.service.databasechange.model.CreateDatabaseChangeChangingOrderTemplateReq; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeChangingOrderTemplateResp; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangingOrderTemplateExists; +import com.oceanbase.odc.service.databasechange.model.QueryDatabaseChangeChangingOrderParams; +import com.oceanbase.odc.service.databasechange.model.UpdateDatabaseChangeChangingOrderReq; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; @@ -47,46 +52,65 @@ @RequestMapping("/api/v2/databasechange") public class DatabaseChangeController { @Autowired - private DatabaseChangeChangingOrderTemplateService databaseChangeChangingOrderTemplateService; + private DatabaseChangeChangingOrderTemplateService templateService; @ApiOperation(value = "createDatabaseChangingOrderTemplate", notes = "create database changing order template") @PostMapping("/changingorder/templates") - public SuccessResponse createDatabaseChangingOrderTemplate( - @RequestBody CreateDatabaseChangeChangingOrderReq req) { - return Responses.success(databaseChangeChangingOrderTemplateService.createDatabaseChangingOrderTemplate(req)); + public SuccessResponse create( + @RequestBody CreateDatabaseChangeChangingOrderTemplateReq req) { + return Responses.success(templateService.create(req)); } - @ApiOperation(value = "modifyDatabaseChangingOrderTemplate/{id:[\\d]+}", + @ApiOperation(value = "modifyDatabaseChangingOrderTemplate", notes = "modify database changing order template") @PutMapping("/changingorder/templates/{id:[\\d]+}") - public SuccessResponse modifyDatabaseChangingOrderTemplate(@PathVariable Long id, - @RequestBody CreateDatabaseChangeChangingOrderReq req) { - return Responses.success(databaseChangeChangingOrderTemplateService.modifyDatabaseChangingOrderTemplate(req)); + public SuccessResponse update(@PathVariable Long id, + @RequestBody UpdateDatabaseChangeChangingOrderReq req) { + return Responses + .success(templateService.update(id, req)); } @ApiOperation(value = "queryDatabaseChangingOrderTemplateById", notes = "query database changing order template's detail by id") @GetMapping("/changingorder/templates/{id:[\\d]+}") - public SuccessResponse queryDatabaseChangingOrderTemplateById( + public SuccessResponse detail( @PathVariable Long id) { - return Responses.success(databaseChangeChangingOrderTemplateService.queryDatabaseChangingOrderTemplateById(id)); + return Responses.success(templateService.detail(id)); } @ApiOperation(value = "listDatabaseChangingOrderTemplates", notes = "get a list of database changing order templates") @GetMapping("/changingorder/templates") - public PaginatedResponse listDatabaseChangingOrderTemplates( - @PageableDefault(size = Integer.MAX_VALUE, sort = {"id"}, direction = Direction.DESC) Pageable pageable) { + public PaginatedResponse list( + @PageableDefault(size = Integer.MAX_VALUE, sort = {"id"}, direction = Direction.DESC) Pageable pageable, + @RequestParam(required = false, name = "name") String name, + @RequestParam(required = false, name = "creatorId") Long creatorId, + @RequestParam(required = true, name = "projectId") Long projectId) { + QueryDatabaseChangeChangingOrderParams queryDatabaseChangeChangingOrderParams = + QueryDatabaseChangeChangingOrderParams.builder() + .name(name) + .creatorId(creatorId) + .projectId(projectId) + .build(); return Responses - .paginated(databaseChangeChangingOrderTemplateService.listDatabaseChangingOrderTemplates(pageable)); + .paginated(templateService.listTemplates(pageable, + queryDatabaseChangeChangingOrderParams)); } @ApiOperation(value = "deleteDatabaseChangingOrderTemplateById", notes = "delete database changing order template by id") @DeleteMapping("/changingorder/templates/{id:[\\d]+}") - public SuccessResponse deleteDatabaseChangingOrderTemplateById(@PathVariable Long id) { + public SuccessResponse delete( + @PathVariable Long id) { return Responses - .success(databaseChangeChangingOrderTemplateService.deleteDatabaseChangingOrderTemplateById(id)); + .success(templateService.delete(id)); + } + + @ApiOperation(value = "exists", notes = "Returns whether an database changing order template exists") + @RequestMapping(value = "/changingorder/templates/exists", method = RequestMethod.GET) + public SuccessResponse exists(@RequestParam String name, + @RequestParam Long projectId) { + return Responses.success(templateService.exists(name, projectId)); } } diff --git a/server/odc-server/src/main/resources/log4j2.xml b/server/odc-server/src/main/resources/log4j2.xml index e26694a248..ad4a87e541 100644 --- a/server/odc-server/src/main/resources/log4j2.xml +++ b/server/odc-server/src/main/resources/log4j2.xml @@ -643,7 +643,54 @@ + + + + + + + %d{yyyy-MM-dd HH:mm:ss} %p %c{1.} - %m%n + + + + + + + + + + + + + + + + + + + %d{yyyy-MM-dd HH:mm:ss} %p %c{1.} - %m%n + + + + + + + + + + + + + + + + + @@ -760,7 +807,11 @@ - + + + + + diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/connection/DatabaseRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/connection/DatabaseRepository.java index 1dbccf1943..e4b09a1127 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/connection/DatabaseRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/connection/DatabaseRepository.java @@ -42,6 +42,8 @@ public interface DatabaseRepository extends JpaRepository, List findByProjectId(Long projectId); + List findByProjectIdIn(List projectIds); + List findByProjectIdAndExisted(Long projectId, Boolean existed); List findByIdIn(Collection ids); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateEntity.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateEntity.java index a75a88ce79..48468598f4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateEntity.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateEntity.java @@ -19,7 +19,6 @@ import java.util.List; import javax.persistence.Column; -import javax.persistence.Convert; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; @@ -28,13 +27,17 @@ import org.hibernate.annotations.Generated; import org.hibernate.annotations.GenerationTime; +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; +import org.hibernate.annotations.TypeDefs; -import com.oceanbase.odc.common.jpa.JsonListConverter; +import com.oceanbase.odc.config.jpa.type.JsonType; import lombok.Data; @Data @Entity +@TypeDefs({@TypeDef(name = "Json", typeClass = JsonType.class)}) @Table(name = "databasechange_changingorder_template") public class DatabaseChangeChangingOrderTemplateEntity { @@ -55,10 +58,13 @@ public class DatabaseChangeChangingOrderTemplateEntity { @Column(name = "project_id", nullable = false) private Long projectId; + @Type(type = "Json") @Column(name = "database_sequences", nullable = false) - @Convert(converter = JsonListConverter.class) private List> databaseSequences; + @Column(name = "is_enabled", nullable = false) + private Boolean enabled; + @Generated(GenerationTime.ALWAYS) @Column(name = "create_time", insertable = false, updatable = false) private Date createTime; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepository.java index 5a548aea9f..c41c4ee8c8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateRepository.java @@ -15,13 +15,30 @@ */ package com.oceanbase.odc.metadb.databasechange; +import java.util.List; +import java.util.Optional; + import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Modifying; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; @Repository public interface DatabaseChangeChangingOrderTemplateRepository extends JpaRepository, JpaSpecificationExecutor { + Boolean existsByNameAndProjectId(String name, Long projectId); + + Optional findByNameAndProjectId(String name, Long projectId); + + Optional findByIdAndProjectId(Long id, Long projectId); + + @Transactional + @Modifying + @Query("update DatabaseChangeChangingOrderTemplateEntity as t set t.enabled = false where t.id in :ids") + int updateEnabledByIds(@Param("ids") List ids); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateSpecs.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateSpecs.java new file mode 100644 index 0000000000..b8e5c9a16d --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/databasechange/DatabaseChangeChangingOrderTemplateSpecs.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.metadb.databasechange; + +import java.util.Collection; + +import org.springframework.data.jpa.domain.Specification; + +import com.oceanbase.odc.common.jpa.SpecificationUtil; + +/** + * @author: zijia.cj + * @date: 2024/4/22 + */ +public class DatabaseChangeChangingOrderTemplateSpecs { + + public static Specification idEquals(Long id) { + return SpecificationUtil.columnEqual(DatabaseChangeChangingOrderTemplateEntity_.ID, id); + } + + public static Specification nameLikes(String name) { + return SpecificationUtil.columnLike(DatabaseChangeChangingOrderTemplateEntity_.NAME, name); + } + + public static Specification creatorIdIn(Collection userIds) { + return SpecificationUtil.columnIn(DatabaseChangeChangingOrderTemplateEntity_.CREATOR_ID, userIds); + } + + public static Specification projectIdEquals(Long projectId) { + return SpecificationUtil.columnEqual(DatabaseChangeChangingOrderTemplateEntity_.PROJECT_ID, projectId); + } + + public static Specification enabledEquals(Boolean enabled) { + return SpecificationUtil.columnEqual(DatabaseChangeChangingOrderTemplateEntity_.ENABLED, enabled); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateSchedules.java new file mode 100644 index 0000000000..67d318f334 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateSchedules.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageRequest; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.domain.Specification; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateEntity; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateRepository; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateSpecs; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author: zijia.cj + * @date: 2024/5/16 + */ +@Slf4j +@Component +public class DatabaseChangeChangingOrderTemplateSchedules { + + private static final int PAGE_SIZE = 100; + @Autowired + private DatabaseChangeChangingOrderTemplateService templateService; + @Autowired + private DatabaseChangeChangingOrderTemplateRepository templateRepository; + + @Scheduled(fixedDelayString = "${odc.task.databasechange.update-enable-interval-millis:180000}") + public void syncTemplates() { + int page = 0; + Pageable pageable; + Page pageResult; + do { + pageable = PageRequest.of(page, PAGE_SIZE); + Specification specification = + Specification.where(DatabaseChangeChangingOrderTemplateSpecs.enabledEquals(true)); + pageResult = this.templateRepository.findAll(specification, pageable); + Map templateId2Status = this.templateService + .getChangingOrderTemplateId2EnableStatus(pageResult.getContent().stream() + .map(DatabaseChangeChangingOrderTemplateEntity::getId).collect(Collectors.toSet())); + List disabledTemplateIds = templateId2Status.entrySet().stream() + .filter(e -> Boolean.FALSE.equals(e.getValue())) + .map(Entry::getKey).collect(Collectors.toList()); + templateRepository.updateEnabledByIds(disabledTemplateIds); + page++; + } while (pageResult.hasNext()); + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateService.java index 13366d3535..778666dacf 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/DatabaseChangeChangingOrderTemplateService.java @@ -15,52 +15,284 @@ */ package com.oceanbase.odc.service.databasechange; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + import javax.validation.Valid; -import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import org.apache.commons.collections4.CollectionUtils; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; +import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; -import org.springframework.data.domain.Sort.Direction; -import org.springframework.data.web.PageableDefault; +import org.springframework.data.jpa.domain.Specification; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; -import com.oceanbase.odc.core.shared.exception.NotImplementedException; -import com.oceanbase.odc.service.databasechange.model.CreateDatabaseChangeChangingOrderReq; -import com.oceanbase.odc.service.databasechange.model.QueryDatabaseChangeChangingOrderResp; +import com.oceanbase.odc.core.authority.util.SkipAuthorize; +import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.ErrorCodes; +import com.oceanbase.odc.core.shared.constant.ResourceRoleName; +import com.oceanbase.odc.core.shared.constant.ResourceType; +import com.oceanbase.odc.core.shared.exception.BadArgumentException; +import com.oceanbase.odc.core.shared.exception.NotFoundException; +import com.oceanbase.odc.metadb.collaboration.ProjectEntity; +import com.oceanbase.odc.metadb.collaboration.ProjectRepository; +import com.oceanbase.odc.metadb.connection.DatabaseEntity; +import com.oceanbase.odc.metadb.connection.DatabaseRepository; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateEntity; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateRepository; +import com.oceanbase.odc.metadb.databasechange.DatabaseChangeChangingOrderTemplateSpecs; +import com.oceanbase.odc.service.connection.database.DatabaseService; +import com.oceanbase.odc.service.databasechange.model.CreateDatabaseChangeChangingOrderTemplateReq; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeChangingOrderTemplateResp; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeProperties; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangingOrderTemplateExists; +import com.oceanbase.odc.service.databasechange.model.QueryDatabaseChangeChangingOrderParams; +import com.oceanbase.odc.service.databasechange.model.UpdateDatabaseChangeChangingOrderReq; +import com.oceanbase.odc.service.iam.ProjectPermissionValidator; +import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; @Service @Validated +@SkipAuthorize("permission check inside") public class DatabaseChangeChangingOrderTemplateService { + @Autowired + private DatabaseChangeChangingOrderTemplateRepository templateRepository; + @Autowired + private AuthenticationFacade authenticationFacade; + + @Autowired + private DatabaseRepository databaseRepository; + + @Autowired + private ProjectRepository projectRepository; + @Autowired + private DatabaseService databaseService; + + @Autowired + private ProjectPermissionValidator projectPermissionValidator; + @Autowired + private DatabaseChangeProperties databaseChangeProperties; + + @SkipAuthorize("internal usage") + public Map getChangingOrderTemplateId2EnableStatus(Set templateIds) { + List templates = + this.templateRepository.findAllById(templateIds); + Map> projectId2TemplateEntityList = templates.stream() + .collect(Collectors.groupingBy(DatabaseChangeChangingOrderTemplateEntity::getProjectId)); + List projectEntities = projectRepository.findByIdIn(projectId2TemplateEntityList.keySet()); + List archivedProjectIds = projectEntities.stream() + .filter(p -> Boolean.TRUE.equals(p.getArchived())) + .map(ProjectEntity::getId).collect(Collectors.toList()); + List disabledTemplateIds = projectId2TemplateEntityList.entrySet().stream() + .filter(entry -> archivedProjectIds.contains(entry.getKey())) + .flatMap(entry -> entry.getValue().stream() + .map(DatabaseChangeChangingOrderTemplateEntity::getId)) + .collect(Collectors.toList()); + + List nonArchivedProjectIds = projectEntities.stream() + .filter(p -> Boolean.FALSE.equals(p.getArchived())) + .map(ProjectEntity::getId).collect(Collectors.toList()); - @Transactional - public Boolean createDatabaseChangingOrderTemplate( - @NotNull @Valid CreateDatabaseChangeChangingOrderReq req) { - throw new NotImplementedException("Unsupported now"); + Map> projectId2Databases = this.databaseRepository + .findByProjectIdIn(nonArchivedProjectIds).stream() + .collect(Collectors.groupingBy(DatabaseEntity::getProjectId)); + disabledTemplateIds.addAll(projectId2TemplateEntityList.entrySet().stream() + // 留下未归档的projectId2TemplateEntityList + .filter(entry -> nonArchivedProjectIds.contains(entry.getKey())) + .flatMap(entry -> { + List databases = projectId2Databases.get(entry.getKey()); + if (CollectionUtils.isEmpty(databases)) { + return entry.getValue().stream().map(DatabaseChangeChangingOrderTemplateEntity::getId); + } + Set dbIds = databases.stream().map(DatabaseEntity::getId).collect(Collectors.toSet()); + return entry.getValue().stream().filter(en -> { + Set templateDbIds = en.getDatabaseSequences().stream() + .flatMap(Collection::stream).collect(Collectors.toSet()); + return !CollectionUtils.containsAll(dbIds, templateDbIds); + }).map(DatabaseChangeChangingOrderTemplateEntity::getId); + }).collect(Collectors.toList())); + return templateIds.stream().collect(Collectors.toMap(id -> id, id -> !disabledTemplateIds.contains(id))); } - @Transactional - public Boolean modifyDatabaseChangingOrderTemplate( - @NotNull @Valid CreateDatabaseChangeChangingOrderReq req) { - throw new NotImplementedException("Unsupported now"); + @Transactional(rollbackFor = Exception.class) + public DatabaseChangeChangingOrderTemplateResp create( + @NotNull @Valid CreateDatabaseChangeChangingOrderTemplateReq req) { + PreConditions.validExists(ResourceType.ODC_PROJECT, "projectId", req.getProjectId(), + () -> projectRepository.existsById(req.getProjectId())); + projectPermissionValidator.checkProjectRole(req.getProjectId(), ResourceRoleName.all()); + List> orders = req.getOrders(); + List databaseIds = orders.stream().flatMap(List::stream).collect(Collectors.toList()); + validateSizeAndNotDuplicated(databaseIds); + PreConditions.validNoDuplicated(ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE, "name", req.getName(), + () -> templateRepository.existsByNameAndProjectId(req.getName(), req.getProjectId())); + List byIdIn = databaseRepository.findByIdIn(databaseIds); + if (!(byIdIn.stream().allMatch(x -> x.getProjectId().equals(req.getProjectId())))) { + throw new BadArgumentException(ErrorCodes.BadArgument, + "all databases must belong to the current project"); + } + long userId = authenticationFacade.currentUserId(); + Long organizationId = authenticationFacade.currentOrganizationId(); + DatabaseChangeChangingOrderTemplateEntity templateEntity = + new DatabaseChangeChangingOrderTemplateEntity(); + templateEntity.setName(req.getName()); + templateEntity.setCreatorId(userId); + templateEntity.setProjectId(req.getProjectId()); + templateEntity.setOrganizationId(organizationId); + templateEntity.setDatabaseSequences(req.getOrders()); + templateEntity.setEnabled(true); + DatabaseChangeChangingOrderTemplateEntity savedEntity = templateRepository.save(templateEntity); + DatabaseChangeChangingOrderTemplateResp templateResp = new DatabaseChangeChangingOrderTemplateResp(); + templateResp.setId(savedEntity.getId()); + templateResp.setName(savedEntity.getName()); + templateResp.setCreatorId(savedEntity.getCreatorId()); + templateResp.setProjectId(savedEntity.getProjectId()); + templateResp.setOrganizationId(savedEntity.getOrganizationId()); + List> databaseSequences = savedEntity.getDatabaseSequences(); + List> databaseSequenceList = databaseSequences.stream() + .map(s -> s.stream().map(DatabaseChangeDatabase::new).collect(Collectors.toList())) + .collect(Collectors.toList()); + templateResp.setDatabaseSequenceList(databaseSequenceList); + templateResp.setEnabled(true); + return templateResp; } - public QueryDatabaseChangeChangingOrderResp queryDatabaseChangingOrderTemplateById( - @NotNull @Min(value = 0) Long id) { - throw new NotImplementedException("Unsupported now"); + public void validateSizeAndNotDuplicated(List databaseIds) { + if (databaseIds.size() <= databaseChangeProperties.getMinDatabaseCount() + || databaseIds.size() > databaseChangeProperties.getMaxDatabaseCount()) { + throw new BadArgumentException(ErrorCodes.BadArgument, + String.format("The number of databases must be greater than %s and not more than %s.", + databaseChangeProperties.getMinDatabaseCount(), + databaseChangeProperties.getMaxDatabaseCount())); + } + if (new HashSet(databaseIds).size() != databaseIds.size()) { + throw new BadArgumentException(ErrorCodes.BadArgument, + "Databases cannot be duplicated."); + } } + @Transactional(rollbackFor = Exception.class) + public DatabaseChangeChangingOrderTemplateResp update(Long id, + @NotNull @Valid UpdateDatabaseChangeChangingOrderReq req) { + PreConditions.validExists(ResourceType.ODC_PROJECT, "projectId", req.getProjectId(), + () -> projectRepository.existsById(req.getProjectId())); + projectPermissionValidator.checkProjectRole(req.getProjectId(), ResourceRoleName.all()); + DatabaseChangeChangingOrderTemplateEntity originEntity = + templateRepository.findByIdAndProjectId(id, req.getProjectId()).orElseThrow( + () -> new NotFoundException(ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE, "id", id)); + PreConditions.validNoDuplicated(ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE, "name", req.getName(), + () -> templateRepository.existsByNameAndProjectId(req.getName(), req.getProjectId())); + originEntity.setName(req.getName()); + DatabaseChangeChangingOrderTemplateEntity savedEntity = templateRepository.save(originEntity); + DatabaseChangeChangingOrderTemplateResp templateResp = new DatabaseChangeChangingOrderTemplateResp(); + templateResp.setId(savedEntity.getId()); + templateResp.setName(savedEntity.getName()); + templateResp.setCreatorId(savedEntity.getCreatorId()); + templateResp.setProjectId(savedEntity.getProjectId()); + templateResp.setOrganizationId(savedEntity.getOrganizationId()); + List> databaseSequences = savedEntity.getDatabaseSequences(); + List> databaseSequenceList = databaseSequences.stream() + .map(s -> s.stream().map(DatabaseChangeDatabase::new).collect(Collectors.toList())) + .collect(Collectors.toList()); + templateResp.setDatabaseSequenceList(databaseSequenceList); + templateResp.setEnabled(true); + return templateResp; + } + + public DatabaseChangeChangingOrderTemplateResp detail(@NotNull Long id) { + DatabaseChangeChangingOrderTemplateEntity templateEntity = + templateRepository.findById(id).orElseThrow( + () -> new NotFoundException(ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE, "id", id)); + projectPermissionValidator.checkProjectRole(templateEntity.getProjectId(), + ResourceRoleName.all()); + List> databaseSequences = templateEntity.getDatabaseSequences(); + DatabaseChangeChangingOrderTemplateResp templateResp = + new DatabaseChangeChangingOrderTemplateResp(); + templateResp.setId(templateEntity.getId()); + templateResp.setName(templateEntity.getName()); + templateResp + .setCreatorId(templateEntity.getCreatorId()); + templateResp.setProjectId(templateEntity.getProjectId()); + templateResp + .setOrganizationId(templateEntity.getOrganizationId()); + List> databaseSequenceList = databaseSequences.stream() + .map(s -> s.stream().map(DatabaseChangeDatabase::new).collect(Collectors.toList())) + .collect(Collectors.toList()); + templateResp.setDatabaseSequenceList(databaseSequenceList); + Map templateId2Status = getChangingOrderTemplateId2EnableStatus( + Collections.singleton(templateEntity.getId())); + templateResp.setEnabled(templateId2Status.getOrDefault(templateEntity.getId(), templateEntity.getEnabled())); + return templateResp; + } + + + public Page listTemplates(@NotNull Pageable pageable, + @NotNull @Valid QueryDatabaseChangeChangingOrderParams params) { + projectPermissionValidator.checkProjectRole(params.getProjectId(), ResourceRoleName.all()); + Specification specification = Specification + .where(DatabaseChangeChangingOrderTemplateSpecs.projectIdEquals(params.getProjectId())) + .and(params.getName() == null ? null + : DatabaseChangeChangingOrderTemplateSpecs.nameLikes(params.getName())) + .and(params.getCreatorId() == null ? null + : DatabaseChangeChangingOrderTemplateSpecs + .creatorIdIn(Collections.singleton(params.getCreatorId()))); + Page pageResult = + templateRepository.findAll(specification, pageable); + List entityList = pageResult.getContent(); + List templateRespList = entityList.stream().map(entity -> { + DatabaseChangeChangingOrderTemplateResp templateResp = new DatabaseChangeChangingOrderTemplateResp(); + templateResp.setId(entity.getId()); + templateResp.setName(entity.getName()); + templateResp.setCreatorId(entity.getCreatorId()); + templateResp.setProjectId(entity.getProjectId()); + templateResp.setOrganizationId(entity.getOrganizationId()); + templateResp.setEnabled(entity.getEnabled()); + return templateResp; + }).collect(Collectors.toList()); + return new PageImpl<>(templateRespList, pageable, pageResult.getTotalElements()); + } - public Page listDatabaseChangingOrderTemplates( - @PageableDefault(size = Integer.MAX_VALUE, sort = {"id"}, direction = Direction.DESC) Pageable pageable) { - throw new NotImplementedException("Unsupported now"); + @Transactional(rollbackFor = Exception.class) + public DatabaseChangeChangingOrderTemplateResp delete(@NotNull Long id) { + DatabaseChangeChangingOrderTemplateEntity templateEntity = + templateRepository.findById(id).orElseThrow( + () -> new NotFoundException(ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE, "id", id)); + projectPermissionValidator.checkProjectRole(templateEntity.getProjectId(), + ResourceRoleName.all()); + templateRepository.deleteById(id); + DatabaseChangeChangingOrderTemplateResp templateResp = new DatabaseChangeChangingOrderTemplateResp(); + templateResp.setId(templateEntity.getId()); + templateResp.setName(templateEntity.getName()); + templateResp.setCreatorId(templateEntity.getCreatorId()); + templateResp.setProjectId(templateEntity.getProjectId()); + templateResp.setOrganizationId(templateEntity.getOrganizationId()); + List> databaseSequences = templateEntity.getDatabaseSequences(); + List> databaseSequenceList = databaseSequences.stream() + .map(s -> s.stream().map(DatabaseChangeDatabase::new).collect(Collectors.toList())) + .collect(Collectors.toList()); + templateResp.setDatabaseSequenceList(databaseSequenceList); + templateResp.setEnabled(true); + return templateResp; } - @Transactional - public Boolean deleteDatabaseChangingOrderTemplateById(@NotNull @Min(value = 0) Long id) { - throw new NotImplementedException("Unsupported now"); + public DatabaseChangingOrderTemplateExists exists(String name, Long projectId) { + if (templateRepository.existsByNameAndProjectId(name, projectId)) { + return DatabaseChangingOrderTemplateExists + .builder().exists(true).errorMessage(ErrorCodes.DuplicatedExists.getLocalizedMessage( + new Object[] {ResourceType.ODC_DATABASE_CHANGE_ORDER_TEMPLATE.getLocalizedMessage(), "name", + name})) + .build(); + } + return DatabaseChangingOrderTemplateExists.builder().exists(false).build(); } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/MultipleDatabaseChangePreprocessor.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/MultipleDatabaseChangePreprocessor.java new file mode 100644 index 0000000000..898720730b --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/MultipleDatabaseChangePreprocessor.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import org.springframework.beans.factory.annotation.Autowired; + +import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.ErrorCodes; +import com.oceanbase.odc.core.shared.constant.TaskType; +import com.oceanbase.odc.core.shared.exception.BadArgumentException; +import com.oceanbase.odc.service.collaboration.project.ProjectService; +import com.oceanbase.odc.service.collaboration.project.model.Project; +import com.oceanbase.odc.service.connection.database.DatabaseService; +import com.oceanbase.odc.service.connection.database.model.Database; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeProject; +import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; +import com.oceanbase.odc.service.flow.processor.FlowTaskPreprocessor; +import com.oceanbase.odc.service.flow.processor.Preprocessor; +import com.oceanbase.odc.service.flow.task.model.FlowTaskProperties; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; +import com.oceanbase.odc.service.flow.util.DescriptionGenerator; +import com.oceanbase.odc.service.iam.ProjectPermissionValidator; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author: zijia.cj + * @date: 2024/5/11 + */ +@Slf4j +@FlowTaskPreprocessor(type = TaskType.MULTIPLE_ASYNC) +public class MultipleDatabaseChangePreprocessor implements Preprocessor { + @Autowired + private DatabaseService databaseService; + @Autowired + private ProjectService projectService; + @Autowired + private ProjectPermissionValidator projectPermissionValidator; + @Autowired + private DatabaseChangeChangingOrderTemplateService templateService; + @Autowired + private FlowTaskProperties flowTaskProperties; + + @Override + public void process(CreateFlowInstanceReq req) { + MultipleDatabaseChangeParameters parameters = (MultipleDatabaseChangeParameters) req.getParameters(); + Project project = projectService.detail(parameters.getProjectId()); + // Check the project permission + List ids = parameters.getOrderedDatabaseIds().stream().flatMap(List::stream).collect(Collectors.toList()); + templateService.validateSizeAndNotDuplicated(ids); + List databases = databaseService.listDatabasesDetailsByIds(ids); + // All databases must belong to the project + if (!databases.stream() + .allMatch(databaseEntity -> databaseEntity.getProject() != null && Objects.equals( + databaseEntity.getProject().getId(), parameters.getProjectId()))) { + throw new BadArgumentException(ErrorCodes.BadArgument, + String.format("All databases must belong to the same project: %s", project.getName())); + } + PreConditions.maxLength(parameters.getSqlContent(), "sql content", + flowTaskProperties.getSqlContentMaxLength()); + // must reset the batchid when initiating a multiple database flow again + parameters.setBatchId(null); + parameters.setProject(new DatabaseChangeProject(project)); + parameters.setDatabases(databases.stream().map(DatabaseChangeDatabase::new).collect(Collectors.toList())); + req.setProjectId(parameters.getProjectId()); + req.setProjectName(project.getName()); + DescriptionGenerator.generateDescription(req); + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/MultipleDatabaseChangeTraceContextHolder.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/MultipleDatabaseChangeTraceContextHolder.java new file mode 100644 index 0000000000..6e5aad6019 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/MultipleDatabaseChangeTraceContextHolder.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange; + +import org.slf4j.MDC; + +/** + * @author: zijia.cj + * @date: 2024/5/6 + */ +public final class MultipleDatabaseChangeTraceContextHolder { + public static final String TASK_ID = "taskId"; + + private MultipleDatabaseChangeTraceContextHolder() {} + + public static void trace(long taskId) { + MDC.put(TASK_ID, String.valueOf(taskId)); + } + + public static void clear() { + MDC.remove(TASK_ID); + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/CreateDatabaseChangeChangingOrderTemplateReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/CreateDatabaseChangeChangingOrderTemplateReq.java new file mode 100644 index 0000000000..614700de56 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/CreateDatabaseChangeChangingOrderTemplateReq.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import java.util.List; + +import javax.validation.Valid; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import com.oceanbase.odc.common.validate.Name; + +import lombok.Data; + +@Data +public class CreateDatabaseChangeChangingOrderTemplateReq { + + @NotBlank + @Size(max = 256, message = "name is out of range [0, 256]") + @Name(message = "Template name cannot start or end with whitespaces") + private String name; + + @NotNull + private Long projectId; + + @NotEmpty + private List<@Valid @NotEmpty List<@NotNull Long>> orders; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/QueryDatabaseChangeChangingOrderResp.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeChangingOrderTemplateResp.java similarity index 83% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/QueryDatabaseChangeChangingOrderResp.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeChangingOrderTemplateResp.java index 4b4e20e6fe..2173618aca 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/QueryDatabaseChangeChangingOrderResp.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeChangingOrderTemplateResp.java @@ -17,12 +17,10 @@ import java.util.List; -import com.oceanbase.odc.metadb.connection.DatabaseEntity; - import lombok.Data; @Data -public class QueryDatabaseChangeChangingOrderResp { +public class DatabaseChangeChangingOrderTemplateResp { private Long id; @@ -34,6 +32,8 @@ public class QueryDatabaseChangeChangingOrderResp { private Long organizationId; - private List> databaseSequenceList; + private List> databaseSequenceList; + + private Boolean enabled; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeConnection.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeConnection.java new file mode 100644 index 0000000000..4ed2a29d59 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeConnection.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import java.io.Serializable; + +import com.oceanbase.odc.core.shared.constant.ConnectType; +import com.oceanbase.odc.service.connection.model.ConnectionConfig; +import com.oceanbase.odc.service.connection.model.OBInstanceType; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DatabaseChangeConnection implements Serializable { + + private static final long serialVersionUID = -5013749083390365604L; + private Long id; + private String name; + private String tenantName; + private String clusterName; + private ConnectType type; + private OBInstanceType instanceType; + + public DatabaseChangeConnection(ConnectionConfig connectionConfig) { + if (connectionConfig != null) { + this.id = connectionConfig.getId(); + this.name = connectionConfig.getName(); + this.type = connectionConfig.getType(); + this.instanceType = connectionConfig.getInstanceType(); + this.tenantName = connectionConfig.getTenantName(); + this.clusterName = connectionConfig.getClusterName(); + } + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeDatabase.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeDatabase.java new file mode 100644 index 0000000000..e353911c20 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeDatabase.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import com.oceanbase.odc.service.connection.database.model.Database; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.NonNull; + +@Data +@NoArgsConstructor +public class DatabaseChangeDatabase { + + private static final long serialVersionUID = -5013749085190365604L; + private Long id; + private String databaseId; + private Boolean existed; + private String name; + private DatabaseChangeProject project; + private DatabaseChangeConnection dataSource; + private DatabaseChangeEnvironment environment; + + public DatabaseChangeDatabase(@NonNull Database database) { + this.id = database.getId(); + this.databaseId = database.getDatabaseId(); + this.existed = database.getExisted(); + this.name = database.getName(); + this.project = new DatabaseChangeProject(database.getProject()); + this.dataSource = new DatabaseChangeConnection(database.getDataSource()); + this.environment = new DatabaseChangeEnvironment(database.getEnvironment()); + } + + public DatabaseChangeDatabase(@NonNull Long id) { + this.id = id; + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeEnvironment.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeEnvironment.java new file mode 100644 index 0000000000..4f49e0943b --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeEnvironment.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import java.io.Serializable; + +import com.oceanbase.odc.common.i18n.Internationalizable; +import com.oceanbase.odc.service.collaboration.environment.model.Environment; +import com.oceanbase.odc.service.collaboration.environment.model.EnvironmentStyle; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DatabaseChangeEnvironment implements Serializable { + + private static final long serialVersionUID = -5013749085190376604L; + private Long id; + @Internationalizable + private String name; + private EnvironmentStyle style; + + public DatabaseChangeEnvironment(Environment environment) { + if (environment != null) { + this.id = environment.getId(); + this.name = environment.getName(); + this.style = environment.getStyle(); + } + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeFlowInstanceDetailResp.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeFlowInstanceDetailResp.java new file mode 100644 index 0000000000..eb083f1051 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeFlowInstanceDetailResp.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import java.util.Date; + +import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp; + +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author: zijia.cj + * @date: 2024/5/20 + */ +@Data +@NoArgsConstructor +public class DatabaseChangeFlowInstanceDetailResp { + private Long id; + private Date createTime; + private Date executionTime; + private Date completeTime; + + public DatabaseChangeFlowInstanceDetailResp(FlowInstanceDetailResp flowInstanceDetailResp) { + if (flowInstanceDetailResp != null) { + this.id = flowInstanceDetailResp.getId(); + this.createTime = flowInstanceDetailResp.getCreateTime(); + this.executionTime = flowInstanceDetailResp.getExecutionTime(); + this.completeTime = flowInstanceDetailResp.getCompleteTime(); + } + } +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeProject.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeProject.java new file mode 100644 index 0000000000..b876944c0b --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeProject.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import java.io.Serializable; + +import com.oceanbase.odc.common.i18n.Internationalizable; +import com.oceanbase.odc.service.collaboration.project.model.Project; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DatabaseChangeProject implements Serializable { + + private static final long serialVersionUID = -5013749085190365604L; + private Long id; + @Internationalizable + private String name; + + public DatabaseChangeProject(Project project) { + if (project != null) { + this.id = project.getId(); + this.name = project.getName(); + } + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeProperties.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeProperties.java new file mode 100644 index 0000000000..bf24f417df --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangeProperties.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.context.annotation.Configuration; + +import lombok.Data; + +/** + * @author: zijia.cj + * @date: 2024/5/8 + */ +@Data +@RefreshScope +@Configuration +public class DatabaseChangeProperties { + + @Value("${odc.task.databasechange.min-database-count:1}") + private int minDatabaseCount = 1; + @Value("${odc.task.databasechange.min-database-count:100}") + private int maxDatabaseCount = 100; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangingOrderTemplateExists.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangingOrderTemplateExists.java new file mode 100644 index 0000000000..1b937069e8 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangingOrderTemplateExists.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author: zijia.cj + * @date: 2024/4/24 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DatabaseChangingOrderTemplateExists { + private Boolean exists; + private String errorMessage; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangingRecord.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangingRecord.java new file mode 100644 index 0000000000..b6dedc0ba2 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/DatabaseChangingRecord.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import com.oceanbase.odc.core.shared.constant.FlowStatus; + +import lombok.Data; + +/** + * @author: zijia.cj + * @date: 2024/5/6 + */ +@Data +public class DatabaseChangingRecord { + private DatabaseChangeDatabase database; + private DatabaseChangeFlowInstanceDetailResp flowInstanceDetailResp; + private FlowStatus status; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/CreateDatabaseChangeChangingOrderReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/QueryDatabaseChangeChangingOrderParams.java similarity index 74% rename from server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/CreateDatabaseChangeChangingOrderReq.java rename to server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/QueryDatabaseChangeChangingOrderParams.java index 1f617643a0..b430ad6f82 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/CreateDatabaseChangeChangingOrderReq.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/QueryDatabaseChangeChangingOrderParams.java @@ -15,24 +15,25 @@ */ package com.oceanbase.odc.service.databasechange.model; -import java.util.List; - -import javax.validation.constraints.NotBlank; -import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import lombok.Builder; import lombok.Data; +/** + * @author: zijia.cj + * @date: 2024/4/22 + */ @Data -public class CreateDatabaseChangeChangingOrderReq { +@Builder +public class QueryDatabaseChangeChangingOrderParams { - @NotBlank private String name; - @NotNull + @Min(value = 1, message = "projectId can not be smaller than 1") private Long projectId; - @NotEmpty - private List> orders; + private Long creatorId; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/UpdateDatabaseChangeChangingOrderReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/UpdateDatabaseChangeChangingOrderReq.java new file mode 100644 index 0000000000..a710900072 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/databasechange/model/UpdateDatabaseChangeChangingOrderReq.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.databasechange.model; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import com.oceanbase.odc.common.validate.Name; + +import lombok.Data; + +/** + * @author: zijia.cj + * @date: 2024/5/14 + */ +@Data +public class UpdateDatabaseChangeChangingOrderReq { + @NotBlank + @Size(max = 256, message = "name is out of range [0, 256]") + @Name(message = "Template name cannot start or end with whitespaces") + private String name; + + @NotNull + private Long projectId; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java index 12a6d1b8e9..4de420f38d 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java @@ -93,6 +93,7 @@ import com.oceanbase.odc.metadb.schedule.ScheduleEntity; import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig; +import com.oceanbase.odc.service.collaboration.environment.EnvironmentService; import com.oceanbase.odc.service.common.response.SuccessResponse; import com.oceanbase.odc.service.common.util.SqlUtils; import com.oceanbase.odc.service.config.SystemConfigService; @@ -104,6 +105,8 @@ import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.connection.model.OBTenant; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeConnection; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; import com.oceanbase.odc.service.dispatch.DispatchResponse; import com.oceanbase.odc.service.dispatch.RequestDispatcher; import com.oceanbase.odc.service.dispatch.TaskDispatchChecker; @@ -132,6 +135,7 @@ import com.oceanbase.odc.service.flow.task.model.DBStructureComparisonParameter; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.FlowTaskProperties; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; import com.oceanbase.odc.service.flow.task.model.ShadowTableSyncTaskParameter; import com.oceanbase.odc.service.flow.util.FlowTaskUtil; @@ -251,6 +255,8 @@ public class FlowInstanceService { private CloudMetadataClient cloudMetadataClient; @Autowired private EnvironmentRepository environmentRepository; + @Autowired + private EnvironmentService environmentService; private final List> dataTransferTaskInitHooks = new ArrayList<>(); private final List> shadowTableComparingTaskHooks = new ArrayList<>(); @@ -355,6 +361,13 @@ private List innerCreate(@NotNull @Valid CreateFlowInsta conn = connectionService.getForConnectionSkipPermissionCheck(createReq.getConnectionId()); cloudMetadataClient.checkPermission(OBTenant.of(conn.getClusterName(), conn.getTenantName()), conn.getInstanceType(), false, CloudPermissionAction.READONLY); + } else if (createReq.getTaskType() == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters taskParameters = + (MultipleDatabaseChangeParameters) createReq.getParameters(); + List conns = taskParameters.getDatabases().stream().map( + DatabaseChangeDatabase::getDataSource).distinct().collect(Collectors.toList()); + conns.forEach(con -> cloudMetadataClient.checkPermission(OBTenant.of(con.getClusterName(), + con.getTenantName()), con.getInstanceType(), false, CloudPermissionAction.READONLY)); } return Collections.singletonList(buildFlowInstance(riskLevels, createReq, conn)); } @@ -434,6 +447,7 @@ public Page listAll(@NotNull Pageable pageable, @NotNull Que } else { // Task type which will be filtered independently List types = Arrays.asList( + TaskType.MULTIPLE_ASYNC, TaskType.EXPORT, TaskType.IMPORT, TaskType.MOCKDATA, @@ -475,7 +489,7 @@ public Page listAll(@NotNull Pageable pageable, @NotNull Que resourceRoleService.getProjectId2ResourceRoleNames(); Set ownerProjectIds = currentUserProjectId2ResourceRoleNames.entrySet().stream() .filter(entry -> entry.getValue().contains(ResourceRoleName.OWNER)) - .map(Map.Entry::getKey) + .map(Entry::getKey) .collect(Collectors.toSet()); Set otherRoleProjectIds = new HashSet<>(currentUserProjectId2ResourceRoleNames.keySet()); otherRoleProjectIds.removeAll(ownerProjectIds); @@ -769,6 +783,10 @@ private void checkCreateFlowInstancePermission(CreateFlowInstanceReq req) { DBStructureComparisonParameter p = (DBStructureComparisonParameter) req.getParameters(); databaseIds.add(p.getTargetDatabaseId()); databaseIds.add(p.getSourceDatabaseId()); + } else if (taskType == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters parameters = (MultipleDatabaseChangeParameters) req.getParameters(); + databaseIds = + parameters.getOrderedDatabaseIds().stream().flatMap(Collection::stream).collect(Collectors.toSet()); } databasePermissionHelper.checkPermissions(databaseIds, DatabasePermissionType.from(req.getTaskType())); } @@ -898,6 +916,11 @@ private String generateFlowInstanceName(@NonNull CreateFlowInstanceReq req) { DBStructureComparisonParameter parameters = (DBStructureComparisonParameter) req.getParameters(); return "structure_comparison_" + parameters.getSourceDatabaseId() + "_" + parameters.getTargetDatabaseId(); } + if (req.getTaskType() == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters parameters = (MultipleDatabaseChangeParameters) req.getParameters(); + return "multiple_database_" + parameters.getOrderedDatabaseIds(); + } + String schemaName = req.getDatabaseName(); String connectionName = req.getConnectionId() == null ? "no_connection" : req.getConnectionId() + ""; if (schemaName == null) { @@ -940,24 +963,58 @@ private FlowInstanceConfigurer buildConfigurer( configurer = configurer.next(approvalGatewayInstance).route(String.format("${!%s}", FlowApprovalInstance.APPROVAL_VARIABLE_NAME), flowInstance.endFlowInstance()); if (nodeSequence == nodeConfigs.size() - 1) { - ExecutionStrategyConfig strategyConfig = ExecutionStrategyConfig.from(flowInstanceReq, - approvalFlowConfig.getWaitExecutionExpirationIntervalSeconds()); - FlowTaskInstance taskInstance = flowFactory.generateFlowTaskInstance(flowInstance.getId(), false, true, - taskType, strategyConfig); - taskInstance.setTargetTaskId(targetTaskId); - FlowInstanceConfigurer taskConfigurer; - if (taskType == TaskType.ASYNC - && Boolean.TRUE.equals(((DatabaseChangeParameters) parameters).getGenerateRollbackPlan())) { - FlowTaskInstance rollbackPlanInstance = - flowFactory.generateFlowTaskInstance(flowInstance.getId(), false, false, - TaskType.GENERATE_ROLLBACK, ExecutionStrategyConfig.autoStrategy()); - taskConfigurer = flowInstance.newFlowInstanceConfigurer(rollbackPlanInstance).next(taskInstance); + if (taskType == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters multipleDatabaseChangeParameters = + (MultipleDatabaseChangeParameters) parameters; + FlowInstanceConfigurer taskConfigurer = null; + int orders = ((MultipleDatabaseChangeParameters) flowInstanceReq.getParameters()) + .getOrderedDatabaseIds().size(); + for (int i = 0; i < orders; i++) { + // ExecutionStrategyConfig for multiple databases change flow + ExecutionStrategyConfig strategyConfig = ExecutionStrategyConfig.from(flowInstanceReq, + Math.toIntExact(multipleDatabaseChangeParameters.getManualTimeoutMillis()) / 1000); + FlowTaskInstance taskInstance; + if (i == orders - 1) { + taskInstance = flowFactory.generateFlowTaskInstance(flowInstance.getId(), false, true, + taskType, strategyConfig); + } else { + taskInstance = flowFactory.generateFlowTaskInstance(flowInstance.getId(), false, false, + taskType, strategyConfig); + } + + taskInstance.setTargetTaskId(targetTaskId); + if (taskConfigurer == null) { + taskConfigurer = flowInstance.newFlowInstanceConfigurer(taskInstance); + } else { + taskConfigurer.next(taskInstance); + } + } + taskConfigurer.endFlowInstance(); + configurer.route(String.format("${%s}", FlowApprovalInstance.APPROVAL_VARIABLE_NAME), + taskConfigurer); } else { - taskConfigurer = flowInstance.newFlowInstanceConfigurer(taskInstance); + ExecutionStrategyConfig strategyConfig = ExecutionStrategyConfig.from(flowInstanceReq, + approvalFlowConfig.getWaitExecutionExpirationIntervalSeconds()); + FlowTaskInstance taskInstance = + flowFactory.generateFlowTaskInstance(flowInstance.getId(), false, true, + taskType, strategyConfig); + taskInstance.setTargetTaskId(targetTaskId); + FlowInstanceConfigurer taskConfigurer; + if (taskType == TaskType.ASYNC + && Boolean.TRUE.equals(((DatabaseChangeParameters) parameters).getGenerateRollbackPlan())) { + FlowTaskInstance rollbackPlanInstance = + flowFactory.generateFlowTaskInstance(flowInstance.getId(), false, false, + TaskType.GENERATE_ROLLBACK, ExecutionStrategyConfig.autoStrategy()); + taskConfigurer = + flowInstance.newFlowInstanceConfigurer(rollbackPlanInstance).next(taskInstance); + } else { + taskConfigurer = flowInstance.newFlowInstanceConfigurer(taskInstance); + } + taskConfigurer.endFlowInstance(); + configurer.route(String.format("${%s}", FlowApprovalInstance.APPROVAL_VARIABLE_NAME), + taskConfigurer); } - taskConfigurer.endFlowInstance(); - configurer.route(String.format("${%s}", FlowApprovalInstance.APPROVAL_VARIABLE_NAME), - taskConfigurer); + } configurers.add(configurer); } @@ -1019,6 +1076,7 @@ private void initVariables(Map variables, TaskEntity taskEntity, FlowTaskUtil.setCloudMainAccountId(variables, authenticationFacade.currentUser().getParentUid()); } + private TemplateVariables buildTemplateVariables(CreateFlowInstanceReq flowInstanceReq, ConnectionConfig config) { TemplateVariables variables = new TemplateVariables(); // set task url @@ -1173,6 +1231,10 @@ private RiskLevelDescriber buildRiskLevelDescriber(CreateFlowInstanceReq req) { .build(); } + public List getFlowInstanceByParentId(Long parentFlowInstanceId) { + return flowInstanceRepository.findByParentInstanceId(parentFlowInstanceId); + } + public Set getApprovingAlterScheduleById(Long parentFlowInstanceId) { Specification sp = Specification.where( FlowInstanceSpecs.parentInstanceIdIn(Collections.singletonList(parentFlowInstanceId))) diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java index 93e76a9aba..2d92f725fe 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowTaskInstanceService.java @@ -60,11 +60,13 @@ import com.oceanbase.odc.core.shared.exception.InternalServerError; import com.oceanbase.odc.core.shared.exception.NotFoundException; import com.oceanbase.odc.core.shared.exception.UnsupportedException; +import com.oceanbase.odc.metadb.collaboration.EnvironmentRepository; import com.oceanbase.odc.metadb.flow.FlowInstanceRepository; import com.oceanbase.odc.metadb.task.TaskEntity; import com.oceanbase.odc.metadb.task.TaskRepository; import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig; import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferTaskResult; +import com.oceanbase.odc.service.collaboration.environment.EnvironmentService; import com.oceanbase.odc.service.common.FileManager; import com.oceanbase.odc.service.common.model.FileBucket; import com.oceanbase.odc.service.common.response.ListResponse; @@ -87,6 +89,8 @@ import com.oceanbase.odc.service.flow.task.model.DatabaseChangeResult; import com.oceanbase.odc.service.flow.task.model.MockDataTaskResult; import com.oceanbase.odc.service.flow.task.model.MockProperties; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeTaskResult; +import com.oceanbase.odc.service.flow.task.model.MultipleSqlCheckTaskResult; import com.oceanbase.odc.service.flow.task.model.OnlineSchemaChangeTaskResult; import com.oceanbase.odc.service.flow.task.model.PartitionPlanTaskResult; import com.oceanbase.odc.service.flow.task.model.ResultSetExportResult; @@ -155,6 +159,11 @@ public class FlowTaskInstanceService { private TaskFrameworkEnabledProperties taskFrameworkProperties; @Autowired private LoggerService loggerService; + @Autowired + private EnvironmentRepository environmentRepository; + @Autowired + private EnvironmentService environmentService; + @Value("${odc.task.async.result-preview-max-size-bytes:5242880}") private long resultPreviewMaxSizeBytes; @@ -243,6 +252,8 @@ public List getTaskResultFromEntity(@NotNull TaskEntit results = getDataTransferResult(taskEntity); } else if (taskEntity.getTaskType() == TaskType.ASYNC) { results = getAsyncResult(taskEntity); + } else if (taskEntity.getTaskType() == TaskType.MULTIPLE_ASYNC) { + results = getMultipleAsyncResult(taskEntity); } else if (taskEntity.getTaskType() == TaskType.MOCKDATA) { results = getMockDataResult(taskEntity); } else if (taskEntity.getTaskType() == TaskType.IMPORT) { @@ -287,11 +298,13 @@ public List getResult( || taskInstance.getTaskType() == TaskType.PRE_CHECK) { Long taskId = taskInstance.getTargetTaskId(); TaskEntity taskEntity = this.taskService.detail(taskId); + // pre-check is for multiple databases PreCheckTaskResult result = JsonUtils.fromJson(taskEntity.getResultJson(), PreCheckTaskResult.class); if (Objects.isNull(result)) { return Collections.emptyList(); } SqlCheckTaskResult checkTaskResult = result.getSqlCheckResult(); + MultipleSqlCheckTaskResult multipleSqlCheckTaskResult = result.getMultipleSqlCheckTaskResult(); ExecutorInfo info = result.getExecutorInfo(); if (!this.dispatchChecker.isThisMachine(info)) { DispatchResponse response = requestDispatcher.forward(info.getHost(), info.getPort()); @@ -299,16 +312,30 @@ public List getResult( new TypeReference>() {}).getData().getContents(); } String dir = FileManager.generateDir(FileBucket.PRE_CHECK) + File.separator + taskId; - Verify.notNull(checkTaskResult.getFileName(), "SqlCheckResultFileName"); - File jsonFile = new File(dir + File.separator + checkTaskResult.getFileName()); + String fileName; + if (checkTaskResult != null) { + Verify.notNull(checkTaskResult.getFileName(), "SqlCheckResultFileName"); + fileName = checkTaskResult.getFileName(); + } else { + Verify.notNull(multipleSqlCheckTaskResult.getFileName(), "MultipleSqlCheckTaskResult"); + fileName = multipleSqlCheckTaskResult.getFileName(); + } + File jsonFile = new File(dir + File.separator + fileName); if (!jsonFile.exists()) { throw new NotFoundException(ErrorCodes.NotFound, new Object[] { - ResourceType.ODC_FILE.getLocalizedMessage(), "file", jsonFile.getName()}, "File is not found"); + ResourceType.ODC_FILE.getLocalizedMessage(), "file", jsonFile.getName()}, + "File is not found"); } String content = FileUtils.readFileToString(jsonFile, Charsets.UTF_8); - checkTaskResult = JsonUtils.fromJson(content, SqlCheckTaskResult.class); - checkTaskResult.setFileName(null); - result.setSqlCheckResult(checkTaskResult); + if (checkTaskResult != null) { + checkTaskResult = JsonUtils.fromJson(content, SqlCheckTaskResult.class); + checkTaskResult.setFileName(null); + result.setSqlCheckResult(checkTaskResult); + } else { + multipleSqlCheckTaskResult = JsonUtils.fromJson(content, MultipleSqlCheckTaskResult.class); + multipleSqlCheckTaskResult.setFileName(null); + result.setMultipleSqlCheckTaskResult(multipleSqlCheckTaskResult); + } result.setExecutorInfo(null); return Collections.singletonList(result); } else { @@ -593,6 +620,10 @@ private List filterTaskInstance(@NonNull Long flowInstanceId, }).collect(Collectors.toList()), false); } + private List getMultipleAsyncResult(@NonNull TaskEntity taskEntity) { + return innerGetResult(taskEntity, MultipleDatabaseChangeTaskResult.class); + } + private List getAsyncResult(@NonNull TaskEntity taskEntity) throws IOException { if (!dispatchChecker.isTaskEntityOnThisMachine(taskEntity)) { /** @@ -727,8 +758,12 @@ private Optional getTaskEntity(@NonNull Long flowInstanceId, if (CollectionUtils.isEmpty(taskInstances)) { return Optional.empty(); } - Verify.singleton(taskInstances, "TaskInstances"); - + /** + * The other types of taskInstances are limited to unique, except for the MULTIPLE_ASYNC + */ + if (taskInstances.get(0).getTaskType() != TaskType.MULTIPLE_ASYNC) { + Verify.singleton(taskInstances, "TaskInstances"); + } FlowTaskInstance flowTaskInstance = taskInstances.get(0); Long targetTaskId = flowTaskInstance.getTargetTaskId(); Verify.notNull(targetTaskId, "TargetTaskId can not be null"); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/CreateFlowInstanceReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/CreateFlowInstanceReq.java index e77633fa09..e4fad927f8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/CreateFlowInstanceReq.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/CreateFlowInstanceReq.java @@ -32,6 +32,7 @@ import com.oceanbase.odc.service.flow.processor.CreateFlowInstanceProcessAspect; import com.oceanbase.odc.service.flow.task.model.DBStructureComparisonParameter; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.OdcMockTaskConfig; import com.oceanbase.odc.service.flow.task.model.ShadowTableSyncTaskParameter; import com.oceanbase.odc.service.onlineschemachange.model.OnlineSchemaChangeParameters; @@ -90,6 +91,7 @@ public class CreateFlowInstanceReq { @JsonSubTypes(value = { @JsonSubTypes.Type(value = OdcMockTaskConfig.class, name = "MOCKDATA"), @JsonSubTypes.Type(value = DataTransferConfig.class, names = {"EXPORT", "IMPORT"}), + @JsonSubTypes.Type(value = MultipleDatabaseChangeParameters.class, names = {"MULTIPLE_ASYNC"}), @JsonSubTypes.Type(value = DatabaseChangeParameters.class, names = {"ASYNC"}), @JsonSubTypes.Type(value = PartitionPlanConfig.class, name = "PARTITION_PLAN"), @JsonSubTypes.Type(value = ShadowTableSyncTaskParameter.class, name = "SHADOWTABLE_SYNC"), diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowInstanceDetailResp.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowInstanceDetailResp.java index e5827afd9a..d1762ba35c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowInstanceDetailResp.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowInstanceDetailResp.java @@ -50,6 +50,7 @@ import com.oceanbase.odc.service.flow.model.FlowNodeInstanceDetailResp.FlowNodeInstanceMapper; import com.oceanbase.odc.service.flow.task.model.DBStructureComparisonParameter; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.OdcMockTaskConfig; import com.oceanbase.odc.service.flow.task.model.ShadowTableSyncTaskParameter; import com.oceanbase.odc.service.flow.util.FlowInstanceUtil; @@ -283,6 +284,9 @@ public FlowInstanceDetailResp map(@NonNull FlowInstance flowInstance, @NonNull F resp.setRiskLevel(getRiskLevel(taskEntity)); String parameterJson = taskEntity.getParametersJson(); switch (taskEntity.getTaskType()) { + case MULTIPLE_ASYNC: + resp.setParameters(JsonUtils.fromJson(parameterJson, MultipleDatabaseChangeParameters.class)); + break; case ASYNC: resp.setParameters(JsonUtils.fromJson(parameterJson, DatabaseChangeParameters.class)); break; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java index 220becd89e..2d6679d89c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/FlowNodeInstanceDetailResp.java @@ -147,12 +147,15 @@ public FlowNodeInstanceDetailResp map(@NonNull FlowTaskInstance instance) { resp.setDeadlineTime(new Date(instance.getUpdateTime().getTime() + expireInterval)); } if (taskEntity != null && taskEntity.getTaskType() == TaskType.PRE_CHECK) { + // Determine whether to perform a multiple database pre-check based on ParametersJson PreCheckTaskResult result = JsonUtils.fromJson(taskEntity.getResultJson(), PreCheckTaskResult.class); if (result != null) { resp.setPreCheckOverLimit(result.isOverLimit()); if (Objects.nonNull(result.getSqlCheckResult())) { resp.setIssueCount(result.getSqlCheckResult().getIssueCount()); + } else if (Objects.nonNull(result.getMultipleSqlCheckTaskResult())) { + resp.setIssueCount(result.getMultipleSqlCheckTaskResult().getIssueCount()); } if (Objects.nonNull(result.getPermissionCheckResult())) { resp.setUnauthorizedDatabases(result.getPermissionCheckResult().getUnauthorizedDatabases()); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/PreCheckTaskResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/PreCheckTaskResult.java index ab9eea11fc..c577f8769a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/PreCheckTaskResult.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/model/PreCheckTaskResult.java @@ -18,6 +18,7 @@ import com.oceanbase.odc.core.flow.model.FlowTaskResult; import com.oceanbase.odc.service.flow.task.model.DatabasePermissionCheckResult; +import com.oceanbase.odc.service.flow.task.model.MultipleSqlCheckTaskResult; import com.oceanbase.odc.service.flow.task.model.SqlCheckTaskResult; import com.oceanbase.odc.service.task.model.ExecutorInfo; @@ -37,6 +38,8 @@ public class PreCheckTaskResult implements FlowTaskResult { private SqlCheckTaskResult sqlCheckResult; + private MultipleSqlCheckTaskResult multipleSqlCheckTaskResult; + private DatabasePermissionCheckResult permissionCheckResult; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java index 4997885e0b..3c5ee9e0b2 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/processor/CreateFlowInstanceProcessAspect.java @@ -34,6 +34,7 @@ import com.oceanbase.odc.core.shared.exception.BadRequestException; import com.oceanbase.odc.metadb.schedule.ScheduleEntity; import com.oceanbase.odc.plugin.task.api.datatransfer.model.DataTransferConfig; +import com.oceanbase.odc.service.collaboration.project.ProjectService; import com.oceanbase.odc.service.connection.database.DatabaseService; import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; @@ -66,6 +67,8 @@ public class CreateFlowInstanceProcessAspect implements InitializingBean { private ScheduleService scheduleService; @Autowired private List preprocessors; + @Autowired + private ProjectService projectService; private final Map scheduleTaskPreprocessors = new HashMap<>(); @@ -82,7 +85,7 @@ public void preprocess(JoinPoint point) throws Throwable { DBStructureComparisonParameter parameters = (DBStructureComparisonParameter) req.getParameters(); req.setDatabaseId(parameters.getSourceDatabaseId()); } - if (Objects.nonNull(req.getDatabaseId())) { + if (Objects.nonNull(req.getDatabaseId()) && req.getTaskType() != TaskType.MULTIPLE_ASYNC) { adaptCreateFlowInstanceReq(req); } if (req.getTaskType() != TaskType.ALTER_SCHEDULE) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java new file mode 100644 index 0000000000..dd7c6c1bf4 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MultipleDatabaseChangeRuntimeFlowableTask.java @@ -0,0 +1,313 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow.task; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.flowable.engine.delegate.DelegateExecution; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; + +import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.core.shared.constant.FlowStatus; +import com.oceanbase.odc.core.shared.constant.TaskErrorStrategy; +import com.oceanbase.odc.core.shared.constant.TaskType; +import com.oceanbase.odc.metadb.flow.FlowInstanceEntity; +import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository; +import com.oceanbase.odc.metadb.task.TaskEntity; +import com.oceanbase.odc.service.connection.database.DatabaseService; +import com.oceanbase.odc.service.connection.database.model.Database; +import com.oceanbase.odc.service.databasechange.MultipleDatabaseChangeTraceContextHolder; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeFlowInstanceDetailResp; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangingRecord; +import com.oceanbase.odc.service.flow.FlowInstanceService; +import com.oceanbase.odc.service.flow.FlowableAdaptor; +import com.oceanbase.odc.service.flow.instance.FlowTaskInstance; +import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; +import com.oceanbase.odc.service.flow.model.FlowInstanceDetailResp; +import com.oceanbase.odc.service.flow.model.FlowNodeStatus; +import com.oceanbase.odc.service.flow.model.FlowTaskExecutionStrategy; +import com.oceanbase.odc.service.flow.model.QueryFlowInstanceParams; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeTaskResult; +import com.oceanbase.odc.service.flow.util.FlowTaskUtil; +import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; +import com.oceanbase.odc.service.task.TaskService; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author: zijia.cj + * @date: 2024/3/29 + */ +@Slf4j +public class MultipleDatabaseChangeRuntimeFlowableTask extends BaseODCFlowTaskDelegate { + + private volatile boolean isSuccessful = false; + private volatile boolean isFailure = false; + + private Integer batchId; + + private Integer batchSum; + List> orderedDatabaseIds; + + private FlowTaskExecutionStrategy flowTaskExecutionStrategy; + + private MultipleDatabaseChangeParameters multipleDatabaseChangeParameters; + + @Autowired + private FlowInstanceService flowInstanceService; + + @Autowired + private ServiceTaskInstanceRepository serviceTaskInstanceRepository; + @Autowired + private AuthenticationFacade authenticationFacade; + @Autowired + private DatabaseService databaseService; + @Autowired + private FlowableAdaptor flowableAdaptor; + + @Override + public boolean cancel(boolean mayInterruptIfRunning, Long taskId, TaskService taskService) { + // todo implement later + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + protected boolean isSuccessful() { + if (this.flowTaskExecutionStrategy == null) { + return this.isSuccessful; + } + return isContinue(this.flowTaskExecutionStrategy, multipleDatabaseChangeParameters.getAutoErrorStrategy()); + } + + @Override + protected boolean isFailure() { + if (this.flowTaskExecutionStrategy == null) { + return this.isFailure; + } + return !isContinue(this.flowTaskExecutionStrategy, multipleDatabaseChangeParameters.getAutoErrorStrategy()); + } + + @Override + protected Void start(Long taskId, TaskService taskService, DelegateExecution execution) + throws InterruptedException { + MultipleDatabaseChangeTraceContextHolder.trace(taskId); + try { + FlowInstanceDetailResp flowInstanceDetailResp = flowInstanceService.detail(getFlowInstanceId()); + this.flowTaskExecutionStrategy = flowInstanceDetailResp.getExecutionStrategy(); + TaskEntity detail = taskService.detail(taskId); + this.multipleDatabaseChangeParameters = JsonUtils.fromJson( + detail.getParametersJson(), MultipleDatabaseChangeParameters.class); + this.orderedDatabaseIds = multipleDatabaseChangeParameters.getOrderedDatabaseIds(); + Integer value = multipleDatabaseChangeParameters.getBatchId(); + if (value == null) { + this.batchId = 0; + } else { + this.batchId = value; + } + log.info("multiple database task start, taskId={}, batchId={}", taskId, + this.batchId + 1); + multipleDatabaseChangeParameters.setBatchId(this.batchId + 1); + detail.setParametersJson(JsonUtils.toJson(multipleDatabaseChangeParameters)); + taskService.updateParametersJson(detail); + this.batchSum = multipleDatabaseChangeParameters.getOrderedDatabaseIds().size(); + List batchDatabaseIds = + multipleDatabaseChangeParameters.getOrderedDatabaseIds().get(this.batchId); + List flowInstanceIds = new ArrayList<>(); + for (Long batchDatabaseId : batchDatabaseIds) { + CreateFlowInstanceReq createFlowInstanceReq = new CreateFlowInstanceReq(); + createFlowInstanceReq.setDatabaseId(batchDatabaseId); + createFlowInstanceReq.setTaskType(TaskType.ASYNC); + createFlowInstanceReq.setExecutionStrategy(FlowTaskExecutionStrategy.AUTO); + createFlowInstanceReq.setParentFlowInstanceId(FlowTaskUtil.getFlowInstanceId(execution)); + createFlowInstanceReq.setParameters(multipleDatabaseChangeParameters + .convertIntoDatabaseChangeParameters(multipleDatabaseChangeParameters)); + List individualFlowInstance = flowInstanceService.createIndividualFlowInstance( + createFlowInstanceReq); + flowInstanceIds.add(individualFlowInstance.get(0).getId()); + } + + long originalTime = System.currentTimeMillis(); + boolean flagForTaskSucceed = true; + // todo 待优化,做成异步回调,减少阻塞和查数据库的次数。 + while (System.currentTimeMillis() - originalTime <= multipleDatabaseChangeParameters.getTimeoutMillis()) { + int numberForEndLoop = 0; + List flowInstanceEntityList = flowInstanceService.listByIds(flowInstanceIds); + for (FlowInstanceEntity flowInstanceEntity : flowInstanceEntityList) { + if (flowInstanceEntity != null) { + switch (flowInstanceEntity.getStatus()) { + case EXECUTION_SUCCEEDED: + numberForEndLoop++; + break; + case EXECUTION_FAILED: + case EXECUTION_EXPIRED: + flagForTaskSucceed = false; + numberForEndLoop++; + break; + default: + break; + } + } + } + if (numberForEndLoop == multipleDatabaseChangeParameters.getOrderedDatabaseIds().get( + this.batchId).size()) { + break; + } + Thread.sleep(1000); + } + // Check whether the current batch database change is successfully initiated + if (flagForTaskSucceed) { + this.isFailure = false; + this.isSuccessful = true; + } else { + this.isFailure = true; + this.isSuccessful = false; + } + return null; + } catch (Exception e) { + log.warn("multiple database task failed, taskId={}, batchId={}", taskId, + this.batchId == null ? null : this.batchId + 1, e); + this.isFailure = true; + this.isSuccessful = false; + throw e; + } finally { + MultipleDatabaseChangeTraceContextHolder.clear(); + } + } + + + + @Override + protected void onFailure(Long taskId, TaskService taskService) { + try { + MultipleDatabaseChangeTraceContextHolder.trace(taskId); + log.warn("multiple database task failed, taskId={}, batchId={}", taskId, + this.batchId == null ? null : this.batchId + 1); + updateFlowInstanceStatus(FlowStatus.EXECUTION_FAILED); + taskService.fail(taskId, 100, generateResult()); + super.onFailure(taskId, taskService); + } finally { + MultipleDatabaseChangeTraceContextHolder.clear(); + } + } + + @Override + protected void onSuccessful(Long taskId, TaskService taskService) { + try { + MultipleDatabaseChangeTraceContextHolder.trace(taskId); + log.info("multiple database task succeed, taskId={}, batchId={}", taskId, this.batchId + 1); + if (this.batchId == batchSum - 1) { + List list = flowInstanceService.getFlowInstanceByParentId( + getFlowInstanceId()); + boolean allSucceeded = list.stream() + .allMatch( + flowInstanceEntity -> FlowStatus.EXECUTION_SUCCEEDED == flowInstanceEntity.getStatus()); + if (allSucceeded) { + updateFlowInstanceStatus(FlowStatus.EXECUTION_SUCCEEDED); + } else { + updateFlowInstanceStatus(FlowStatus.EXECUTION_FAILED); + } + taskService.succeed(taskId, generateResult()); + } else { + taskService.updateProgress(taskId, (this.batchId + 1) * 100D / this.batchSum); + } + super.onSuccessful(taskId, taskService); + Optional taskInstanceByActivityId = flowableAdaptor.getTaskInstanceByActivityId( + getActivityId(), getFlowInstanceId()); + if (!this.isSuccessful) { + serviceTaskInstanceRepository.updateStatusById(taskInstanceByActivityId.get().getId(), + FlowNodeStatus.FAILED); + } + } catch (Exception e) { + log.warn("multiple database task failed, taskId={}, batchId={}", taskId, + this.batchId == null ? null : this.batchId + 1, e); + } finally { + MultipleDatabaseChangeTraceContextHolder.clear(); + } + } + + @Override + protected void onTimeout(Long taskId, TaskService taskService) { + try { + MultipleDatabaseChangeTraceContextHolder.trace(taskId); + taskService.fail(taskId, 100, generateResult()); + log.warn("multiple database task timeout, taskId={}, batchId={}", taskId, + this.batchId == null ? null : this.batchId + 1); + } finally { + MultipleDatabaseChangeTraceContextHolder.clear(); + } + super.onTimeout(taskId, taskService); + } + + @Override + protected void onProgressUpdate(Long taskId, TaskService taskService) {} + + private MultipleDatabaseChangeTaskResult generateResult() { + MultipleDatabaseChangeTaskResult result = new MultipleDatabaseChangeTaskResult(); + Long flowInstanceId = getFlowInstanceId(); + QueryFlowInstanceParams param = QueryFlowInstanceParams.builder().parentInstanceId(flowInstanceId) + .build(); + Page page = flowInstanceService.list(Pageable.unpaged(), param); + List flowInstanceDetailRespList = page.getContent(); + // todo At present multi-databases changes do not include databases with the same name + Map databaseId2FlowInstanceDetailResp = + flowInstanceDetailRespList.stream().collect( + Collectors.toMap(flowInstanceDetailResp -> flowInstanceDetailResp.getDatabase().getId(), + flowInstanceDetailResp -> flowInstanceDetailResp)); + List idList = this.orderedDatabaseIds.stream().flatMap(Collection::stream).collect(Collectors.toList()); + List databaseList = databaseService.listDatabasesDetailsByIds(idList); + ArrayList databaseChangingRecords = new ArrayList<>(); + for (Database database : databaseList) { + FlowInstanceDetailResp flowInstanceDetailResp = databaseId2FlowInstanceDetailResp.get(database.getId()); + DatabaseChangingRecord databaseChangingRecord = new DatabaseChangingRecord(); + databaseChangingRecord.setDatabase(new DatabaseChangeDatabase(database)); + databaseChangingRecord.setFlowInstanceDetailResp( + flowInstanceDetailResp != null ? new DatabaseChangeFlowInstanceDetailResp(flowInstanceDetailResp) + : null); + databaseChangingRecord.setStatus(flowInstanceDetailResp != null ? flowInstanceDetailResp.getStatus() + : FlowStatus.WAIT_FOR_EXECUTION); + databaseChangingRecords.add(databaseChangingRecord); + } + result.setDatabaseChangingRecordList(databaseChangingRecords); + + return result; + } + + private Boolean isContinue(FlowTaskExecutionStrategy flowTaskExecutionStrategy, + TaskErrorStrategy autoTaskErrorStrategy) { + if (flowTaskExecutionStrategy == FlowTaskExecutionStrategy.MANUAL + || (flowTaskExecutionStrategy == FlowTaskExecutionStrategy.AUTO + && autoTaskErrorStrategy == TaskErrorStrategy.CONTINUE)) { + return true; + } else { + return this.isSuccessful; + } + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PreCheckRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PreCheckRuntimeFlowableTask.java index faea530b73..71b0752c5f 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PreCheckRuntimeFlowableTask.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/PreCheckRuntimeFlowableTask.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +43,7 @@ import com.oceanbase.odc.common.unit.BinarySizeUnit; import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.core.shared.PreConditions; +import com.oceanbase.odc.core.shared.constant.DialectType; import com.oceanbase.odc.core.shared.constant.TaskType; import com.oceanbase.odc.core.shared.exception.VerifyException; import com.oceanbase.odc.core.sql.execute.model.SqlTuple; @@ -52,12 +54,16 @@ import com.oceanbase.odc.service.common.model.FileBucket; import com.oceanbase.odc.service.common.util.SqlUtils; import com.oceanbase.odc.service.connection.database.DatabaseService; +import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.connection.model.ConnectionConfig; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; import com.oceanbase.odc.service.flow.exception.ServiceTaskError; import com.oceanbase.odc.service.flow.model.FlowNodeStatus; import com.oceanbase.odc.service.flow.model.PreCheckTaskResult; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.DatabasePermissionCheckResult; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; +import com.oceanbase.odc.service.flow.task.model.MultipleSqlCheckTaskResult; import com.oceanbase.odc.service.flow.task.model.PreCheckTaskProperties; import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; import com.oceanbase.odc.service.flow.task.model.SqlCheckTaskResult; @@ -95,6 +101,7 @@ public class PreCheckRuntimeFlowableTask extends BaseODCFlowTaskDelegate { private volatile boolean success = false; private volatile boolean overLimit = false; private volatile SqlCheckTaskResult sqlCheckResult = null; + private volatile MultipleSqlCheckTaskResult multipleSqlCheckTaskResult = null; private volatile DatabasePermissionCheckResult permissionCheckResult = null; private Long creatorId; private List userInputSqls; @@ -102,6 +109,7 @@ public class PreCheckRuntimeFlowableTask extends BaseODCFlowTaskDelegate { private SqlStatementIterator uploadFileSqlIterator; private ConnectionConfig connectionConfig; private Long preCheckTaskId; + private List databaseList; @Autowired private ApprovalFlowConfigSelector approvalFlowConfigSelector; @Autowired @@ -112,7 +120,6 @@ public class PreCheckRuntimeFlowableTask extends BaseODCFlowTaskDelegate { private PreCheckTaskProperties preCheckTaskProperties; @Autowired private ObjectStorageFacade storageFacade; - private static final String CHECK_RESULT_FILE_NAME = "sql-check-result.json"; private final Map riskLevelResult = new HashMap<>(); @@ -134,15 +141,32 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe } catch (VerifyException e) { log.info(e.getMessage()); } - RiskLevelDescriber riskLevelDescriber = FlowTaskUtil.getRiskLevelDescriber(execution); - if (Objects.nonNull(this.connectionConfig)) { + RiskLevelDescriber riskLevelDescriber = null; + Map databaseId2RiskLevelDescriber = null; + if (taskEntity.getTaskType() == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters parameters = JsonUtils.fromJson( + taskEntity.getParametersJson(), MultipleDatabaseChangeParameters.class); + List databaseIds = parameters.getOrderedDatabaseIds().stream() + .flatMap(List::stream).collect(Collectors.toList()); + this.databaseList = databaseService.listDatabasesDetailsByIds(databaseIds); + // 单独构建databaseId2RiskLevelDescriber + databaseId2RiskLevelDescriber = buildDatabaseId2RiskLevelDescriber(this.databaseList); + } else { + riskLevelDescriber = FlowTaskUtil.getRiskLevelDescriber(execution); + } + + if (taskEntity.getTaskType() == TaskType.MULTIPLE_ASYNC || Objects.nonNull(this.connectionConfig)) { // Skip SQL pre-check if connection config is null loadUserInputSqlContent(taskEntity.getTaskType(), taskEntity.getParametersJson()); loadUploadFileInputStream(taskEntity.getTaskType(), taskEntity.getParametersJson()); try { - preCheck(taskEntity, preCheckTaskEntity, riskLevelDescriber); + if (taskEntity.getTaskType() == TaskType.MULTIPLE_ASYNC) { + preCheck(taskEntity, preCheckTaskEntity, databaseId2RiskLevelDescriber); + } else { + preCheck(taskEntity, preCheckTaskEntity, riskLevelDescriber); + } } catch (Exception e) { - log.warn("pre check failed, e"); + log.warn("pre check failed", e); throw new ServiceTaskError(e); } finally { if (Objects.nonNull(this.uploadFileInputStream)) { @@ -153,15 +177,43 @@ protected Void start(Long taskId, TaskService taskService, DelegateExecution exe } } } - if (this.sqlCheckResult != null) { - riskLevelDescriber.setSqlCheckResult(sqlCheckResult.getMaxLevel() + ""); + if (taskEntity.getTaskType() == TaskType.MULTIPLE_ASYNC) { + if (this.multipleSqlCheckTaskResult != null && databaseId2RiskLevelDescriber != null) { + List sqlCheckTaskResultList = + multipleSqlCheckTaskResult.getSqlCheckTaskResultList(); + for (int i = 0; i < sqlCheckTaskResultList.size(); i++) { + if (sqlCheckTaskResultList.get(i) != null) { + databaseId2RiskLevelDescriber.get(databaseList.get(i).getId()) + .setSqlCheckResult(sqlCheckTaskResultList.get(i).getMaxLevel() + ""); + } + if (this.overLimit) { + databaseId2RiskLevelDescriber.get(databaseList.get(i).getId()).setOverLimit(true); + } + } + } } - if (this.overLimit) { - riskLevelDescriber.setOverLimit(true); + if (taskEntity.getTaskType() != TaskType.MULTIPLE_ASYNC && riskLevelDescriber != null) { + if (this.sqlCheckResult != null) { + riskLevelDescriber.setSqlCheckResult(sqlCheckResult.getMaxLevel() + ""); + } + if (this.overLimit) { + riskLevelDescriber.setOverLimit(true); + } } } try { - RiskLevel riskLevel = approvalFlowConfigSelector.select(riskLevelDescriber); + RiskLevel riskLevel; + if (taskEntity.getTaskType() == TaskType.MULTIPLE_ASYNC + && databaseId2RiskLevelDescriber != null) { + riskLevel = databaseId2RiskLevelDescriber.values().stream() + .map(approvalFlowConfigSelector::select) + .max(Comparator.comparingInt(RiskLevel::getLevel)) + .orElseThrow(() -> new IllegalStateException("Unknown error")); + } else if (riskLevelDescriber != null) { + riskLevel = approvalFlowConfigSelector.select(riskLevelDescriber); + } else { + throw new IllegalStateException("Unknown error"); + } taskEntity.setRiskLevelId(riskLevel.getId()); taskEntity.setExecutionExpirationIntervalSeconds( riskLevel.getApprovalFlowConfig().getExecutionExpirationIntervalSeconds()); @@ -193,7 +245,6 @@ protected boolean isFailure() { return false; } - @Override protected void onFailure(Long taskId, TaskService taskService) { log.warn("RiskLevel Detect task failed, taskId={}", this.preCheckTaskId); @@ -237,7 +288,8 @@ public boolean isCancelled() { return false; } - private void preCheck(TaskEntity taskEntity, TaskEntity preCheckTaskEntity, RiskLevelDescriber riskLevelDescriber) { + private void preCheck(TaskEntity taskEntity, TaskEntity preCheckTaskEntity, + RiskLevelDescriber riskLevelDescriber) { TaskType taskType = taskEntity.getTaskType(); if (taskType.needsPreCheck()) { if (taskType == TaskType.ALTER_SCHEDULE) { @@ -254,6 +306,18 @@ private void preCheck(TaskEntity taskEntity, TaskEntity preCheckTaskEntity, Risk } } + private void preCheck(TaskEntity taskEntity, TaskEntity preCheckTaskEntity, + Map databaseId2RiskLevelDescriber) { + TaskType taskType = taskEntity.getTaskType(); + doMultipleSqlCheckAndDatabasePermissionCheck(preCheckTaskEntity, databaseId2RiskLevelDescriber, taskType); + this.permissionCheckResult.setUnauthorizedDatabases( + this.permissionCheckResult.getUnauthorizedDatabases().stream().distinct().collect(Collectors.toList())); + if (isIntercepted(this.sqlCheckResult, this.permissionCheckResult)) { + throw new ServiceTaskError(new RuntimeException()); + } + + } + private boolean isIntercepted(SqlCheckTaskResult sqlCheckResult, DatabasePermissionCheckResult permissionCheckResult) { if (Objects.isNull(sqlCheckResult) && Objects.isNull(permissionCheckResult)) { @@ -283,24 +347,8 @@ private void doSqlCheckAndDatabasePermissionCheck(TaskEntity preCheckTaskEntity, if (CollectionUtils.isNotEmpty(sqls)) { violations.addAll(this.sqlCheckService.check(Long.valueOf(describer.getEnvironmentId()), describer.getDatabaseName(), sqls, connectionConfig)); - Map> schemaName2SqlTypes = SchemaExtractor.listSchemaName2SqlTypes( - sqls.stream().map(e -> SqlTuple.newTuple(e.getStr())).collect(Collectors.toList()), - preCheckTaskEntity.getDatabaseName(), this.connectionConfig.getDialectType()); - Map> schemaName2PermissionTypes = new HashMap<>(); - for (Entry> entry : schemaName2SqlTypes.entrySet()) { - Set sqlTypes = entry.getValue(); - if (CollectionUtils.isNotEmpty(sqlTypes)) { - Set permissionTypes = sqlTypes.stream().map(DatabasePermissionType::from) - .filter(Objects::nonNull).collect(Collectors.toSet()); - permissionTypes.addAll(DatabasePermissionType.from(taskType)); - if (CollectionUtils.isNotEmpty(permissionTypes)) { - schemaName2PermissionTypes.put(entry.getKey(), permissionTypes); - } - } - } - unauthorizedDatabases = - databaseService.filterUnauthorizedDatabases(schemaName2PermissionTypes, connectionConfig.getId(), - true); + unauthorizedDatabases = getUnauthorizedDatabases(sqls, connectionConfig.getId(), + preCheckTaskEntity.getDatabaseName(), this.connectionConfig.getDialectType(), taskType); } this.permissionCheckResult = new DatabasePermissionCheckResult(unauthorizedDatabases); this.sqlCheckResult = SqlCheckTaskResult.success(violations); @@ -312,6 +360,67 @@ private void doSqlCheckAndDatabasePermissionCheck(TaskEntity preCheckTaskEntity, } } + private List getUnauthorizedDatabases(List sqls, + Long connectionId, String defaultSchema, DialectType dialectType, TaskType taskType) { + Map> schemaName2SqlTypes = SchemaExtractor.listSchemaName2SqlTypes( + sqls.stream().map(e -> SqlTuple.newTuple(e.getStr())).collect(Collectors.toList()), + defaultSchema, dialectType); + Map> schemaName2PermissionTypes = new HashMap<>(); + for (Entry> entry : schemaName2SqlTypes.entrySet()) { + Set sqlTypes = entry.getValue(); + if (CollectionUtils.isNotEmpty(sqlTypes)) { + Set permissionTypes = sqlTypes.stream().map(DatabasePermissionType::from) + .filter(Objects::nonNull).collect(Collectors.toSet()); + permissionTypes.addAll(DatabasePermissionType.from(taskType)); + if (CollectionUtils.isNotEmpty(permissionTypes)) { + schemaName2PermissionTypes.put(entry.getKey(), permissionTypes); + } + } + } + return databaseService.filterUnauthorizedDatabases(schemaName2PermissionTypes, connectionId, true); + } + + private void doMultipleSqlCheckAndDatabasePermissionCheck(TaskEntity preCheckTaskEntity, + Map databaseId2RiskLevelDescriber, TaskType taskType) { + List sqls = new ArrayList<>(); + this.overLimit = getSqlContentUntilOverLimit(sqls, preCheckTaskProperties.getMaxSqlContentBytes()); + if (CollectionUtils.isNotEmpty(sqls)) { + List sqlCheckTaskResultList = new ArrayList<>(); + for (Database database : this.databaseList) { + List violations = new ArrayList<>(this.sqlCheckService.check( + Long.valueOf(databaseId2RiskLevelDescriber.get(database.getId()).getEnvironmentId()), + database.getName(), sqls, database.getDataSource())); + sqlCheckTaskResultList.add(SqlCheckTaskResult.success(violations)); + List unauthorizedDatabases = getUnauthorizedDatabases(sqls, + database.getDataSource().getId(), database.getName(), + database.getDataSource().getDialectType(), taskType); + if (this.permissionCheckResult == null) { + this.permissionCheckResult = new DatabasePermissionCheckResult(unauthorizedDatabases); + } else { + this.permissionCheckResult.getUnauthorizedDatabases().addAll(unauthorizedDatabases); + } + } + this.multipleSqlCheckTaskResult = new MultipleSqlCheckTaskResult(); + this.multipleSqlCheckTaskResult.setSqlCheckTaskResultList(sqlCheckTaskResultList); + this.multipleSqlCheckTaskResult.setDatabaseList(this.databaseList.stream() + .map(DatabaseChangeDatabase::new).collect(Collectors.toList())); + this.multipleSqlCheckTaskResult.setSuccess(true); + this.multipleSqlCheckTaskResult + .setIssueCount(this.multipleSqlCheckTaskResult.getSqlCheckTaskResultList().stream() + .map(SqlCheckTaskResult::getIssueCount) + .reduce((sum, account) -> sum = sum + account).get()); + this.multipleSqlCheckTaskResult.setMaxLevel( + Math.toIntExact(approvalFlowConfigSelector.selectForMultipleDatabase().getId())); + this.multipleSqlCheckTaskResult.setError(null); + this.multipleSqlCheckTaskResult.setFileName(CHECK_RESULT_FILE_NAME); + } + try { + storeTaskResultToFile(preCheckTaskEntity.getId(), this.multipleSqlCheckTaskResult); + } catch (Exception e) { + throw new ServiceTaskError(e); + } + } + /** * Get the sql content from the databaseChangeRelatedSqls and sqlIterator, and put them into the * sqlBuffer. If the sql content is over the maxSqlBytes, return true, else return false. @@ -353,6 +462,11 @@ private void loadUserInputSqlContent(TaskType taskType, String parameter) { DatabaseChangeParameters params = JsonUtils.fromJson(parameter, DatabaseChangeParameters.class); sqlContent = params.getSqlContent(); delimiter = params.getDelimiter(); + } else if (taskType == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters params = + JsonUtils.fromJson(parameter, MultipleDatabaseChangeParameters.class); + sqlContent = params.getSqlContent(); + delimiter = params.getDelimiter(); } else if (taskType == TaskType.ONLINE_SCHEMA_CHANGE) { OnlineSchemaChangeParameters params = JsonUtils.fromJson(parameter, OnlineSchemaChangeParameters.class); sqlContent = params.getSqlContent(); @@ -370,15 +484,23 @@ private void loadUserInputSqlContent(TaskType taskType, String parameter) { delimiter = dcParams.getDelimiter(); } if (StringUtils.isNotBlank(sqlContent)) { - this.userInputSqls = SqlUtils.splitWithOffset(connectionConfig.getDialectType(), sqlContent, delimiter); + if (taskType == TaskType.MULTIPLE_ASYNC) { + this.userInputSqls = SqlUtils.splitWithOffset(databaseList.get(0).getDataSource().getDialectType(), + sqlContent, delimiter); + } else { + this.userInputSqls = SqlUtils.splitWithOffset(connectionConfig.getDialectType(), sqlContent, delimiter); + } } } private void loadUploadFileInputStream(TaskType taskType, String parametersJson) { String bucketName = "async".concat(File.separator).concat(this.creatorId.toString()); DatabaseChangeParameters params = null; + MultipleDatabaseChangeParameters multipleParams = null; if (taskType == TaskType.ASYNC) { params = JsonUtils.fromJson(parametersJson, DatabaseChangeParameters.class); + } else if (taskType == TaskType.MULTIPLE_ASYNC) { + multipleParams = JsonUtils.fromJson(parametersJson, MultipleDatabaseChangeParameters.class); } else if (taskType == TaskType.ALTER_SCHEDULE) { AlterScheduleParameters asParams = JsonUtils.fromJson(parametersJson, AlterScheduleParameters.class); if (asParams.getType() != JobType.SQL_PLAN) { @@ -394,9 +516,19 @@ private void loadUploadFileInputStream(TaskType taskType, String parametersJson) this.uploadFileInputStream, StandardCharsets.UTF_8); } } + if (Objects.nonNull(multipleParams)) { + this.uploadFileInputStream = + DatabaseChangeFileReader.readInputStreamFromSqlObjects(storageFacade, multipleParams, bucketName, + -1); + if (Objects.nonNull(this.uploadFileInputStream)) { + this.uploadFileSqlIterator = SqlUtils.iterator( + this.databaseList.get(0).getDataSource().getDialectType(), multipleParams.getDelimiter(), + this.uploadFileInputStream, StandardCharsets.UTF_8); + } + } } - private void storeTaskResultToFile(Long preCheckTaskId, SqlCheckTaskResult result) throws IOException { + private void storeTaskResultToFile(Long preCheckTaskId, Object result) throws IOException { String json = JsonUtils.toJson(result); if (json == null) { throw new IllegalStateException("Can not get json string"); @@ -420,10 +552,24 @@ private PreCheckTaskResult buildPreCheckResult() { result.setOverLimit(this.overLimit); if (Objects.nonNull(this.sqlCheckResult)) { this.sqlCheckResult.setResults(null); + result.setSqlCheckResult(this.sqlCheckResult); + } else if (Objects.nonNull(this.multipleSqlCheckTaskResult)) { + this.multipleSqlCheckTaskResult.setSqlCheckTaskResultList(null); + result.setMultipleSqlCheckTaskResult(this.multipleSqlCheckTaskResult); } - result.setSqlCheckResult(this.sqlCheckResult); result.setPermissionCheckResult(this.permissionCheckResult); return result; } + private Map buildDatabaseId2RiskLevelDescriber(List databaseList) { + return databaseList.stream().collect(Collectors.toMap(Database::getId, database -> RiskLevelDescriber.builder() + .projectName(database.getProject().getName()) + .taskType(TaskType.MULTIPLE_ASYNC.name()) + .environmentId(database.getEnvironment() == null ? null + : String.valueOf(database.getEnvironment().getId())) + .environmentName(database.getEnvironment() == null ? null : database.getEnvironment().getName()) + .databaseName(database.getName()) + .build())); + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/mapper/OdcRuntimeDelegateMapper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/mapper/OdcRuntimeDelegateMapper.java index 15362427e3..a129c084c3 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/mapper/OdcRuntimeDelegateMapper.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/mapper/OdcRuntimeDelegateMapper.java @@ -24,6 +24,7 @@ import com.oceanbase.odc.service.flow.task.DataTransferRuntimeFlowableTask; import com.oceanbase.odc.service.flow.task.DatabaseChangeRuntimeFlowableTask; import com.oceanbase.odc.service.flow.task.MockDataRuntimeFlowableTask; +import com.oceanbase.odc.service.flow.task.MultipleDatabaseChangeRuntimeFlowableTask; import com.oceanbase.odc.service.flow.task.PartitionPlanRuntimeFlowableTask; import com.oceanbase.odc.service.flow.task.PreCheckRuntimeFlowableTask; import com.oceanbase.odc.service.flow.task.RollbackPlanRuntimeFlowableTask; @@ -48,6 +49,8 @@ public class OdcRuntimeDelegateMapper implements RuntimeDelegateMapper { @Override public Class> map(@NonNull TaskType taskType) { switch (taskType) { + case MULTIPLE_ASYNC: + return MultipleDatabaseChangeRuntimeFlowableTask.class; case ASYNC: return DatabaseChangeRuntimeFlowableTask.class; case MOCKDATA: diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleDatabaseChangeParameters.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleDatabaseChangeParameters.java new file mode 100644 index 0000000000..af379d4c45 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleDatabaseChangeParameters.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow.task.model; + +import java.util.List; + +import com.oceanbase.odc.core.shared.constant.TaskErrorStrategy; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeProject; + +import lombok.Data; + +/** + * @author: zijia.cj + * @date: 2024/3/27 + */ +@Data +public class MultipleDatabaseChangeParameters extends DatabaseChangeParameters { + /** + * Because ids in a Project cannot be deserialized, this property is needed instead of receiving + * front-end data + */ + private Long projectId; + /** + * All databases must belong to this project + */ + private DatabaseChangeProject project; + /** + * multiple databases change execution sequence + */ + private List> orderedDatabaseIds; + private List databases; + private Integer batchId; + /** + * Error strategy in multiple databases auto execution mode + */ + private TaskErrorStrategy autoErrorStrategy; + /** + * TimeoutMillis in multiple databases manual execution mode + */ + private Long manualTimeoutMillis = 1000 * 60 * 60 * 24 * 2L;// 2d for default + + public DatabaseChangeParameters convertIntoDatabaseChangeParameters( + MultipleDatabaseChangeParameters multipleDatabaseChangeParameters) { + DatabaseChangeParameters databaseChangeParameters = new DatabaseChangeParameters(); + databaseChangeParameters.setSqlContent(multipleDatabaseChangeParameters.getSqlContent()); + databaseChangeParameters.setSqlObjectNames(multipleDatabaseChangeParameters.getSqlObjectNames()); + databaseChangeParameters.setSqlObjectIds(multipleDatabaseChangeParameters.getSqlObjectIds()); + databaseChangeParameters + .setRollbackSqlObjectNames(multipleDatabaseChangeParameters.getRollbackSqlObjectNames()); + databaseChangeParameters.setRollbackSqlContent(multipleDatabaseChangeParameters.getRollbackSqlContent()); + databaseChangeParameters.setRollbackSqlObjectIds(multipleDatabaseChangeParameters.getRollbackSqlObjectIds()); + // Error strategy for sql changes in a single database + databaseChangeParameters.setErrorStrategy(multipleDatabaseChangeParameters.getErrorStrategy().toString()); + databaseChangeParameters.setDelimiter(multipleDatabaseChangeParameters.getDelimiter()); + databaseChangeParameters.setGenerateRollbackPlan(multipleDatabaseChangeParameters.getGenerateRollbackPlan()); + databaseChangeParameters.setParentJobType(multipleDatabaseChangeParameters.getParentJobType()); + return databaseChangeParameters; + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleDatabaseChangeTaskResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleDatabaseChangeTaskResult.java new file mode 100644 index 0000000000..57cf6f4e5e --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleDatabaseChangeTaskResult.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow.task.model; + +import java.util.List; + +import com.oceanbase.odc.core.flow.model.FlowTaskResult; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangingRecord; + +import lombok.Getter; +import lombok.Setter; + +/** + * @author: zijia.cj + * @date: 2024/4/7 + */ +@Getter +@Setter +public class MultipleDatabaseChangeTaskResult implements FlowTaskResult { + private List databaseChangingRecordList; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleSqlCheckTaskResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleSqlCheckTaskResult.java new file mode 100644 index 0000000000..069724c12a --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/model/MultipleSqlCheckTaskResult.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.flow.task.model; + +import java.io.Serializable; +import java.util.List; + +import com.oceanbase.odc.core.flow.model.FlowTaskResult; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * @author: zijia.cj + * @date: 2024/4/30 + */ +@Getter +@Setter +@ToString +public class MultipleSqlCheckTaskResult implements Serializable, FlowTaskResult { + private static final long serialVersionUID = -1410986697860096629L; + private List sqlCheckTaskResultList; + private List databaseList; + private boolean success; + private Integer issueCount; + private Integer maxLevel; + private String error; + private String fileName; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/util/DatabaseChangeFileReader.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/util/DatabaseChangeFileReader.java index d263363cb6..8b6df3e5e6 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/util/DatabaseChangeFileReader.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/util/DatabaseChangeFileReader.java @@ -34,6 +34,7 @@ import com.oceanbase.odc.service.common.util.SqlUtils; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeSqlContent; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.SizeAwareInputStream; import com.oceanbase.odc.service.objectstorage.ObjectStorageFacade; import com.oceanbase.odc.service.objectstorage.model.StorageObject; @@ -47,6 +48,20 @@ */ @Slf4j public class DatabaseChangeFileReader { + public static InputStream readInputStreamFromSqlObjects(@NotNull ObjectStorageFacade storageFacade, + MultipleDatabaseChangeParameters params, String bucketName, + long maxSizeBytes) { + List objectIds = params.getSqlObjectIds(); + if (CollectionUtils.isEmpty(objectIds)) { + return null; + } + try { + return readSqlFilesStream(storageFacade, bucketName, objectIds, maxSizeBytes).getInputStream(); + } catch (Exception e) { + log.warn("Failed to read sql files from object storage", e); + throw new IllegalStateException("Failed to read sql files from object storage"); + } + } public static InputStream readInputStreamFromSqlObjects(@NotNull ObjectStorageFacade storageFacade, DatabaseChangeParameters params, String bucketName, diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java index c9b5282211..ac03351536 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/DescriptionGenerator.java @@ -15,9 +15,15 @@ */ package com.oceanbase.odc.service.flow.util; +import java.util.List; +import java.util.stream.Collectors; + import com.oceanbase.odc.common.util.StringUtils; import com.oceanbase.odc.core.shared.constant.Symbols; +import com.oceanbase.odc.core.shared.constant.TaskType; +import com.oceanbase.odc.service.databasechange.model.DatabaseChangeDatabase; import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; /** * @Author:tinker @@ -30,8 +36,18 @@ public static void generateDescription(CreateFlowInstanceReq req) { if (StringUtils.isEmpty(req.getDescription())) { String descFormat = Symbols.LEFT_BRACKET.getLocalizedMessage() + "%s" + Symbols.RIGHT_BRACKET.getLocalizedMessage() + "%s.%s"; - req.setDescription(String.format(descFormat, - req.getEnvironmentName(), req.getConnectionName(), req.getDatabaseName())); + if (req.getTaskType() == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters parameters = (MultipleDatabaseChangeParameters) req.getParameters(); + List databases = parameters.getDatabases(); + String description = databases.stream() + .map(db -> String.format(descFormat, db.getEnvironment().getName(), + db.getDataSource().getName(), db.getName())) + .collect(Collectors.joining(",")); + req.setDescription(description); + } else { + req.setDescription(String.format(descFormat, + req.getEnvironmentName(), req.getConnectionName(), req.getDatabaseName())); + } } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/FlowTaskUtil.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/FlowTaskUtil.java index 0d1e4240a6..fdc64b2a0e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/FlowTaskUtil.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/util/FlowTaskUtil.java @@ -43,6 +43,7 @@ import com.oceanbase.odc.service.flow.task.model.DBStructureComparisonParameter; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.MockProperties; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.flow.task.model.OdcMockTaskConfig; import com.oceanbase.odc.service.flow.task.model.RuntimeTaskConstants; import com.oceanbase.odc.service.flow.task.model.ShadowTableSyncTaskParameter; @@ -96,6 +97,11 @@ public static DatabaseChangeParameters getAsyncParameter(@NonNull DelegateExecut () -> new VerifyException("OdcAsyncTaskParameters is absent")); } + public static MultipleDatabaseChangeParameters getMultipleAsyncParameter(@NonNull DelegateExecution execution) { + return internalGetParameter(execution, MultipleDatabaseChangeParameters.class).orElseThrow( + () -> new VerifyException("OdcMultipleAsyncTaskParameters is absent")); + } + public static DataTransferConfig getDataTransferParameter(@NonNull DelegateExecution execution) { return internalGetParameter(execution, DataTransferConfig.class).orElseThrow( () -> new VerifyException("DataTransferConfig is absent")); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/EventBuilder.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/EventBuilder.java index 7870498bbf..42e00192e7 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/EventBuilder.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/EventBuilder.java @@ -68,6 +68,7 @@ import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.flow.task.model.DatabaseChangeParameters; +import com.oceanbase.odc.service.flow.task.model.MultipleDatabaseChangeParameters; import com.oceanbase.odc.service.iam.UserService; import com.oceanbase.odc.service.iam.auth.AuthenticationFacade; import com.oceanbase.odc.service.iam.model.User; @@ -208,6 +209,11 @@ private Event ofTask(TaskEntity task, TaskEvent status) { JsonUtils.fromJson(task.getParametersJson(), ApplyProjectParameter.class); projectId = parameter.getProject().getId(); labels.putIfNonNull(PROJECT_ID, projectId); + } else if (task.getTaskType() == TaskType.MULTIPLE_ASYNC) { + MultipleDatabaseChangeParameters parameter = + JsonUtils.fromJson(task.getParametersJson(), MultipleDatabaseChangeParameters.class); + projectId = parameter.getProjectId(); + labels.putIfNonNull(PROJECT_ID, projectId); } else { throw new UnexpectedException("task.databaseId should not be null"); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/database/model/DatabasePermissionType.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/database/model/DatabasePermissionType.java index 36cd665512..6449bd4443 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/database/model/DatabasePermissionType.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/database/model/DatabasePermissionType.java @@ -106,6 +106,7 @@ public static Set from(@NonNull TaskType taskType) { case ONLINE_SCHEMA_CHANGE: case ALTER_SCHEDULE: case STRUCTURE_COMPARISON: + case MULTIPLE_ASYNC: types.add(CHANGE); break; default: diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/regulation/approval/ApprovalFlowConfigSelector.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/regulation/approval/ApprovalFlowConfigSelector.java index ab1eb0b7e9..a57f8481db 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/regulation/approval/ApprovalFlowConfigSelector.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/regulation/approval/ApprovalFlowConfigSelector.java @@ -70,4 +70,10 @@ public RiskLevel select(RiskLevelDescriber describer) { */ return riskLevelService.findHighestRiskLevel(matched); } + + @SkipAuthorize("internal usage") + public RiskLevel selectForMultipleDatabase() { + return riskLevelService.findHighestRiskLevel(); + } + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/SqlCheckService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/SqlCheckService.java index 0de43322b2..77f310c55b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/SqlCheckService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/SqlCheckService.java @@ -42,6 +42,8 @@ import com.oceanbase.odc.core.sql.split.OffsetString; import com.oceanbase.odc.service.collaboration.environment.EnvironmentService; import com.oceanbase.odc.service.collaboration.environment.model.Environment; +import com.oceanbase.odc.service.connection.database.DatabaseService; +import com.oceanbase.odc.service.connection.database.model.Database; import com.oceanbase.odc.service.connection.model.ConnectionConfig; import com.oceanbase.odc.service.regulation.ruleset.RuleService; import com.oceanbase.odc.service.regulation.ruleset.model.QueryRuleMetadataParams; @@ -49,9 +51,12 @@ import com.oceanbase.odc.service.regulation.ruleset.model.Rule.RuleViolation; import com.oceanbase.odc.service.regulation.ruleset.model.RuleMetadata; import com.oceanbase.odc.service.regulation.ruleset.model.RuleType; +import com.oceanbase.odc.service.session.ConnectSessionService; import com.oceanbase.odc.service.session.factory.OBConsoleDataSourceFactory; import com.oceanbase.odc.service.sqlcheck.model.CheckResult; import com.oceanbase.odc.service.sqlcheck.model.CheckViolation; +import com.oceanbase.odc.service.sqlcheck.model.MultipleSqlCheckReq; +import com.oceanbase.odc.service.sqlcheck.model.MultipleSqlCheckResult; import com.oceanbase.odc.service.sqlcheck.model.SqlCheckReq; import com.oceanbase.odc.service.sqlcheck.rule.SqlCheckRules; @@ -76,6 +81,12 @@ public class SqlCheckService { @Autowired private EnvironmentService environmentService; + @Autowired + private ConnectSessionService sessionService; + + @Autowired + private DatabaseService databaseService; + public List check(@NotNull ConnectionSession session, @NotNull @Valid SqlCheckReq req) { Long ruleSetId = ConnectionSessionUtil.getRuleSetId(session); @@ -93,6 +104,34 @@ public List check(@NotNull ConnectionSession session, return SqlCheckUtil.buildCheckResults(checkViolations); } + public List multipleCheck(@NotNull @Valid MultipleSqlCheckReq req) { + List databaseIds = req.getDatabaseIds(); + List databases = databaseService.listDatabasesDetailsByIds(databaseIds); + ArrayList multipleSqlCheckResults = new ArrayList<>(); + for (int i = 0; i < databaseIds.size(); i++) { + ConnectionSession session = null; + try { + session = sessionService.create(databases.get(i).getDataSource().getId(), databaseIds.get(i)); + SqlCheckReq sqlCheckReq = new SqlCheckReq(); + sqlCheckReq.setDelimiter(req.getDelimiter()); + sqlCheckReq.setScriptContent(req.getScriptContent()); + List check = check(session, sqlCheckReq); + MultipleSqlCheckResult multipleSqlCheckResult = new MultipleSqlCheckResult(); + if (CollectionUtils.isEmpty(check)) { + return Collections.emptyList(); + } + multipleSqlCheckResult.setCheckResultList(check); + multipleSqlCheckResult.setDatabase(databases.get(i)); + multipleSqlCheckResults.add(multipleSqlCheckResult); + } finally { + if (session != null) { + session.expire(); + } + } + } + return multipleSqlCheckResults; + } + public List check(@NotNull Long environmentId, @NonNull String databaseName, @NotNull List sqls, @NotNull ConnectionConfig config) { if (CollectionUtils.isEmpty(sqls)) { diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckReq.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckReq.java index f3f0948d29..710040008c 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckReq.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckReq.java @@ -18,13 +18,14 @@ import java.util.List; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; import lombok.Data; @Data public class MultipleSqlCheckReq { - + @NotEmpty private List databaseIds; @NotBlank private String scriptContent; diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckResult.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckResult.java new file mode 100644 index 0000000000..dce1a394dc --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/sqlcheck/model/MultipleSqlCheckResult.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.oceanbase.odc.service.sqlcheck.model; + +import java.io.Serializable; +import java.util.List; + +import com.oceanbase.odc.service.connection.database.model.Database; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +/** + * @author: zijia.cj + * @date: 2024/4/29 + */ + +@Getter +@Setter +@ToString +@EqualsAndHashCode +@NoArgsConstructor +@AllArgsConstructor +public class MultipleSqlCheckResult implements Serializable { + private static final long serialVersionUID = -5963934702315211337L; + private Database database; + private List checkResultList; +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java index 04c693c3d2..55529608ba 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/task/TaskService.java @@ -78,6 +78,7 @@ public class TaskService { private HostProperties properties; private static String logFilePrefix; + private static final String MULTIPLE_ASYNC_LOG_PATH_PATTERN = "%s/multiple-async/%s/multiple-async.%s"; private static final String ASYNC_LOG_PATH_PATTERN = "%s/async/%d/%s/asynctask.%s"; private static final String MOCKDATA_LOG_PATH_PATTERN = "%s/data-mocker/%s/ob-mocker.%s"; private static final String DATATRANSFER_LOG_PATH_PATTERN = "%s/data-transfer/%s/ob-loader-dumper.%s"; @@ -110,9 +111,9 @@ public TaskEntity create(@NotNull CreateFlowInstanceReq req, int executionExpira TaskType taskType = req.getTaskType(); taskEntity.setTaskType(taskType); taskEntity.setConnectionId(req.getConnectionId()); - taskEntity.setExecutionExpirationIntervalSeconds(executionExpirationIntervalSeconds); taskEntity.setDatabaseName(req.getDatabaseName()); taskEntity.setDatabaseId(req.getDatabaseId()); + taskEntity.setExecutionExpirationIntervalSeconds(executionExpirationIntervalSeconds); taskEntity.setDescription(req.getDescription()); taskEntity.setParametersJson(JsonUtils.toJson(req.getParameters())); @@ -206,6 +207,10 @@ public File getLogFile(Long userId, String taskId, TaskType type, OdcTaskLogLeve throws NotFoundException { String filePath; switch (type) { + case MULTIPLE_ASYNC: + filePath = String.format(MULTIPLE_ASYNC_LOG_PATH_PATTERN, logFilePrefix, taskId, + logLevel.name().toLowerCase()); + break; case ASYNC: filePath = String.format(ASYNC_LOG_PATH_PATTERN, logFilePrefix, userId, taskId, logLevel.name().toLowerCase());