Skip to content

Commit

Permalink
[FIX-3617][Service]after subtask fault tolerance, 2 task instances ar…
Browse files Browse the repository at this point in the history
…e generated (#3830)

* fix bug(#3617): after subtask fault tolerance, 2 task instances are generated.

* delete unused code

* update code smell

* refactor sub work command process

* add process service ut

* add license header

* fix some code smell

* change ut java8 to java11

* update sonar to java11

* copy ut config from dev

* remove checkstyle

* revert to 1.3.3

* change process service test to executor service

* add process service test

* add process service test

* revert

* revert

* add comments

* change dev to 1.3.3-release

* revert

Co-authored-by: baoliang <baoliang@analysys.com.cn>
  • Loading branch information
lenboo and baoliang authored Oct 13, 2020
1 parent 1894651 commit f29f4e8
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci_ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,4 @@ jobs:
run: |
mkdir -p ${LOG_DIR}
docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml logs dolphinscheduler-postgresql > ${LOG_DIR}/db.txt
continue-on-error: true
continue-on-error: true
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failov
* @throws Exception exception
*/
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if(StringUtils.isEmpty(serverHost) || serverHost.startsWith(OSUtils.getHost())){
return ;
if (StringUtils.isEmpty(serverHost)) {
return;
}
switch (zkNodeType){
switch (zkNodeType) {
case MASTER:
failoverMaster(serverHost);
break;
Expand Down Expand Up @@ -262,7 +262,7 @@ private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
Date workerServerStartDate = null;
List<Server> workerServers = getServersList(ZKNodeType.WORKER);
for(Server workerServer : workerServers){
if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){
if(taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())){
workerServerStartDate = workerServer.getCreateTime();
break;
}
Expand Down Expand Up @@ -336,7 +336,7 @@ private void failoverMaster(String masterHost) {
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
if(Constants.NULL.equals(processInstance.getHost()) ){
continue;
continue;
}
processService.processNeedFailoverProcessInstances(processInstance);
}
Expand All @@ -350,4 +350,4 @@ public InterProcessMutex blockAcquireMutex() throws Exception {
return mutex;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.File;
import java.util.*;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -840,7 +839,7 @@ public TaskInstance submitTask(TaskInstance taskInstance){
return task;
}
if(!task.getState().typeIsFinished()){
createSubWorkProcessCommand(processInstance, task);
createSubWorkProcess(processInstance, task);
}

logger.info("end submit task to db successfully:{} state:{} complete, instance id:{} state: {} ",
Expand All @@ -850,20 +849,22 @@ public TaskInstance submitTask(TaskInstance taskInstance){

/**
* set work process instance map
* consider o
* repeat running does not generate new sub process instance
* set map {parent instance id, task instance id, 0(child instance id)}
* @param parentInstance parentInstance
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask){
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
ProcessInstanceMap processMap = findWorkProcessMapByParent(parentInstance.getId(), parentTask.getId());
if(processMap != null){
if (processMap != null) {
return processMap;
}else if(parentInstance.getCommandType() == CommandType.REPEAT_RUNNING
|| parentInstance.isComplementData()){
}
if (parentInstance.getCommandType() == CommandType.REPEAT_RUNNING) {
// update current task id to map
// repeat running does not generate new sub process instance
processMap = findPreviousTaskProcessMap(parentInstance, parentTask);
if(processMap!= null){
if (processMap != null) {
processMap.setParentTaskInstanceId(parentTask.getId());
updateWorkProcessInstanceMap(processMap);
return processMap;
Expand Down Expand Up @@ -892,7 +893,7 @@ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProc
if(task.getName().equals(parentTask.getName())){
preTaskId = task.getId();
ProcessInstanceMap map = findWorkProcessMapByParent(parentProcessInstance.getId(), preTaskId);
if(map!=null){
if(map != null){
return map;
}
}
Expand All @@ -907,63 +908,110 @@ private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProc
* @param parentProcessInstance parentProcessInstance
* @param task task
*/
private void createSubWorkProcessCommand(ProcessInstance parentProcessInstance,
TaskInstance task){
if(!task.isSubProcess()){
public void createSubWorkProcess(ProcessInstance parentProcessInstance,
TaskInstance task) {
if (!task.isSubProcess()) {
return;
}
ProcessInstanceMap instanceMap = setProcessInstanceMap(parentProcessInstance, task);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
//check create sub work flow firstly
ProcessInstanceMap instanceMap = findWorkProcessMapByParent(parentProcessInstance.getId(), task.getId());
if (null != instanceMap && CommandType.RECOVER_TOLERANCE_FAULT_PROCESS == parentProcessInstance.getCommandType()) {
// recover failover tolerance would not create a new command when the sub command already have been created
return;
}
instanceMap = setProcessInstanceMap(parentProcessInstance, task);
ProcessInstance childInstance = null;
if (instanceMap.getProcessInstanceId() != 0) {
childInstance = findProcessInstanceById(instanceMap.getProcessInstanceId());
}
Command subProcessCommand = createSubProcessCommand(parentProcessInstance, childInstance, instanceMap, task);
updateSubProcessDefinitionByParent(parentProcessInstance, subProcessCommand.getProcessDefinitionId());
initSubInstanceState(childInstance);
createCommand(subProcessCommand);
logger.info("sub process command created: {} ", subProcessCommand);
}

ProcessInstance childInstance = findSubProcessInstance(parentProcessInstance.getId(), task.getId());

CommandType fatherType = parentProcessInstance.getCommandType();
CommandType commandType = fatherType;
if(childInstance == null){
String fatherHistoryCommand = parentProcessInstance.getHistoryCmd();
// sub process must begin with schedule/complement data
// if father begin with scheduler/complement data
if(fatherHistoryCommand.startsWith(CommandType.SCHEDULER.toString()) ||
fatherHistoryCommand.startsWith(CommandType.COMPLEMENT_DATA.toString())){
commandType = CommandType.valueOf(fatherHistoryCommand.split(Constants.COMMA)[0]);
}
}

if(childInstance != null){
childInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
updateProcessInstance(childInstance);
}
/**
* complement data needs transform parent parameter to child.
* @param instanceMap
* @param parentProcessInstance
* @return
*/
private String getSubWorkFlowParam(ProcessInstanceMap instanceMap, ProcessInstance parentProcessInstance) {
// set sub work process command
String processMapStr = JSONUtils.toJson(instanceMap);
Map<String, String> cmdParam = JSONUtils.toMap(processMapStr);

if(commandType == CommandType.COMPLEMENT_DATA ||
(childInstance != null && childInstance.isComplementData())){
if (parentProcessInstance.isComplementData()) {
Map<String, String> parentParam = JSONUtils.toMap(parentProcessInstance.getCommandParam());
String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
String endTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_END_DATE);
String startTime = parentParam.get(CMDPARAM_COMPLEMENT_DATA_START_DATE);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_END_DATE, endTime);
cmdParam.put(CMDPARAM_COMPLEMENT_DATA_START_DATE, startTime);
processMapStr = JSONUtils.toJson(cmdParam);
}
return processMapStr;
}

updateSubProcessDefinitionByParent(parentProcessInstance, childDefineId);
/**
 * Build the command that starts the sub work flow behind a sub-process task.
 *
 * The command type comes from {@code getSubCommandType}, the target process
 * definition id is read from the task's JSON params under
 * {@code Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID}, and the command parameter
 * string comes from {@code getSubWorkFlowParam}. The remaining settings
 * (failure strategy, executor, warning type/group, schedule time, priority and
 * worker group) are copied from the parent process instance.
 *
 * @param parentProcessInstance parent process instance whose settings are inherited
 * @param childInstance existing child process instance, or null when none exists yet
 * @param instanceMap parent/child relation serialized into the command parameter
 * @param task sub-process task instance carrying the task JSON
 * @return the command for the sub work flow (not persisted by this method)
 * @throws NumberFormatException if the sub process definition id param is missing or not an integer
 */
public Command createSubProcessCommand(ProcessInstance parentProcessInstance,
                                       ProcessInstance childInstance,
                                       ProcessInstanceMap instanceMap,
                                       TaskInstance task) {
    CommandType commandType = getSubCommandType(parentProcessInstance, childInstance);
    TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
    Map<String, String> subProcessParam = JSONUtils.toMap(taskNode.getParams());
    Integer childDefineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
    String processParam = getSubWorkFlowParam(instanceMap, parentProcessInstance);

    Command subProcessCommand = new Command(
            commandType,
            TaskDependType.TASK_POST,
            parentProcessInstance.getFailureStrategy(),
            parentProcessInstance.getExecutorId(),
            childDefineId,
            processParam,
            parentProcessInstance.getWarningType(),
            parentProcessInstance.getWarningGroupId(),
            parentProcessInstance.getScheduleTime(),
            parentProcessInstance.getProcessInstancePriority()
    );
    // The constructor call above passes no worker group, so copy it explicitly;
    // otherwise the sub work flow would not inherit the parent's worker group.
    subProcessCommand.setWorkerGroup(parentProcessInstance.getWorkerGroup());
    return subProcessCommand;
}

/**
 * Initialize the state of an existing sub work flow instance.
 * The child instance state would be initialized when 'recovery from pause/stop/failure'.
 * Does nothing when there is no child instance.
 *
 * @param childInstance child process instance to reset, may be null
 */
private void initSubInstanceState(ProcessInstance childInstance) {
    if (childInstance == null) {
        // no child instance yet, so there is no state to reset
        return;
    }
    childInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
    updateProcessInstance(childInstance);
}

Command command = new Command();
command.setWarningType(parentProcessInstance.getWarningType());
command.setWarningGroupId(parentProcessInstance.getWarningGroupId());
command.setFailureStrategy(parentProcessInstance.getFailureStrategy());
command.setProcessDefinitionId(childDefineId);
command.setScheduleTime(parentProcessInstance.getScheduleTime());
command.setExecutorId(parentProcessInstance.getExecutorId());
command.setCommandParam(processMapStr);
command.setCommandType(commandType);
command.setProcessInstancePriority(parentProcessInstance.getProcessInstancePriority());
command.setWorkerGroup(parentProcessInstance.getWorkerGroup());
createCommand(command);
logger.info("sub process command created: {} ", command.toString());
/**
 * Determine the command type for the sub work flow.
 * child instance exists: child command = parent's current command type
 * child instance does not exist: child command = first entry of the parent's history command list
 *
 * @param parentProcessInstance parent process instance
 * @param childInstance existing child process instance, or null when none exists yet
 * @return command type to use for the sub work flow command
 * @throws IllegalArgumentException if the first history entry is not a CommandType constant name
 */
private CommandType getSubCommandType(ProcessInstance parentProcessInstance, ProcessInstance childInstance) {
    if (childInstance != null) {
        return parentProcessInstance.getCommandType();
    }
    // the history command is a comma separated list; only its first entry is used
    String[] historyCommands = parentProcessInstance.getHistoryCmd().split(Constants.COMMA);
    return CommandType.valueOf(historyCommands[0]);
}

/**
Expand Down Expand Up @@ -1065,9 +1113,9 @@ public String taskZkInfo(TaskInstance taskInstance) {
public ExecutionStatus getSubmitTaskState(TaskInstance taskInstance, ExecutionStatus processInstanceState){
ExecutionStatus state = taskInstance.getState();
if(
// running or killed
// the task already exists in task queue
// return state
// running or killed
// the task already exists in task queue
// return state
state == ExecutionStatus.RUNNING_EXEUTION
|| state == ExecutionStatus.KILL
|| checkTaskExistsInTaskQueue(taskInstance)
Expand Down Expand Up @@ -1252,7 +1300,7 @@ public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus s
* @return task instance list
*/
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId){
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
}

/**
Expand Down Expand Up @@ -1431,20 +1479,6 @@ public List<String> convertIntListToString(List<Integer> intList){
return result;
}

/**
 * Set the pid and app links of a task instance and save it.
 * The task instance is looked up by id through the task instance mapper.
 * NOTE(review): the lookup result is used without a null check - presumably
 * callers only pass ids of existing task instances; confirm.
 *
 * @param taskInstId id of the task instance to update
 * @param pid pid to store on the task instance
 * @param appLinks app links to store on the task instance
 */
public void updatePidByTaskInstId(int taskInstId, int pid,String appLinks) {
    final TaskInstance target = taskInstanceMapper.selectById(taskInstId);
    target.setPid(pid);
    target.setAppLink(appLinks);
    saveTaskInstance(target);
}

/**
* query schedule by id
* @param id id
Expand Down
Loading

0 comments on commit f29f4e8

Please sign in to comment.