Skip to content

Commit

Permalink
feature: 原子操作能力支持滚动执行 TencentBlueKing#446
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyu096 committed Jan 17, 2022
1 parent 6c66ead commit 40a9cd2
Show file tree
Hide file tree
Showing 17 changed files with 224 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class WebFastExecuteScriptRequest {
@ApiModelProperty(value = "任务实例ID,重做的时候需要传入")
private Long taskInstanceId;

@ApiModelProperty(value = "滚动配置")
@ApiModelProperty(value = "滚动配置, 滚动执行需要传入")
private RollingConfigVO rollingConfig;

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class WebFastPushFileRequest {
message = "{validation.constraints.InvalidJobTimeout_outOfRange.message}")
private Integer timeout;

@ApiModelProperty(value = "滚动配置")
private RollingConfigVO rollingExecutionConfig;
@ApiModelProperty(value = "滚动配置, 滚动执行需要传入")
private RollingConfigVO rollingConfig;

/**
* 传输模式
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,5 +564,21 @@ void testGetPreExecutableStepInstance() {
assertThat(preStepInstance.getId()).isEqualTo(1L);
}

@Test
void updateStepCurrentBatch() {
stepInstanceDAO.updateStepCurrentBatch(1L, 1);

StepInstanceBaseDTO stepInstance = stepInstanceDAO.getStepInstanceBase(1L);
assertThat(stepInstance.getBatch()).isEqualTo(1);
}

@Test
void updateStepRollingConfigId() {
stepInstanceDAO.updateStepRollingConfigId(1L, 1000L);

StepInstanceBaseDTO stepInstance = stepInstanceDAO.getStepInstanceBase(1L);
assertThat(stepInstance.getRollingConfigId()).isEqualTo(1000L);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.tencent.bk.job.execute.model.FastTaskDTO;
import com.tencent.bk.job.execute.model.FileDetailDTO;
import com.tencent.bk.job.execute.model.FileSourceDTO;
import com.tencent.bk.job.execute.model.RollingConfigDTO;
import com.tencent.bk.job.execute.model.ServersDTO;
import com.tencent.bk.job.execute.model.StepInstanceDTO;
import com.tencent.bk.job.execute.model.StepOperationDTO;
Expand All @@ -64,6 +65,7 @@
import com.tencent.bk.job.execute.model.web.vo.ExecuteServersVO;
import com.tencent.bk.job.execute.model.web.vo.ExecuteTargetVO;
import com.tencent.bk.job.execute.model.web.vo.ExecuteVariableVO;
import com.tencent.bk.job.execute.model.web.vo.RollingConfigVO;
import com.tencent.bk.job.execute.model.web.vo.StepExecuteVO;
import com.tencent.bk.job.execute.model.web.vo.StepOperationVO;
import com.tencent.bk.job.execute.model.web.vo.TaskExecuteVO;
Expand Down Expand Up @@ -211,8 +213,20 @@ public Response<StepExecuteVO> fastExecuteScript(String username, Long appId,
StepInstanceDTO stepInstance = buildFastScriptStepInstance(username, appId, request);
String decodeScriptContent = new String(Base64.decodeBase64(request.getContent()), StandardCharsets.UTF_8);
stepInstance.setScriptContent(decodeScriptContent);
RollingConfigDTO rollingConfig = buildRollingConfigIfEnabled(request.getRollingConfig());

return createAndStartFastTask(request.isRedoTask(), taskInstance, stepInstance);
return createAndStartFastTask(request.isRedoTask(), taskInstance, stepInstance, rollingConfig);
}

private RollingConfigDTO buildRollingConfigIfEnabled(RollingConfigVO rollingConfigVO) {
if (rollingConfigVO == null) {
return null;
}
RollingConfigDTO rollingConfigDTO = new RollingConfigDTO();
rollingConfigDTO.setName(rollingConfigVO.getName());
rollingConfigDTO.setMode(rollingConfigVO.getMode());
rollingConfigDTO.setExpr(rollingConfigVO.getBatchExpr());
return rollingConfigDTO;
}

private boolean checkFastExecuteScriptRequest(WebFastExecuteScriptRequest request) {
Expand Down Expand Up @@ -334,17 +348,20 @@ public Response<StepExecuteVO> fastPushFile(String username, Long appId, WebFast

TaskInstanceDTO taskInstance = buildFastFileTaskInstance(username, appId, request);
StepInstanceDTO stepInstance = buildFastFileStepInstance(username, appId, request);
RollingConfigDTO rollingConfig = buildRollingConfigIfEnabled(request.getRollingConfig());

return createAndStartFastTask(false, taskInstance, stepInstance);
return createAndStartFastTask(false, taskInstance, stepInstance, rollingConfig);
}

private Response<StepExecuteVO> createAndStartFastTask(boolean isRedoTask, TaskInstanceDTO taskInstance,
StepInstanceDTO stepInstance) {
private Response<StepExecuteVO> createAndStartFastTask(boolean isRedoTask,
TaskInstanceDTO taskInstance,
StepInstanceDTO stepInstance,
RollingConfigDTO rollingConfig) {
long taskInstanceId;
if (!isRedoTask) {
taskInstanceId = taskExecuteService.executeFastTask(
FastTaskDTO.builder().taskInstance(taskInstance).stepInstance(stepInstance).build()
);
FastTaskDTO.builder().taskInstance(taskInstance).stepInstance(stepInstance)
.rollingConfig(rollingConfig).build());
} else {
taskInstanceId = taskExecuteService.redoFastTask(taskInstance, stepInstance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
import com.tencent.bk.job.common.constant.NotExistPathHandlerEnum;
import com.tencent.bk.job.execute.common.constants.RunStatusEnum;
import com.tencent.bk.job.execute.common.constants.StepExecuteTypeEnum;
import com.tencent.bk.job.execute.model.*;
import com.tencent.bk.job.execute.model.ConfirmStepInstanceDTO;
import com.tencent.bk.job.execute.model.FileSourceDTO;
import com.tencent.bk.job.execute.model.FileStepInstanceDTO;
import com.tencent.bk.job.execute.model.ScriptStepInstanceDTO;
import com.tencent.bk.job.execute.model.StepInstanceBaseDTO;
import com.tencent.bk.job.execute.model.StepInstanceDTO;
import com.tencent.bk.job.manage.common.consts.script.ScriptTypeEnum;

import java.util.List;
Expand Down Expand Up @@ -188,4 +193,20 @@ List<List<FileSourceDTO>> listFastPushFileSource(Long appId, DuplicateHandlerEnu
* @return
*/
Byte getScriptTypeByStepInstanceId(long stepInstanceId);

/**
* 更新步骤实例的当前滚动执行批次
*
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次
*/
void updateStepCurrentBatch(long stepInstanceId, int batch);

/**
* 更新步骤实例的滚动配置ID
*
* @param stepInstanceId 步骤实例ID
* @param rollingConfigId 滚动配置ID
*/
void updateStepRollingConfigId(long stepInstanceId, long rollingConfigId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public class StepInstanceDAOImpl implements StepInstanceDAO {
T_STEP_INSTANCE.IGNORE_ERROR,
T_STEP_INSTANCE.STEP_NUM,
T_STEP_INSTANCE.STEP_ORDER,
T_STEP_INSTANCE.BATCH
T_STEP_INSTANCE.BATCH,
T_STEP_INSTANCE.ROLLING_CONFIG_ID
};

private final DSLContext CTX;
Expand Down Expand Up @@ -378,6 +379,7 @@ private StepInstanceBaseDTO extractBaseInfo(Record record) {
stepInstance.setStepNum(record.get(t.STEP_NUM));
stepInstance.setStepOrder(record.get(t.STEP_ORDER));
stepInstance.setBatch(record.get(t.BATCH));
stepInstance.setRollingConfigId(record.get(t.ROLLING_CONFIG_ID));
return stepInstance;
}

Expand Down Expand Up @@ -887,4 +889,18 @@ private List<FileSourceDTO> convertStringToFileSourceDTO(String str) {
return JsonUtils.fromJson(str, new TypeReference<ArrayList<FileSourceDTO>>() {
});
}

@Override
public void updateStepCurrentBatch(long stepInstanceId, int batch) {
CTX.update(T_STEP_INSTANCE).set(T_STEP_INSTANCE.BATCH, JooqDataTypeUtil.toShort(batch))
.where(T_STEP_INSTANCE.ID.eq(stepInstanceId))
.execute();
}

@Override
public void updateStepRollingConfigId(long stepInstanceId, long rollingConfigId) {
CTX.update(T_STEP_INSTANCE).set(T_STEP_INSTANCE.ROLLING_CONFIG_ID, rollingConfigId)
.where(T_STEP_INSTANCE.ID.eq(stepInstanceId))
.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.util.List;

/**
* 服务器滚动分批上下文
* 服务器滚动分批
*/
public class RollingBatchServersResolveContext {
public class RollingBatchServersResolver {
/**
* 需要分批的服务器
*/
Expand All @@ -56,28 +56,28 @@ public class RollingBatchServersResolveContext {
/**
* 分批结果
*/
private final List<RollingBatchServers> serverBatches;
private final List<RollingServerBatch> serverBatches;

public RollingBatchServersResolveContext(List<IpDTO> servers, String rollingExpr) {
public RollingBatchServersResolver(List<IpDTO> servers, String rollingExpr) {
this.servers = servers;
this.remainedServers = new ArrayList<>(this.servers);
this.rollingExpr = rollingExpr;
this.total = servers.size();
this.serverBatches = new ArrayList<>();
}

public List<RollingBatchServers> resolve() {
public List<RollingServerBatch> resolve() {
RollingExpr rollingExpr = new RollingExpr(this.rollingExpr);
while (hasRemainedServer()) {
this.batchCount++;
RollingExprPart rollingExprPart = rollingExpr.nextRollingExprPart(this.batchCount);
List<IpDTO> serversOnBatch = rollingExprPart.compute(this.total, this.remainedServers);
this.remainedServers.removeAll(serversOnBatch);
RollingBatchServers rollingBatchServers = new RollingBatchServers();
rollingBatchServers.setBatch(this.batchCount);
rollingBatchServers.setServers(serversOnBatch);
rollingBatchServers.setRollingExprPart(rollingExprPart);
this.serverBatches.add(rollingBatchServers);
RollingServerBatch rollingServerBatch = new RollingServerBatch();
rollingServerBatch.setBatch(this.batchCount);
rollingServerBatch.setServers(serversOnBatch);
rollingServerBatch.setRollingExprPart(rollingExprPart);
this.serverBatches.add(rollingServerBatch);
}
return this.serverBatches;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@

import java.util.List;

/**
* 滚动执行服务器分批
*/
@Data
class RollingBatchServers {
public class RollingServerBatch {
private RollingExprPart rollingExprPart;
private int batch;
private List<IpDTO> servers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ public class FastTaskDTO {
*/
private StepInstanceDTO stepInstance;
/**
* 是否滚动执行
*/
private boolean rollingEnabled;
/**
* 滚动策略
* 滚动配置
*/
private Integer rollingMode;
private RollingConfigDTO rollingConfig;

/**
* 滚动表达式
* 是否滚动执行
*
* @return 是否滚动执行
*/
private String rollingExpr;
public boolean isRollingEnabled() {
return this.rollingConfig != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.execute.model;

import lombok.Data;

/**
* 滚动配置
*/
@Data
public class RollingConfigDTO {
private String name;
/**
* 滚动策略
*/
private Integer mode;
/**
* 滚动表达式
*/
private String expr;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.List;

/**
* 滚动执行-服务器分批
* 滚动执行-服务器分批 DO
*/
@Data
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ public interface RollingConfigService {
*/
List<IpDTO> getRollingServers(StepInstanceBaseDTO stepInstance);

/**
* 获取步骤当前批次对应的主机
*
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次
* @return 主机列表
*/
List<IpDTO> getRollingServers(long stepInstanceId, int batch);

/**
* 保存快速执行作业滚动配置
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@

package com.tencent.bk.job.execute.service;

import com.tencent.bk.job.common.model.dto.IpDTO;

import java.util.List;

/**
* 作业步骤执行实例 Service
*/
Expand All @@ -48,12 +44,5 @@ public interface StepInstanceService {
*/
void updateStepRollingConfigId(long stepInstanceId, long rollingConfigId);

/**
* 获取步骤当前批次对应的主机
*
* @param stepInstanceId 步骤实例ID
* @param batch 滚动执行批次
* @return 主机列表
*/
List<IpDTO> getRollingServers(long stepInstanceId, int batch);

}
Loading

0 comments on commit 40a9cd2

Please sign in to comment.