Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-3575][*] Force Task Success #4267

Merged
merged 66 commits into from
Jan 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
266f002
Merge pull request #1 from apache/dev
legen618 Jul 22, 2020
7f8d060
Merge pull request #2 from apache/dev
legen618 Jul 22, 2020
d86ceee
[implement] change task state
Jul 23, 2020
3fceee9
从强制成功的节点处启动-apiserver部分
Jul 23, 2020
5e9d664
从强制成功处启动-masterServer部分
Jul 25, 2020
e912a80
fix bug in mapper
Jul 26, 2020
fa0bb5e
[bugfix] issue #3324
Jul 28, 2020
d57daf0
Merge pull request #3 from apache/dev
legen618 Aug 4, 2020
23277bf
Revert "[bugfix] issue #3324"
Aug 4, 2020
7cf55d5
Merge branch 'dev' of https://github.com/legen618/incubator-dolphinsc…
Aug 4, 2020
9365376
支持sub-process和condition的TFS
Aug 4, 2020
5d02e30
调整数据库访问函数
Aug 5, 2020
18ae56b
改变原有的dag构建参数
Aug 5, 2020
d0e6b09
加入valid检验
Aug 5, 2020
48afb2e
增加强制成功对运行中process的影响
Aug 6, 2020
90a0601
修改与新增commandType相关的代码
Aug 7, 2020
c8fb897
添加dataAnalysis的状态统计相关的部分
Aug 7, 2020
079768f
revert
Aug 7, 2020
815f62a
增加了修改task状态的单元测试
Aug 7, 2020
871be17
Merge pull request #4 from apache/dev
legen618 Aug 17, 2020
b418b68
add taskInstanceMapperTest
Aug 22, 2020
1f19cf2
update test of TaskInstanceController
Aug 22, 2020
373cc47
Merge pull request #5 from apache/dev
legen618 Aug 22, 2020
6bdd11d
Merge branch 'dev' into dev-FTS
Aug 22, 2020
295796a
fix
Aug 22, 2020
ae9eb16
解决更新后的逻辑冲突
Aug 22, 2020
1fc1218
fix import problems
Aug 23, 2020
6407745
Update DataAnalysisServiceImpl.java
Aug 23, 2020
31cd1f6
adapt to code style
Aug 23, 2020
d97cc3d
fix vulnerability
Aug 24, 2020
6e5145f
fix some code smell problems
Aug 24, 2020
de19fab
change back to original import order
Aug 26, 2020
b736b09
Merge pull request #6 from apache/dev
legen618 Aug 26, 2020
7e4bf19
Merge branch 'dev' into dev-FTS
Aug 26, 2020
79bff32
Update TaskInstanceService.java
Aug 26, 2020
757f8e4
fix spelling mistake
Aug 26, 2020
211bcd5
change bad varible name
Aug 28, 2020
92e70b9
improvement
Aug 28, 2020
83548e6
add test of executorService
Aug 29, 2020
d2a43b0
fix bug about CONDITION-task
Aug 29, 2020
3eeea22
improve test of DAGhelper
Aug 29, 2020
58b4e1f
fix vulnerability
Aug 29, 2020
bbfd35b
improve test coverage
Aug 29, 2020
d0eacfe
Merge pull request #7 from apache/dev
legen618 Sep 8, 2020
cf9576b
Merge branch 'dev' into dev-FTS
Sep 8, 2020
8ba41dc
fix code smells
Sep 8, 2020
8e4037c
Update MasterExecThread.java
Sep 8, 2020
a224d6f
Update DagHelper.java
Sep 8, 2020
37762c2
Merge branch 'dev' into dev-FTS
legen618 Oct 13, 2020
4f0c74d
Update Status.java
Oct 14, 2020
897ce2d
Merge branch 'dev' into dev-FTS
legen618 Oct 25, 2020
7817601
Update Status.java
Oct 26, 2020
fcc66d8
Merge pull request #2 from apache/dev
yinancx Nov 27, 2020
972c6d8
start from force success
Dec 3, 2020
cb647e9
correct param name
Dec 3, 2020
cc2068b
correct names
Dec 3, 2020
da74000
adapt the latest dev branch
chengshiwen Dec 19, 2020
faddff5
add pasue icon button and adjust operation button style
chengshiwen Dec 19, 2020
c3b5c5f
merge force task success from legen618/dev-FTS
chengshiwen Dec 20, 2020
925d88d
Align each item one by one in locale
chengshiwen Dec 20, 2020
af720d3
match frontend and backend for force success
chengshiwen Dec 20, 2020
ae0ee3f
fix code style and test
chengshiwen Dec 20, 2020
aa97b8a
adjust locale for new pr
chengshiwen Dec 23, 2020
325bc2e
Merge branch 'dev' into dev
davidzollo Jan 5, 2021
5f39792
remove the operation resume_from_forced_success
chengshiwen Jan 7, 2021
e3547a3
refactor code styles
chengshiwen Jan 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.api.controller;

import static org.apache.dolphinscheduler.api.enums.Status.FORCE_TASK_SUCCESS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_PAGING_ERROR;

import org.apache.dolphinscheduler.api.exceptions.ApiException;
Expand All @@ -36,6 +37,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand Down Expand Up @@ -127,4 +129,27 @@ public Result queryTaskListPaging(@ApiIgnore @RequestAttribute(value = Constants
return returnDataListPaging(result);
}

/**
* change one task instance's state from FAILURE to FORCED_SUCCESS
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
@ApiOperation(value = "force-success", notes = "FORCE_TASK_SUCCESS")
@ApiImplicitParams({
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_INSTANCE_ID", required = true, dataType = "Int", example = "12")
})
@PostMapping(value = "/force-success")
@ResponseStatus(HttpStatus.OK)
@ApiException(FORCE_TASK_SUCCESS_ERROR)
public Result<Object> forceTaskSuccess(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName,
@RequestParam(value = "taskInstanceId") Integer taskInstanceId) {
logger.info("force task success, project: {}, task instance id: {}", projectName, taskInstanceId);
Map<String, Object> result = taskInstanceService.forceTaskSuccess(loginUser, projectName, taskInstanceId);
return returnDataList(result);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ private void countTaskDtos(List<ExecuteStatusCount> taskInstanceStateCounts) {
.sum();
}

// remove the specified state
public void removeStateFromCountList(ExecutionStatus status) {
for (TaskStateCount count : this.taskCountDtos) {
if (count.getTaskStateType().equals(status)) {
this.taskCountDtos.remove(count);
break;
}
}
}

public List<TaskStateCount> getTaskCountDtos() {
return taskCountDtos;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public enum ExecuteType {
* 4 stop
* 5 pause
*/
NONE,REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;
NONE, REPEAT_RUNNING, RECOVER_SUSPENDED_PROCESS, START_FAILURE_TASK_PROCESS, STOP, PAUSE;


public static ExecuteType getEnum(int value){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ public enum Status {
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),


UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
Expand Down Expand Up @@ -247,7 +250,7 @@ public enum Status {
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"),
TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),

HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,50 @@ public Map<String, Object> queryTaskListPaging(User loginUser, String projectNam
return result;
}

/**
* change one task instance's state from failure to forced success
*
* @param loginUser login user
* @param projectName project name
* @param taskInstanceId task instance id
* @return the result code and msg
*/
public Map<String, Object> forceTaskSuccess(User loginUser, String projectName, Integer taskInstanceId) {
Map<String, Object> result = new HashMap<>(5);
Project project = projectMapper.queryByName(projectName);

// check user auth
Map<String, Object> checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName);
Status status = (Status) checkResult.get(Constants.STATUS);
if (status != Status.SUCCESS) {
return checkResult;
}

// check whether the task instance can be found
TaskInstance task = taskInstanceMapper.selectById(taskInstanceId);
if (task == null) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}

// check whether the task instance state type is failure
if (!task.getState().typeIsFailure()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskInstanceId, task.getState().toString());
return result;
}

// change the state of the task instance
task.setState(ExecutionStatus.FORCED_SUCCESS);
int changedNum = taskInstanceMapper.updateById(task);
if (changedNum > 0) {
putMsg(result, Status.SUCCESS);
} else {
putMsg(result, Status.FORCE_TASK_SUCCESS_ERROR);
}

return result;
}

/***
* generate {@link org.apache.dolphinscheduler.api.enums.Status#REQUEST_PARAMS_NOT_VALID_ERROR} res with param name
* @param result exist result map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
Expand Down Expand Up @@ -114,12 +115,17 @@ public Map<String, Object> countTaskStateByProject(User loginUser, int projectId
* @return process instance state count data
*/
public Map<String, Object> countProcessInstanceStateByProject(User loginUser, int projectId, String startDate, String endDate) {
return this.countStateByProject(
Map<String, Object> result = this.countStateByProject(
loginUser,
projectId,
startDate,
endDate,
(start, end, projectIds) -> this.processInstanceMapper.countInstanceStateByUser(start, end, projectIds));
// process state count needs to remove state of forced success
if (result.containsKey(Constants.STATUS) && result.get(Constants.STATUS).equals(Status.SUCCESS)) {
((TaskCountDto)result.get(Constants.DATA_LIST)).removeStateFromCountList(ExecutionStatus.FORCED_SUCCESS);
}
return result;
}

private Map<String, Object> countStateByProject(User loginUser, int projectId, String startDate, String endDate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,42 @@
package org.apache.dolphinscheduler.api.controller;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.TaskInstanceService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.HashMap;
import java.util.Map;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

/**
* task instance controller test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class TaskInstanceControllerTest {
public class TaskInstanceControllerTest extends AbstractControllerTest {

@InjectMocks
private TaskInstanceController taskInstanceController;
Expand All @@ -67,7 +76,27 @@ public void testQueryTaskListPaging() {
Result taskResult = taskInstanceController.queryTaskListPaging(null, "", 1, "", "",
"", "", ExecutionStatus.SUCCESS,"192.168.xx.xx", "2020-01-01 00:00:00", "2020-01-02 00:00:00",pageNo, pageSize);
Assert.assertEquals(Integer.valueOf(Status.SUCCESS.getCode()), taskResult.getCode());

}

@Ignore
@Test
public void testForceTaskSuccess() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("taskInstanceId", "104");

Map<String, Object> mockResult = new HashMap<>(5);
mockResult.put(Constants.STATUS, Status.SUCCESS);
mockResult.put(Constants.MSG, Status.SUCCESS.getMsg());
when(taskInstanceService.forceTaskSuccess(any(User.class), anyString(), anyInt())).thenReturn(mockResult);

MvcResult mvcResult = mockMvc.perform(post("/projects/{projectName}/task-instance/force-success", "cxc_1113")
.header(SESSION_ID, sessionId)
.params(paramsMap))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
.andReturn();

Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class);
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.RunMode;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.Tenant;
Expand Down Expand Up @@ -82,12 +84,16 @@ public class ExecutorService2Test {

private int processDefinitionId = 1;

private int processInstanceId = 1;

private int tenantId = 1;

private int userId = 1;

private ProcessDefinition processDefinition = new ProcessDefinition();

private ProcessInstance processInstance = new ProcessInstance();

private User loginUser = new User();

private String projectName = "projectName";
Expand All @@ -107,6 +113,13 @@ public void init() {
processDefinition.setTenantId(tenantId);
processDefinition.setUserId(userId);

// processInstance
processInstance.setId(processInstanceId);
processInstance.setProcessDefinitionId(processDefinitionId);
processInstance.setState(ExecutionStatus.FAILURE);
processInstance.setExecutorId(userId);
processInstance.setTenantId(tenantId);

// project
project.setName(projectName);

Expand All @@ -120,6 +133,8 @@ public void init() {
Mockito.when(processService.getTenantForProcess(tenantId, userId)).thenReturn(new Tenant());
Mockito.when(processService.createCommand(any(Command.class))).thenReturn(1);
Mockito.when(monitorService.getServerListFromZK(true)).thenReturn(getMasterServersList());
Mockito.when(processService.findProcessInstanceDetailById(processInstanceId)).thenReturn(processInstance);
Mockito.when(processService.findProcessDefineById(processDefinitionId)).thenReturn(processDefinition);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,48 @@ private void putMsg(Map<String, Object> result, Status status, Object... statusP
result.put(Constants.MSG, status.getMsg());
}
}
}

@Test
public void forceTaskSuccess() {
User user = getAdminUser();
String projectName = "test";
Project project = getProject(projectName);
int taskId = 1;
TaskInstance task = getTaskInstance();

Map<String, Object> mockSuccess = new HashMap<>(5);
putMsg(mockSuccess, Status.SUCCESS);
when(projectMapper.queryByName(projectName)).thenReturn(project);

// user auth failed
Map<String, Object> mockFailure = new HashMap<>(5);
putMsg(mockFailure, Status.USER_NO_OPERATION_PROJECT_PERM, user.getUserName(), projectName);
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockFailure);
Map<String, Object> authFailRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertNotSame(Status.SUCCESS, authFailRes.get(Constants.STATUS));

// test task not found
when(projectService.checkProjectAndAuth(user, project, projectName)).thenReturn(mockSuccess);
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
Map<String, Object> taskNotFoundRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND, taskNotFoundRes.get(Constants.STATUS));

// test task instance state error
task.setState(ExecutionStatus.SUCCESS);
when(taskInstanceMapper.selectById(1)).thenReturn(task);
Map<String, Object> taskStateErrorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_STATE_OPERATION_ERROR, taskStateErrorRes.get(Constants.STATUS));

// test error
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(0);
Map<String, Object> errorRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.FORCE_TASK_SUCCESS_ERROR, errorRes.get(Constants.STATUS));

// test success
task.setState(ExecutionStatus.FAILURE);
when(taskInstanceMapper.updateById(task)).thenReturn(1);
Map<String, Object> successRes = taskInstanceService.forceTaskSuccess(user, projectName, taskId);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum ExecutionStatus {
* 10 waiting thread
* 11 waiting depend node complete
* 12 delay execution
* 13 forced success
*/
SUBMITTED_SUCCESS(0, "submit success"),
RUNNING_EXECUTION(1, "running"),
Expand All @@ -54,7 +55,8 @@ public enum ExecutionStatus {
KILL(9, "kill"),
WAITTING_THREAD(10, "waiting thread"),
WAITTING_DEPEND(11, "waiting depend node complete"),
DELAY_EXECUTION(12, "delay execution");
DELAY_EXECUTION(12, "delay execution"),
FORCED_SUCCESS(13, "forced success");

ExecutionStatus(int code, String descp) {
this.code = code;
Expand All @@ -79,7 +81,7 @@ public enum ExecutionStatus {
* @return status
*/
public boolean typeIsSuccess() {
return this == SUCCESS;
return this == SUCCESS || this == FORCED_SUCCESS;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object>
* @throws ParseException ParseException
*/
public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
if (varPool == null || propToValue == null) {
if (propToValue == null || StringUtils.isEmpty(varPool)) {
return;
}
String[] splits = varPool.split("\\$VarPool\\$");
Expand Down
Loading