
fix(dlm): table structure synchronization failed #2682

Merged · 12 commits · Jun 6, 2024
4 changes: 4 additions & 0 deletions server/odc-server/src/main/resources/log4j2.xml
@@ -816,6 +816,10 @@
             <AppenderRef ref="ScheduleTaskInfoRoutingAppender"/>
             <AppenderRef ref="ScheduleTaskWarnRoutingAppender"/>
         </Logger>
+        <Logger name="com.oceanbase.odc.service.schedule.job" level="INFO" additivity="true">
+            <AppenderRef ref="ScheduleTaskInfoRoutingAppender"/>
+            <AppenderRef ref="ScheduleTaskWarnRoutingAppender"/>
+        </Logger>

         <Logger name="com.oceanbase.odc.service.quartz" level="INFO" additivity="true">
             <AppenderRef ref="SchedulerFileAppender"/>
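The added logger block routes everything logged under `com.oceanbase.odc.service.schedule.job` into the schedule-task appenders, so per-task log files now capture the DLM job output as well. A minimal sketch of the effect, assuming standard SLF4J/Log4j2 wiring (the class and message below are hypothetical):

```java
// Hypothetical class, for illustration: any logger created under the
// com.oceanbase.odc.service.schedule.job package is now matched by the
// new <Logger> block and routed to the schedule-task appenders.
package com.oceanbase.odc.service.schedule.job;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleScheduleJob {
    private static final Logger log = LoggerFactory.getLogger(ExampleScheduleJob.class);

    public void run() {
        // INFO and above reach ScheduleTaskInfoRoutingAppender; WARN and above
        // presumably also reach ScheduleTaskWarnRoutingAppender, depending on
        // the appenders' own level filters.
        log.info("DLM table unit started");
    }
}
```

With `additivity="true"` the events also continue to propagate to parent loggers, matching the neighbouring logger declarations.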
(next changed file; path not shown)
@@ -73,8 +73,9 @@ public void destroy() {
     }

     public List<DlmTableUnit> getDlmTableUnits(Long scheduleTaskId) throws SQLException {
-        try (Connection conn = dataSource.getConnection()) {
-            PreparedStatement ps = conn.prepareStatement("select * from dlm_table_unit where schedule_task_id = ?");
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement ps =
+                        conn.prepareStatement("select * from dlm_table_unit where schedule_task_id = ?")) {
             ps.setLong(1, scheduleTaskId);
             ResultSet resultSet = ps.executeQuery();
             List<DlmTableUnit> dlmTableUnits = new LinkedList<>();
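The recurring change in this file moves the `PreparedStatement` into the try-with-resources header alongside the `Connection`. In the old form the statement was never explicitly closed, and in `updateDlmTableUnitStatus` below the connection itself was obtained inline (`dataSource.getConnection().prepareStatement(sql)`), leaving no reference on which to call `close()`, so the pool never got it back. A minimal sketch of the adopted pattern, assuming a plain `javax.sql.DataSource`; the explicit `ResultSet` close is an extra illustration, not part of this PR:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.sql.DataSource;

class TryWithResourcesSketch { // illustrative container class
    // Both resources sit in the try-with-resources header, so they are closed in
    // reverse order (statement first, then connection) even when an exception is
    // thrown by prepareStatement, the parameter setters, or the query itself.
    static List<String> queryTableUnitIds(DataSource dataSource, long scheduleTaskId) throws SQLException {
        List<String> ids = new ArrayList<>();
        try (Connection conn = dataSource.getConnection();
                PreparedStatement ps = conn.prepareStatement(
                        "select * from dlm_table_unit where schedule_task_id = ?")) {
            ps.setLong(1, scheduleTaskId);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    ids.add(rs.getString("dlm_table_unit_id"));
                }
            }
        }
        return ids;
    }
}
```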
@@ -108,7 +109,7 @@ public void updateDlmTableUnitStatus(String dlmTableUnitId, TaskStatus status) t
                 "end_time = CASE WHEN ? IN ('CANCELED', 'DONE', 'FAILED') THEN CURRENT_TIMESTAMP ELSE end_time END " +
                 "WHERE dlm_table_unit_id = ?";

-        try (PreparedStatement pstmt = dataSource.getConnection().prepareStatement(sql)) {
+        try (Connection conn = dataSource.getConnection(); PreparedStatement pstmt = conn.prepareStatement(sql)) {
             pstmt.setString(1, status.name());
             pstmt.setString(2, status.name());
             pstmt.setString(3, status.name());
@@ -152,9 +153,9 @@ public void storeDlmTableUnit(List<DlmTableUnit> dlmTableUnits) throws SQLExcept
     @Override
     public TaskGenerator getTaskGenerator(String generatorId, String jobId) throws SQLException {
         if (enableBreakpointRecovery) {
-            try (Connection conn = dataSource.getConnection()) {
-                PreparedStatement ps = conn.prepareStatement(
-                        "select * from dlm_task_generator where job_id = ?");
+            try (Connection conn = dataSource.getConnection();
+                    PreparedStatement ps = conn.prepareStatement(
+                            "select * from dlm_task_generator where job_id = ?")) {
                 ps.setString(1, jobId);
                 ResultSet resultSet = ps.executeQuery();
                 if (resultSet.next()) {
@@ -191,8 +192,8 @@ public void storeTaskGenerator(TaskGenerator taskGenerator) throws SQLException
         sb.append(
                 "processed_row_count=values(processed_row_count),processed_data_size=values(processed_data_size),primary_key_save_point=values(primary_key_save_point)");
         log.info("start to store task generator:{}", taskGenerator);
-        try (Connection conn = dataSource.getConnection()) {
-            PreparedStatement ps = conn.prepareStatement(sb.toString());
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement ps = conn.prepareStatement(sb.toString())) {
             ps.setString(1, taskGenerator.getId());
             ps.setString(2, taskGenerator.getJobId());
             ps.setLong(3, taskGenerator.getProcessedDataSize());
@@ -247,9 +248,9 @@ public void storeJobStatistic(JobMeta jobMeta) throws JobSqlException {
     @Override
     public List<TaskMeta> getTaskMeta(JobMeta jobMeta) throws SQLException {
         if (enableBreakpointRecovery) {
-            try (Connection conn = dataSource.getConnection()) {
-                PreparedStatement ps = conn.prepareStatement(
-                        "select * from dlm_task_unit where generator_id = ? AND status !='SUCCESS'");
+            try (Connection conn = dataSource.getConnection();
+                    PreparedStatement ps = conn.prepareStatement(
+                            "select * from dlm_task_unit where generator_id = ? AND status !='SUCCESS'")) {
                 ps.setString(1, jobMeta.getGenerator().getId());
                 ResultSet resultSet = ps.executeQuery();
                 List<TaskMeta> taskMetas = new LinkedList<>();
@@ -286,8 +287,8 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {
                 "status=values(status),partition_name=values(partition_name),lower_bound_primary_key=values(lower_bound_primary_key),");
         sb.append(
                 "upper_bound_primary_key=values(upper_bound_primary_key),primary_key_cursor=values(primary_key_cursor)");
-        try (Connection conn = dataSource.getConnection()) {
-            PreparedStatement ps = conn.prepareStatement(sb.toString());
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement ps = conn.prepareStatement(sb.toString())) {
             ps.setLong(1, taskMeta.getTaskIndex());
             ps.setString(2, taskMeta.getJobMeta().getJobId());
             ps.setString(3, taskMeta.getGeneratorId());
@@ -308,13 +309,13 @@ public void storeTaskMeta(TaskMeta taskMeta) throws SQLException {

     @Override
     public Long getAbnormalTaskIndex(String jobId) {
-        try (Connection conn = dataSource.getConnection()) {
-            PreparedStatement ps = conn.prepareStatement(
-                    "select count(1) from dlm_task_unit where job_id=? and (status != 'SUCCESS' or primary_key_cursor is null)");
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement ps = conn.prepareStatement(
+                        "select count(1) from dlm_task_unit where job_id=? and (status != 'SUCCESS' or primary_key_cursor is null)")) {
             ps.setString(1, jobId);
             ResultSet resultSet = ps.executeQuery();
-            while (resultSet.next()) {
-                Long count = resultSet.getLong(1);
+            if (resultSet.next()) {
+                long count = resultSet.getLong(1);
                 return count > 0 ? count : null;
             }
         } catch (Exception ignored) {
@@ -330,9 +331,9 @@ public void updateTableSizeInfo(TableSizeInfo tableSizeInfo, long l) {

     @Override
     public void updateLimiter(JobMeta jobMeta) {
-        try (Connection conn = dataSource.getConnection()) {
-            PreparedStatement ps = conn.prepareStatement(
-                    "select * from dlm_config_limiter_configuration where order_id = ?");
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement ps = conn.prepareStatement(
+                        "select * from dlm_config_limiter_configuration where order_id = ?")) {
             ps.setString(1, DlmJobIdUtil.getJobName(jobMeta.getJobId()));
             ResultSet resultSet = ps.executeQuery();
             RateLimitConfiguration rateLimit;
@@ -381,10 +382,10 @@ private void setTableLimitConfig(TableMeta tableMeta, int rowLimit) {
     }

     private void initEnableBreakpointRecovery() {
-        try (Connection conn = dataSource.getConnection()) {
-            PreparedStatement preparedStatement = conn.prepareStatement(
-                    "select value from config_system_configuration where `key` = 'odc.task.dlm"
-                            + ".support-breakpoint-recovery'");
+        try (Connection conn = dataSource.getConnection();
+                PreparedStatement preparedStatement = conn.prepareStatement(
+                        "select value from config_system_configuration where `key` = 'odc.task.dlm"
+                                + ".support-breakpoint-recovery'")) {
             ResultSet resultSet = preparedStatement.executeQuery();
             if (resultSet.next()) {
                 this.enableBreakpointRecovery = resultSet.getBoolean(1);
(next changed file; path not shown)
@@ -18,6 +18,7 @@
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;

 import org.springframework.beans.factory.annotation.Autowired;
@@ -106,4 +107,16 @@ public List<DlmTableUnit> findByScheduleTaskId(Long scheduleTaskId) {
                 Collectors.toList());
     }

+    public TaskStatus getTaskStatus(Long scheduleTaskId) {
+        Set<TaskStatus> collect = findByScheduleTaskId(scheduleTaskId).stream().map(DlmTableUnit::getStatus).collect(
+                Collectors.toSet());
+        if (collect.contains(TaskStatus.FAILED)) {
+            return TaskStatus.FAILED;
+        }
+        if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
+            return TaskStatus.DONE;
+        }
+        return TaskStatus.CANCELED;
+    }
+
 }
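The new aggregation gives `FAILED` precedence over everything else, reports `DONE` only when every table unit reached `DONE`, and treats any other combination as `CANCELED`. By example (hypothetical status sets, not from the PR):

```java
// getTaskStatus(scheduleTaskId) aggregation, by example:
// {DONE}             -> DONE      (exactly one distinct status, and it is DONE)
// {DONE, FAILED}     -> FAILED    (any FAILED unit fails the whole task)
// {DONE, CANCELED}   -> CANCELED  (no FAILED, but not all units are DONE)
// {PREPARING, DONE}  -> CANCELED  (the catch-all branch)
// {}                 -> CANCELED  (no units found also lands in the catch-all)
```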
(next changed file; path not shown)
@@ -18,6 +18,8 @@
 import java.util.LinkedList;
 import java.util.List;

+import com.oceanbase.odc.common.util.StringUtils;
+
 import lombok.Data;

 /**
@@ -37,4 +39,8 @@ public class DataArchiveTableConfig {

     // the sql condition such as "gmt_create < '2023-01-01'"
     private String conditionExpression;
+
+    public String getTargetTableName() {
+        return StringUtils.isEmpty(targetTableName) ? tableName : targetTableName;
+    }
 }
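Because the class is annotated with Lombok's `@Data`, a hand-written getter suppresses the generated one, so every caller of `getTargetTableName()` now receives the fallback automatically; that is what allows the eager defaulting loops in the two `initDefaultConfig` methods further down to be deleted. A usage sketch (table names are hypothetical):

```java
// Hypothetical usage: callers no longer need to pre-populate targetTableName.
DataArchiveTableConfig config = new DataArchiveTableConfig();
config.setTableName("order_history");                  // Lombok-generated setter
config.getTargetTableName();                           // -> "order_history" (fallback)
config.setTargetTableName("order_history_archived");
config.getTargetTableName();                           // -> "order_history_archived"
```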
(next changed file; path not shown)
@@ -20,7 +20,6 @@

 import org.springframework.beans.factory.annotation.Autowired;

-import com.oceanbase.odc.common.util.StringUtils;
 import com.oceanbase.odc.common.util.VersionUtils;
 import com.oceanbase.odc.core.session.ConnectionSession;
 import com.oceanbase.odc.core.session.ConnectionSessionConstants;
@@ -188,11 +187,5 @@ private void initDefaultConfig(DataArchiveParameters parameters) {
         parameters.setScanBatchSize(dlmConfiguration.getDefaultScanBatchSize());
         parameters.setQueryTimeout(dlmConfiguration.getTaskConnectionQueryTimeout());
         parameters.setShardingStrategy(dlmConfiguration.getShardingStrategy());
-        // set default target table name.
-        parameters.getTables().forEach(tableConfig -> {
-            if (StringUtils.isEmpty(tableConfig.getTargetTableName())) {
-                tableConfig.setTargetTableName(tableConfig.getTableName());
-            }
-        });
     }
 }
(next changed file; path not shown)
@@ -113,13 +113,6 @@ private void initDefaultConfig(DataDeleteParameters parameters) {
                 .setWriteThreadCount(dlmConfiguration.getSingleTaskThreadPoolSize() - parameters.getReadThreadCount());
         parameters.setScanBatchSize(dlmConfiguration.getDefaultScanBatchSize());
         parameters.setQueryTimeout(dlmConfiguration.getTaskConnectionQueryTimeout());
-        // if no need check before delete, or target table name is null, set default target table name
-        parameters.getTables().forEach(tableConfig -> {
-            if (!parameters.getNeedCheckBeforeDelete() || Objects.isNull(tableConfig.getTargetTableName())) {
-                tableConfig.setTargetTableName(tableConfig.getTableName());
-            }
-        });
-
     }

 }
(next changed file; path not shown)
@@ -20,8 +20,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;

 import org.quartz.JobExecutionContext;

@@ -109,7 +107,6 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
         log.info("Task is ready,taskId={}", taskId);
         for (DlmTableUnit dlmTableUnit : dlmTableUnits) {
             if (isInterrupted) {
-                dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED);
                 log.info("Task interrupted and will exit.TaskId={}", taskId);
                 continue;
             }
@@ -118,17 +115,19 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
                         dlmTableUnit.getTableName());
                 continue;
             }
-            try {
-                if (dlmTableUnit.getType() == JobType.MIGRATE) {
-                    try {
-                        DLMTableStructureSynchronizer.sync(dlmTableUnit.getSourceDatasourceInfo(),
-                                dlmTableUnit.getTargetDatasourceInfo(), dlmTableUnit.getTableName(),
-                                dlmTableUnit.getTargetTableName(),
-                                dlmTableUnit.getParameters().getSyncDBObjectType());
-                    } catch (SQLException e) {
-                        log.warn("Sync table structure failed,tableName={}", dlmTableUnit.getTableName(), e);
-                    }
+            if (dlmTableUnit.getType() == JobType.MIGRATE) {
+                try {
+                    DLMTableStructureSynchronizer.sync(dlmTableUnit.getSourceDatasourceInfo(),
+                            dlmTableUnit.getTargetDatasourceInfo(), dlmTableUnit.getTableName(),
+                            dlmTableUnit.getTargetTableName(),
+                            dlmTableUnit.getParameters().getSyncDBObjectType());
+                } catch (SQLException e) {
+                    log.warn("Sync table structure failed,tableName={}", dlmTableUnit.getTableName(), e);
+                    dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.FAILED);
+                    continue;
+                }
+            }
+            try {
                 job = jobFactory.createJob(dlmTableUnit);
                 log.info("Create dlm job succeed,taskId={},task parameters={}", taskId, dlmTableUnit);
             } catch (Exception e) {
@@ -144,7 +143,7 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
                 log.info("DLM job succeed,taskId={},unitId={}", taskId, dlmTableUnit.getDlmTableUnitId());
             } catch (JobException e) {
                 // used to stop several sub-threads.
-                if (job.getJobMeta().isToStop()) {
+                if (isInterrupted) {
                     log.info("Data archive task is Interrupted,taskId={}", taskId);
                     dlmService.updateStatusByDlmTableUnitId(dlmTableUnit.getDlmTableUnitId(), TaskStatus.CANCELED);
                 } else {
@@ -156,19 +155,7 @@ public void executeTask(Long taskId, List<DlmTableUnit> dlmTableUnits, Long time
     }

     public TaskStatus getTaskStatus(Long scheduleTaskId) {
-        Set<TaskStatus> collect =
-                dlmService.findByScheduleTaskId(scheduleTaskId).stream().map(DlmTableUnit::getStatus).collect(
-                        Collectors.toSet());
-        if (collect.contains(TaskStatus.DONE) && collect.size() == 1) {
-            return TaskStatus.DONE;
-        }
-        if (isInterrupted) {
-            return TaskStatus.CANCELED;
-        }
-        if (collect.contains(TaskStatus.FAILED)) {
-            return TaskStatus.FAILED;
-        }
-        return TaskStatus.CANCELED;
+        return dlmService.getTaskStatus(scheduleTaskId);
     }

     public List<DlmTableUnit> getTaskUnits(ScheduleTaskEntity taskEntity) {
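These executor changes carry the actual fix named in the PR title. Previously a failed structure synchronization only produced a warning and execution fell through to the data-migration job, which then ran against a target table that might not exist; the sync failure is now terminal for that table unit. In sketch form (loop body excerpt with identifiers shortened, not a verbatim quote):

```java
// Per-unit flow after this PR (sketch; 'unit' stands for dlmTableUnit,
// inside the for-loop over the task's table units).
if (unit.getType() == JobType.MIGRATE) {
    try {
        DLMTableStructureSynchronizer.sync(unit.getSourceDatasourceInfo(),
                unit.getTargetDatasourceInfo(), unit.getTableName(),
                unit.getTargetTableName(), unit.getParameters().getSyncDBObjectType());
    } catch (SQLException e) {
        log.warn("Sync table structure failed,tableName={}", unit.getTableName(), e);
        dlmService.updateStatusByDlmTableUnitId(unit.getDlmTableUnitId(), TaskStatus.FAILED);
        continue; // fail fast: skip this unit, keep processing the remaining ones
    }
}
// ...only then create and run the data-moving job for this unit.
```

The cancellation bookkeeping moves as well: interrupted units are no longer flipped to `CANCELED` inside the loop, the `JobException` handler checks the executor's own `isInterrupted` flag instead of `job.getJobMeta().isToStop()`, and the overall status is now computed by the `DlmService.getTaskStatus` shown earlier.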
(next changed file; path not shown)
@@ -23,7 +23,6 @@
 import com.oceanbase.odc.common.util.StringUtils;
 import com.oceanbase.odc.core.shared.constant.TaskStatus;
 import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
-import com.oceanbase.odc.service.dlm.DataSourceInfoMapper;
 import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
 import com.oceanbase.odc.service.dlm.model.DataArchiveTableConfig;
 import com.oceanbase.odc.service.dlm.model.DlmTableUnit;
@@ -91,16 +90,10 @@ private void executeInTaskFramework(JobExecutionContext context) {
         parameters.setReadThreadCount(dataArchiveParameters.getReadThreadCount());
         parameters.setShardingStrategy(dataArchiveParameters.getShardingStrategy());
         parameters.setScanBatchSize(dataArchiveParameters.getScanBatchSize());
-        parameters
-                .setSourceDs(DataSourceInfoMapper.toDataSourceInfo(
-                        databaseService.findDataSourceForConnectById(dataArchiveParameters.getSourceDatabaseId())));
-        parameters
-                .setTargetDs(DataSourceInfoMapper.toDataSourceInfo(
-                        databaseService.findDataSourceForConnectById(dataArchiveParameters.getTargetDataBaseId())));
+        parameters.setSourceDs(getDataSourceInfo(dataArchiveParameters.getSourceDatabaseId()));
+        parameters.setTargetDs(getDataSourceInfo(dataArchiveParameters.getTargetDataBaseId()));
         parameters.getSourceDs().setQueryTimeout(dataArchiveParameters.getQueryTimeout());
         parameters.getTargetDs().setQueryTimeout(dataArchiveParameters.getQueryTimeout());
-        parameters.getSourceDs().setDatabaseName(dataArchiveParameters.getSourceDatabaseName());
-        parameters.getTargetDs().setDatabaseName(dataArchiveParameters.getTargetDatabaseName());
         parameters.setSyncTableStructure(dataArchiveParameters.getSyncTableStructure());

         Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis());
(next changed file; path not shown)
@@ -25,14 +25,14 @@
 import com.oceanbase.odc.common.util.StringUtils;
 import com.oceanbase.odc.core.shared.constant.TaskStatus;
 import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
-import com.oceanbase.odc.service.dlm.DataSourceInfoMapper;
 import com.oceanbase.odc.service.dlm.model.DataArchiveTableConfig;
 import com.oceanbase.odc.service.dlm.model.DataDeleteParameters;
 import com.oceanbase.odc.service.dlm.model.DlmTableUnit;
 import com.oceanbase.odc.service.dlm.model.DlmTableUnitParameters;
 import com.oceanbase.odc.service.dlm.model.RateLimitConfiguration;
 import com.oceanbase.odc.service.dlm.utils.DataArchiveConditionUtil;
 import com.oceanbase.odc.service.dlm.utils.DlmJobIdUtil;
+import com.oceanbase.odc.service.schedule.model.DlmTableUnitStatistic;
 import com.oceanbase.tools.migrator.common.enums.JobType;
 import com.oceanbase.tools.migrator.task.CheckMode;
@@ -105,6 +107,7 @@ public List<DlmTableUnit> splitTask(ScheduleTaskEntity taskEntity) {
             parameter.setMigratePartitions(table.getPartitions());
             dlmTableUnit.setParameters(parameter);
             dlmTableUnit.setStatus(TaskStatus.PREPARING);
+            dlmTableUnit.setStatistic(new DlmTableUnitStatistic());
             JobType jobType = parameters.getNeedCheckBeforeDelete() ? JobType.DELETE : JobType.QUICK_DELETE;
             dlmTableUnit.setType(parameters.getDeleteByUniqueKey() ? jobType : JobType.DEIRECT_DELETE);
             dlmTasks.add(dlmTableUnit);
@@ -136,18 +137,12 @@ private void executeInTaskFramework(JobExecutionContext context) {
         parameters.setWriteThreadCount(dataDeleteParameters.getWriteThreadCount());
         parameters.setReadThreadCount(dataDeleteParameters.getReadThreadCount());
         parameters.setScanBatchSize(dataDeleteParameters.getScanBatchSize());
-        parameters
-                .setSourceDs(DataSourceInfoMapper.toDataSourceInfo(
-                        databaseService.findDataSourceForConnectById(dataDeleteParameters.getDatabaseId())));
-        parameters
-                .setTargetDs(DataSourceInfoMapper.toDataSourceInfo(
-                        databaseService.findDataSourceForConnectById(dataDeleteParameters.getTargetDatabaseId() == null
-                                ? dataDeleteParameters.getDatabaseId()
-                                : dataDeleteParameters.getTargetDatabaseId())));
+        parameters.setSourceDs(getDataSourceInfo(dataDeleteParameters.getDatabaseId()));
+        parameters.setTargetDs(getDataSourceInfo(dataDeleteParameters.getTargetDatabaseId() == null
+                ? dataDeleteParameters.getDatabaseId()
+                : dataDeleteParameters.getTargetDatabaseId()));
         parameters.getSourceDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout());
         parameters.getTargetDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout());
-        parameters.getSourceDs().setDatabaseName(dataDeleteParameters.getDatabaseName());
-        parameters.getTargetDs().setDatabaseName(dataDeleteParameters.getTargetDatabaseName());

         Long jobId = publishJob(parameters, dataDeleteParameters.getTimeoutMillis());
         scheduleTaskRepository.updateJobIdById(taskEntity.getId(), jobId);
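Both job classes now delegate to `getDataSourceInfo(...)`, whose definition is outside this diff (presumably a shared helper in their common parent). Since the explicit `setDatabaseName(...)` calls were removed together with the inlined `DataSourceInfoMapper` chains, the helper presumably performs both steps; a hypothetical reconstruction under that assumption:

```java
// Hypothetical reconstruction; the real helper is not shown in this diff.
private DataSourceInfo getDataSourceInfo(Long databaseId) {
    DataSourceInfo info = DataSourceInfoMapper.toDataSourceInfo(
            databaseService.findDataSourceForConnectById(databaseId));
    // Assumed: the helper also resolves the schema name for this database id,
    // which would explain why the setDatabaseName(...) calls above disappeared.
    info.setDatabaseName(resolveDatabaseName(databaseId)); // resolveDatabaseName is hypothetical
    return info;
}
```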