Skip to content

Commit

Permalink
feat(logicaldatabase): logical database change task implementation (#…
Browse files Browse the repository at this point in the history
…3324)

* execution engine implementation part1

* execution impl part2

* fix

* stash

* logical database change task framework

* stash

* implement logical database change tasks

* impl

* bugfix

* refactor

* bugfix

* pmd

* fix api

* fix ut cases

* delete unused code

* remove unnecessary import

* rename sql script

* optimize concurrency issue

* response to comments

* response to comments

* response to comments

* response to comments

* create connection session in SqlExecutionHandler

* response to comments

* response to comments

* refacotr

* refactor unit tetsts

* response to comments

* mark map as ConcurrentMap

* response to comments

* fix logic error

* fix status update
  • Loading branch information
MarkPotato777 authored Sep 11, 2024
1 parent 186c259 commit f801c5c
Show file tree
Hide file tree
Showing 53 changed files with 2,466 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand All @@ -45,7 +46,6 @@
import com.oceanbase.odc.metadb.connection.logicaldatabase.DatabaseMappingRepository;
import com.oceanbase.odc.service.collaboration.environment.EnvironmentService;
import com.oceanbase.odc.service.collaboration.environment.model.Environment;
import com.oceanbase.odc.service.connection.database.DatabaseMapper;
import com.oceanbase.odc.service.connection.database.model.DatabaseType;
import com.oceanbase.odc.service.connection.logicaldatabase.model.CreateLogicalDatabaseReq;
import com.oceanbase.odc.service.iam.ProjectPermissionValidator;
Expand All @@ -68,13 +68,14 @@ public class LogicalDatabaseServiceTest extends ServiceTestEnv {
private static final Long LOGICAL_DATABASE_ID = 2L;
private static final String LOGICAL_DATABASE_NAME = "lebie_test";
private static final String LOGICAL_DATABASE_ALIAS = "lebie_test_alias";
private static final DatabaseMapper databaseMapper = DatabaseMapper.INSTANCE;

@Autowired
private LogicalDatabaseService logicalDatabaseService;
@MockBean
private DatabaseRepository databaseRepository;
@MockBean
private LogicalTableService logicalTableService;
@MockBean
private AuthenticationFacade authenticationFacade;
@MockBean
private ProjectPermissionValidator projectPermissionValidator;
Expand Down Expand Up @@ -112,19 +113,20 @@ public void testCreate() {
Set<Long> databaseIds = new HashSet<>();
databaseIds.addAll(Arrays.asList(PHYSICAL_DATABASE_ID));
req.setPhysicalDatabaseIds(databaseIds);
Assert.assertTrue(logicalDatabaseService.create(req));
Assert.assertNotNull(logicalDatabaseService.create(req));
}

@Test
public void testDetail() {
when(databaseMappingRepository.findByLogicalDatabaseId(anyLong())).thenReturn(listDatabaseMappings(1));
when(logicalTableService.list(anyLong())).thenReturn(Collections.emptyList());
Assert.assertNotNull(logicalDatabaseService.detail(LOGICAL_DATABASE_ID));
}

@Test
public void testListPhysicalDatabaseIds() {
public void testListPhysicalDatabases() {
when(databaseMappingRepository.findByLogicalDatabaseId(anyLong())).thenReturn(listDatabaseMappings(1));
Assert.assertEquals(1, logicalDatabaseService.listPhysicalDatabaseIds(LOGICAL_DATABASE_ID).size());
Assert.assertEquals(1, logicalDatabaseService.listPhysicalDatabases(LOGICAL_DATABASE_ID).size());
}

private ConnectionEntity getConnectionEntity() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@

import java.util.List;

import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.beans.factory.annotation.Autowired;

import com.oceanbase.odc.ServiceTestEnv;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.common.util.YamlUtils;
import com.oceanbase.odc.service.connection.logicaldatabase.core.model.DataNode;
import com.oceanbase.odc.service.connection.logicaldatabase.core.parser.LogicalTableExpressionParseUtils;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -44,13 +48,17 @@ public class LogicalTableServiceTest extends ServiceTestEnv {
@Autowired
private LogicalTableService logicalTableService;

@Rule
public ExpectedException thrown = ExpectedException.none();


@Test
public void testResolve_ValidExpression() {
List<LogicalTableExpressionResolveTestCase> testCases =
YamlUtils.fromYamlList(TEST_RESOURCE_VALID_EXPRESSION_FILE_PATH,
LogicalTableExpressionResolveTestCase.class);
for (LogicalTableExpressionResolveTestCase testCase : testCases) {
List<DataNode> actual = logicalTableService.resolve(testCase.getExpression());
List<DataNode> actual = LogicalTableExpressionParseUtils.resolve(testCase.getExpression());
List<DataNode> expected = testCase.getDataNodes();
Assert.assertEquals(String.format("test case id = %d", testCase.getId()), expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
Expand All @@ -72,16 +80,8 @@ public void testResolve_InvalidExpression() {
YamlUtils.fromYamlList(TEST_RESOURCE_INVALID_EXPRESSION_FILE_PATH,
LogicalTableExpressionResolveTestCase.class);
for (LogicalTableExpressionResolveTestCase testCase : testCases) {
try {
logicalTableService.resolve(testCase.getExpression());
} catch (Exception ex) {
Assert.assertTrue(
String.format("test case id = %d, expected = %s, actual = %s", testCase.getId(),
testCase.getErrorMessageAbstract(), ex.getMessage()),
StringUtils.containsIgnoreCase(ex.getMessage(), testCase.getErrorMessageAbstract()));
continue;
}
Assert.fail(String.format("test case id = %d, exception expected but not thrown", testCase.getId()));
thrown.expectMessage(new ContainsIgnoreCase(testCase.getErrorMessageAbstract()));
LogicalTableExpressionParseUtils.resolve(testCase.getExpression());
}
}

Expand All @@ -94,4 +94,26 @@ static class LogicalTableExpressionResolveTestCase {
private List<DataNode> dataNodes;
private String errorMessageAbstract;
}

class ContainsIgnoreCase extends BaseMatcher<String> {
private final String substring;

public ContainsIgnoreCase(String substring) {
this.substring = substring.toLowerCase();
}

@Override
public boolean matches(Object item) {
if (item == null) {
return false;
}
String str = item.toString().toLowerCase();
return str.contains(substring);
}

@Override
public void describeTo(Description description) {
description.appendValue(substring);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS `logicaldatabase_database_change_execution_unit` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`execution_id` varchar(64) NOT NULL COMMENT 'execution id of the logical database change task, uuid',
`execution_order` bigint(20) NOT NULL COMMENT 'execution order',
`schedule_task_id` bigint(20) NOT NULL COMMENT 'ID of the related schedule task, refer to schedule_task.id',
`logical_database_id` bigint(20) NOT NULL COMMENT 'logical database id, reference to connect_database.id',
`physical_database_id` bigint(20) NOT NULL COMMENT 'physical database id, reference to connect_database.id',
`sql_content` mediumtext COMMENT 'sql content',
`execution_result_json` mediumtext NOT NULL COMMENT 'execution result json, see SqlExecutionResultWrapper',
`status` varchar(32) NOT NULL COMMENT 'status of the execution, see ExecutionStatus',
CONSTRAINT `pk_logical_db_change_id` PRIMARY KEY (`id`),
CONSTRAINT `uk_logical_db_change_execution_id` UNIQUE KEY (`execution_id`),
CONSTRAINT `uk_logical_db_change_sti_pdi_order` UNIQUE KEY (`schedule_task_id`, `physical_database_id`, `execution_order`)
) COMMENT = 'logical database change task execution units';

alter table schedule_schedule modify column connection_id bigint(20) COMMENT 'reference to connect_connection.id';
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.server.web.controller.v2;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.oceanbase.odc.service.common.response.Responses;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.connection.logicaldatabase.LogicalDatabaseChangeService;
import com.oceanbase.odc.service.connection.logicaldatabase.model.SqlExecutionUnitResp;
import com.oceanbase.odc.service.task.exception.JobException;

/**
* @Author: Lebie
* @Date: 2024/9/4 12:20
* @Description: []
*/
@RestController
@RequestMapping("/api/v2/logicaldatabase")
public class LogicalDatabaseChangeController {
@Autowired
private LogicalDatabaseChangeService logicalDatabaseChangeService;

@RequestMapping(value = "/scheduleTasks/{scheduleTaskId:[\\d]+}/physicalDatabases/{physicalDatabaseId:[\\d]+}",
method = RequestMethod.GET)
public SuccessResponse<SqlExecutionUnitResp> detailPhysicalDatabaseChangeTask(@PathVariable Long scheduleTaskId,
@PathVariable Long physicalDatabaseId) {
return Responses.success(logicalDatabaseChangeService.detail(scheduleTaskId, physicalDatabaseId));
}

@RequestMapping(
value = "/scheduleTasks/{scheduleTaskId:[\\d]+}/physicalDatabases/{physicalDatabaseId:[\\d]+}/skipCurrentStatement",
method = RequestMethod.POST)
public SuccessResponse<Boolean> skipCurrentStatement(@PathVariable Long scheduleTaskId,
@PathVariable Long physicalDatabaseId) throws InterruptedException, JobException {
return Responses.success(logicalDatabaseChangeService.skipCurrent(scheduleTaskId, physicalDatabaseId));
}

@RequestMapping(
value = "/scheduleTasks/{scheduleTaskId:[\\d]+}/physicalDatabases/{physicalDatabaseId:[\\d]+}/terminateCurrentStatement",
method = RequestMethod.POST)
public SuccessResponse<Boolean> terminateCurrentStatement(@PathVariable Long scheduleTaskId,
@PathVariable Long physicalDatabaseId) throws InterruptedException, JobException {
return Responses.success(logicalDatabaseChangeService.terminateCurrent(scheduleTaskId, physicalDatabaseId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import com.oceanbase.odc.service.common.response.ListResponse;
import com.oceanbase.odc.service.common.response.Responses;
import com.oceanbase.odc.service.common.response.SuccessResponse;
import com.oceanbase.odc.service.connection.database.model.Database;
import com.oceanbase.odc.service.connection.logicaldatabase.LogicalDatabaseService;
import com.oceanbase.odc.service.connection.logicaldatabase.LogicalTableService;
import com.oceanbase.odc.service.connection.logicaldatabase.model.CreateLogicalDatabaseReq;
import com.oceanbase.odc.service.connection.logicaldatabase.model.DetailLogicalDatabaseResp;
import com.oceanbase.odc.service.connection.logicaldatabase.model.DetailLogicalTableResp;
import com.oceanbase.odc.service.connection.logicaldatabase.model.LogicalTableTopologyResp;
import com.oceanbase.odc.service.connection.logicaldatabase.model.PreviewSqlReq;
import com.oceanbase.odc.service.connection.logicaldatabase.model.PreviewSqlResp;

/**
* @Author: Lebie
Expand All @@ -48,7 +51,7 @@ public class LogicalDatabaseController {
private LogicalTableService tableService;

@RequestMapping(value = "/logicalDatabases", method = RequestMethod.POST)
public SuccessResponse<Boolean> create(@RequestBody CreateLogicalDatabaseReq req) {
public SuccessResponse<Database> create(@RequestBody CreateLogicalDatabaseReq req) {
return Responses.success(databaseService.create(req));
}

Expand Down Expand Up @@ -105,4 +108,10 @@ public SuccessResponse<Boolean> checkLogicalTable(@PathVariable Long logicalData
@PathVariable Long logicalTableId) {
return Responses.success(tableService.checkStructureConsistency(logicalDatabaseId, logicalTableId));
}

@RequestMapping(value = "/logicaldatabases/{logicalDatabaseId:[\\d]+}/previewSqls", method = RequestMethod.POST)
public ListResponse<PreviewSqlResp> previewSqls(@PathVariable Long logicalDatabaseId,
@RequestBody PreviewSqlReq req) {
return Responses.list(databaseService.preview(logicalDatabaseId, req));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.metadb.connection.logicaldatabase;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

import org.hibernate.annotations.Generated;
import org.hibernate.annotations.GenerationTime;

import com.oceanbase.odc.service.connection.logicaldatabase.core.executor.execution.ExecutionStatus;

import lombok.Data;

/**
* @Author: Lebie
* @Date: 2024/9/3 19:56
* @Description: []
*/
@Data
@Entity
@Table(name = "logicaldatabase_database_change_execution_unit")
public class LogicalDBChangeExecutionUnitEntity {
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "execution_id", nullable = false, updatable = false)
private String executionId;

@Column(name = "execution_order", nullable = false, updatable = false)
private Long executionOrder;

@Column(name = "schedule_task_id", nullable = false)
private Long scheduleTaskId;

@Column(name = "logical_database_id", nullable = false, updatable = false)
private Long logicalDatabaseId;

@Column(name = "physical_database_id", nullable = false, updatable = false)
private Long physicalDatabaseId;

@Column(name = "sql_content", nullable = false, updatable = false)
private String sql;

@Column(name = "execution_result_json", nullable = false)
private String executionResultJson;

@Column(name = "status", nullable = false)
@Enumerated(EnumType.STRING)
private ExecutionStatus status;

@Generated(GenerationTime.ALWAYS)
@Column(name = "create_time", insertable = false, updatable = false)
private Date createTime;

@Generated(GenerationTime.ALWAYS)
@Column(name = "update_time", insertable = false, updatable = false)
private Date updateTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.metadb.connection.logicaldatabase;

import java.util.List;
import java.util.Optional;

import com.oceanbase.odc.config.jpa.OdcJpaRepository;

public interface LogicalDBExecutionRepository extends OdcJpaRepository<LogicalDBChangeExecutionUnitEntity, Long> {
Optional<LogicalDBChangeExecutionUnitEntity> findByExecutionId(String executionId);

List<LogicalDBChangeExecutionUnitEntity> findByScheduleTaskIdOrderByExecutionOrderAsc(Long scheduleTaskId);

List<LogicalDBChangeExecutionUnitEntity> findByScheduleTaskIdAndPhysicalDatabaseIdOrderByExecutionOrderAsc(
Long scheduleTaskId, Long physicalDatabaseId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class ScheduleEntity implements Serializable {
private String name;
@Column(name = "organization_id", nullable = false)
private Long organizationId;
@Column(name = "connection_id", nullable = false)
@Column(name = "connection_id")
private Long dataSourceId;
@Column(name = "database_name", nullable = false)
private String databaseName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,10 @@ public List<Long> innerListIdByOrganizationIdAndTenantId(@NonNull Long organizat
}

@SkipAuthorize("odc internal usages")
public List<ConnectionConfig> innerListByIds(@NotEmpty Collection<Long> ids) {
public List<ConnectionConfig> innerListByIds(Collection<Long> ids) {
if (CollectionUtils.isEmpty(ids)) {
return Collections.emptyList();
}
return repository.findByIdIn(ids).stream().map(mapper::entityToModel).collect(Collectors.toList());
}

Expand Down
Loading

0 comments on commit f801c5c

Please sign in to comment.