fix(dlm):data delete failure in periodic task and remove sys tenant verification #857

Merged · 11 commits · Nov 22, 2023
pom.xml (1 addition, 1 deletion)

@@ -116,7 +116,7 @@
     <mina.version>2.1.6</mina.version>

     <!-- data-lifecycle-manager version -->
-    <data-lifecycle-manager.version>1.0.4</data-lifecycle-manager.version>
+    <data-lifecycle-manager.version>1.0.6</data-lifecycle-manager.version>

     <!-- plugin version -->
     <formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>

This file was deleted.

@@ -88,6 +88,7 @@
 import com.oceanbase.odc.service.schedule.model.ScheduleTaskMapper;
 import com.oceanbase.odc.service.schedule.model.ScheduleTaskResp;
 import com.oceanbase.odc.service.schedule.model.TriggerConfig;
+import com.oceanbase.odc.service.schedule.model.TriggerStrategy;
 import com.oceanbase.odc.service.task.model.ExecutorInfo;

 import lombok.extern.slf4j.Slf4j;
@@ -302,18 +303,21 @@ public ScheduleTaskResp dataArchiveDelete(Long scheduleId, Long taskId) {
             throw new IllegalStateException("Delete is not allowed because the data archive job has not succeeded.");
         }

-        // TODO throw
         try {
-            if (!quartzJobService.checkExists(jobKey)) {
-                CreateQuartzJobReq req = new CreateQuartzJobReq();
-                req.setScheduleId(scheduleId);
-                req.setType(JobType.DATA_ARCHIVE_DELETE);
-                DataArchiveClearParameters parameters = new DataArchiveClearParameters();
-                parameters.setDataArchiveTaskId(taskId);
-                req.getJobDataMap().putAll(BeanMap.create(parameters));
-                quartzJobService.createJob(req);
+            if (quartzJobService.checkExists(jobKey)) {
+                log.info("Data archive delete job exists and start delete job,jobKey={}", jobKey);
+                quartzJobService.deleteJob(jobKey);
             }
-            quartzJobService.triggerJob(jobKey);
+            CreateQuartzJobReq req = new CreateQuartzJobReq();
+            req.setScheduleId(scheduleId);
+            req.setType(JobType.DATA_ARCHIVE_DELETE);
+            DataArchiveClearParameters parameters = new DataArchiveClearParameters();
+            parameters.setDataArchiveTaskId(taskId);
+            TriggerConfig triggerConfig = new TriggerConfig();
+            triggerConfig.setTriggerStrategy(TriggerStrategy.START_NOW);
+            req.getJobDataMap().putAll(BeanMap.create(parameters));
+            req.setTriggerConfig(triggerConfig);
+            quartzJobService.createJob(req);
         } catch (SchedulerException e) {
             throw new RuntimeException(e);
         }
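The behavioral change in this hunk (and in the matching rollbackTask hunk below): the old code created the job only when it did not already exist and then fired triggerJob by key, so a job left over from an earlier run was triggered with its stale JobDataMap instead of the current task id. The new code drops any existing job and recreates it with fresh parameters and a START_NOW trigger. A minimal sketch of the same pattern in raw Quartz terms, assuming a hypothetical Scheduler instance and job class that are not part of this PR:

import org.quartz.*;

public class RecreateJobSketch {
    // Delete any stale job (this also removes its triggers), then re-register
    // it with up-to-date job data and a trigger that fires once, immediately.
    static void recreateAndFireNow(Scheduler scheduler, JobKey jobKey,
            Class<? extends Job> jobClass, JobDataMap freshData) throws SchedulerException {
        if (scheduler.checkExists(jobKey)) {
            scheduler.deleteJob(jobKey);
        }
        JobDetail job = JobBuilder.newJob(jobClass)
                .withIdentity(jobKey)
                .usingJobData(freshData) // fresh parameters, not the stale map
                .build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .startNow()              // raw-Quartz analogue of TriggerStrategy.START_NOW
                .build();
        scheduler.scheduleJob(job, trigger);
    }
}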
@@ -337,16 +341,20 @@ public ScheduleTaskResp rollbackTask(Long scheduleId, Long taskId) {
         }

         try {
-            if (!quartzJobService.checkExists(jobKey)) {
-                CreateQuartzJobReq req = new CreateQuartzJobReq();
-                req.setScheduleId(scheduleId);
-                req.setType(JobType.DATA_ARCHIVE_ROLLBACK);
-                DataArchiveRollbackParameters parameters = new DataArchiveRollbackParameters();
-                parameters.setDataArchiveTaskId(taskId);
-                req.getJobDataMap().putAll(BeanMap.create(parameters));
-                quartzJobService.createJob(req);
+            if (quartzJobService.checkExists(jobKey)) {
+                log.info("Data archive rollback job exists and start delete job,jobKey={}", jobKey);
+                quartzJobService.deleteJob(jobKey);
             }
-            quartzJobService.triggerJob(jobKey);
+            CreateQuartzJobReq req = new CreateQuartzJobReq();
+            req.setScheduleId(scheduleId);
+            req.setType(JobType.DATA_ARCHIVE_ROLLBACK);
+            DataArchiveRollbackParameters parameters = new DataArchiveRollbackParameters();
+            parameters.setDataArchiveTaskId(taskId);
+            req.getJobDataMap().putAll(BeanMap.create(parameters));
+            TriggerConfig triggerConfig = new TriggerConfig();
+            triggerConfig.setTriggerStrategy(TriggerStrategy.START_NOW);
+            req.setTriggerConfig(triggerConfig);
+            quartzJobService.createJob(req);
         } catch (SchedulerException e) {
             throw new RuntimeException(e);
         }
@@ -34,7 +34,6 @@
 import com.oceanbase.odc.service.flow.processor.ScheduleTaskPreprocessor;
 import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
 import com.oceanbase.odc.service.plugin.ConnectionPluginUtil;
-import com.oceanbase.odc.service.schedule.DlmEnvironment;
 import com.oceanbase.odc.service.schedule.ScheduleService;
 import com.oceanbase.odc.service.schedule.model.JobType;
 import com.oceanbase.odc.service.session.factory.DefaultConnectSessionFactory;
@@ -49,10 +48,6 @@
 @Slf4j
 @ScheduleTaskPreprocessor(type = JobType.DATA_ARCHIVE)
 public class DataArchivePreprocessor extends AbstractDlmJobPreprocessor {

-    @Autowired
-    private DlmEnvironment dlmEnvironment;
-
     @Autowired
     private AuthenticationFacade authenticationFacade;

@@ -75,10 +70,6 @@ public void process(CreateFlowInstanceReq req) {
         // permission to access it.
         Database sourceDb = databaseService.detail(dataArchiveParameters.getSourceDatabaseId());
         Database targetDb = databaseService.detail(dataArchiveParameters.getTargetDataBaseId());
-        if (dlmEnvironment.isSysTenantUserRequired()) {
-            checkDatasource(sourceDb.getDataSource());
-            checkDatasource(targetDb.getDataSource());
-        }
         dataArchiveParameters.setSourceDatabaseName(sourceDb.getName());
         dataArchiveParameters.setTargetDatabaseName(targetDb.getName());
         dataArchiveParameters.setSourceDataSourceName(sourceDb.getDataSource().getName());
@@ -29,7 +29,6 @@
 import com.oceanbase.odc.service.flow.model.CreateFlowInstanceReq;
 import com.oceanbase.odc.service.flow.processor.ScheduleTaskPreprocessor;
 import com.oceanbase.odc.service.iam.auth.AuthenticationFacade;
-import com.oceanbase.odc.service.schedule.DlmEnvironment;
 import com.oceanbase.odc.service.schedule.ScheduleService;
 import com.oceanbase.odc.service.schedule.model.JobType;
 import com.oceanbase.odc.service.session.factory.DefaultConnectSessionFactory;
@@ -45,9 +44,6 @@
 @ScheduleTaskPreprocessor(type = JobType.DATA_DELETE)
 public class DataDeletePreprocessor extends AbstractDlmJobPreprocessor {

-    @Autowired
-    private DlmEnvironment dlmEnvironment;
-
     @Autowired
     private AuthenticationFacade authenticationFacade;

@@ -69,9 +65,6 @@ public void process(CreateFlowInstanceReq req) {
         // Throw exception when the specified database does not exist or the current user does not have
         // permission to access it.
         Database sourceDb = databaseService.detail(dataDeleteParameters.getDatabaseId());
-        if (dlmEnvironment.isSysTenantUserRequired()) {
-            checkDatasource(sourceDb.getDataSource());
-        }

         ConnectionConfig dataSource = sourceDb.getDataSource();
         dataSource.setDefaultSchema(sourceDb.getName());
@@ -98,7 +91,7 @@ public void process(CreateFlowInstanceReq req) {
             if (dataDeleteParameters.getRateLimit().getDataSizeLimit() != null) {
                 limiterConfig.setDataSizeLimit(dataDeleteParameters.getRateLimit().getDataSizeLimit());
             }
-            if (dataDeleteParameters.getRateLimit().getRowLimit() != null) {
+            if (dataDeleteParameters.getRateLimit().getBatchSize() != null) {
                 limiterConfig.setBatchSize(dataDeleteParameters.getRateLimit().getBatchSize());
             }
             limiterService.createAndBindToOrder(scheduleEntity.getId(), limiterConfig);
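This last hunk is the data-delete failure named in the PR title: the old guard checked getRowLimit() for null but then read getBatchSize(), so a request carrying a row limit without a batch size handed a null into setBatchSize (an unboxing NPE or a silently null batch size, depending on the setter's signature). The fixed guard tests the same field it reads. A self-contained sketch of the mismatch, with a hypothetical RateLimitConfig standing in for the real parameter class:

public class RateLimitGuardSketch {
    // Hypothetical stand-in for the real rate-limit parameters.
    static class RateLimitConfig {
        Integer rowLimit;  // assumed: max rows affected per second
        Integer batchSize; // assumed: rows deleted per batch
    }

    // Pre-fix shape: guard on rowLimit, read batchSize. Unboxing throws a
    // NullPointerException whenever rowLimit is set and batchSize is not.
    static int buggyBatchSize(RateLimitConfig c, int fallback) {
        if (c.rowLimit != null) {
            return c.batchSize;
        }
        return fallback;
    }

    // Post-fix shape: guard on the field that is actually read.
    static int fixedBatchSize(RateLimitConfig c, int fallback) {
        return c.batchSize != null ? c.batchSize : fallback;
    }
}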

This file was deleted.

This file was deleted.