Skip to content

Commit

Permalink
feat(dlm): support configure sharding strategy (#3275)
Browse files Browse the repository at this point in the history
* support configure sharding strategy.

* upgrade version to 1.1.4.bp2
  • Loading branch information
guowl3 authored Sep 5, 2024
1 parent 97dd6dd commit aacfb18
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
<mina.version>2.1.6</mina.version>

<!-- data-lifecycle-manager version -->
<data-lifecycle-manager.version>1.1.4.bp1</data-lifecycle-manager.version>
<data-lifecycle-manager.version>1.1.4.bp2</data-lifecycle-manager.version>

<!-- plugin version -->
<formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import com.oceanbase.odc.core.flow.model.TaskParameters;
import com.oceanbase.tools.migrator.common.enums.ShardingStrategy;

import lombok.Data;

Expand Down Expand Up @@ -63,4 +64,6 @@ public class DataDeleteParameters implements TaskParameters {

private Long timeoutMillis;

private ShardingStrategy shardingStrategy;

}
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ private void initDefaultConfig(DataArchiveParameters parameters) {
.setWriteThreadCount(dlmConfiguration.getSingleTaskThreadPoolSize() - parameters.getReadThreadCount());
parameters.setScanBatchSize(dlmConfiguration.getDefaultScanBatchSize());
parameters.setQueryTimeout(dlmConfiguration.getTaskConnectionQueryTimeout());
parameters.setShardingStrategy(dlmConfiguration.getShardingStrategy());
if (parameters.getShardingStrategy() == null) {
parameters.setShardingStrategy(dlmConfiguration.getShardingStrategy());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ private void initDefaultConfig(DataDeleteParameters parameters) {
.setWriteThreadCount(dlmConfiguration.getSingleTaskThreadPoolSize() - parameters.getReadThreadCount());
parameters.setScanBatchSize(dlmConfiguration.getDefaultScanBatchSize());
parameters.setQueryTimeout(dlmConfiguration.getTaskConnectionQueryTimeout());
if (parameters.getShardingStrategy() == null) {
parameters.setShardingStrategy(dlmConfiguration.getShardingStrategy());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public List<DlmTableUnit> splitTask(ScheduleTaskEntity taskEntity) {
parameter.setReaderBatchSize(limiterConfig.getBatchSize());
parameter.setWriterBatchSize(limiterConfig.getBatchSize());
parameter.setMigratePartitions(table.getPartitions());
parameter.setShardingStrategy(parameters.getShardingStrategy());
dlmTableUnit.setParameters(parameter);
dlmTableUnit.setStatus(TaskStatus.PREPARING);
dlmTableUnit.setStatistic(new DlmTableUnitStatistic());
Expand Down Expand Up @@ -144,6 +145,7 @@ private void executeInTaskFramework(JobExecutionContext context) {
: dataDeleteParameters.getTargetDatabaseId()));
parameters.getSourceDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout());
parameters.getTargetDs().setQueryTimeout(dataDeleteParameters.getQueryTimeout());
parameters.setShardingStrategy(dataDeleteParameters.getShardingStrategy());

Long jobId = publishJob(parameters, dataDeleteParameters.getTimeoutMillis());
scheduleTaskRepository.updateJobIdById(taskEntity.getId(), jobId);
Expand Down

0 comments on commit aacfb18

Please sign in to comment.