Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX-3929] condition task would post wrong tasks when failover. #3999

Merged
merged 12 commits into from
Oct 27, 2020
2 changes: 1 addition & 1 deletion dolphinscheduler-api/src/main/resources/logback-api.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@


<root level="INFO">
<appender-ref ref="STDOUT"/>
<!--<appender-ref ref="STDOUT"/>-->
<appender-ref ref="APILOGFILE"/>
</root>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
import org.apache.dolphinscheduler.common.process.ProcessDag;
import org.apache.dolphinscheduler.common.task.conditions.ConditionsParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
Expand Down Expand Up @@ -238,78 +239,148 @@ public static TaskNode findNodeByName(List<TaskNode> nodeDetails, String nodeNam
return null;
}


/**
* get start vertex in one dag
* it would find the post node if the start vertex is forbidden running
* @param parentNodeName previous node
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return start Vertex list
* @return can submit
*/
public static Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){

if(completeTaskList == null){
completeTaskList = new HashMap<>();
public static boolean allDependsForbiddenOrEnd(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
List<String> dependList = taskNode.getDepList();
if (dependList == null) {
return true;
}
Collection<String> startVertexs = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertexs = dag.getSubsequentNodes(parentNodeName);
}else{
startVertexs = dag.getBeginNode();
for (String dependNodeName : dependList) {
TaskNode dependNode = dag.getNode(dependNodeName);
if (completeTaskList.containsKey(dependNodeName) || dependNode.isForbidden()) {
continue;
} else {
return false;
}
}
return true;
}

List<String> tmpStartVertexs = new ArrayList<>();
if(startVertexs!= null){
tmpStartVertexs.addAll(startVertexs);
/**
*
* @param preNodeName
* @return
*/
public static Set<String> parsePostNodes(String preNodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {
Set<String> postNodeList = new HashSet<>();
Collection<String> startVertexes = new ArrayList<>();
if (preNodeName == null) {
startVertexes = dag.getBeginNode();
} else if (dag.getNode(preNodeName).isConditionsTask()) {
List<String> conditionTaskList = parseConditionTask(preNodeName, skipTaskNodeList, dag, completeTaskList);
startVertexes.addAll(conditionTaskList);
} else {
startVertexes = dag.getSubsequentNodes(preNodeName);
}

for(String start : startVertexs){
TaskNode startNode = dag.getNode(start);
if(!startNode.isForbidden() && !completeTaskList.containsKey(start)){
// the start can be submit if not forbidden and not in complete tasks
for (String subsequent : startVertexes) {
TaskNode taskNode = dag.getNode(subsequent);
if (isTaskNodeNeedSkip(taskNode, skipTaskNodeList)) {
setTaskNodeSkip(subsequent, dag, completeTaskList, skipTaskNodeList );
continue;
}
// then submit the post nodes
Collection<String> postNodes = getStartVertex(start, dag, completeTaskList);
for(String post : postNodes){
TaskNode postNode = dag.getNode(post);
if(taskNodeCanSubmit(postNode, dag, completeTaskList)){
tmpStartVertexs.add(post);
}
if (!DagHelper.allDependsForbiddenOrEnd(taskNode, dag, completeTaskList)) {
continue;
}
if (taskNode.isForbidden() || completeTaskList.containsKey(subsequent)) {
postNodeList.addAll(parsePostNodes(subsequent, skipTaskNodeList, dag, completeTaskList));
continue;
}
tmpStartVertexs.remove(start);
postNodeList.add(subsequent);
}
return tmpStartVertexs;
return postNodeList;
}

/**
* the task can be submit when all the depends nodes are forbidden or complete
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
* if all of the task dependence are skipped, skip it too.
* @param taskNode
* @return
*/
public static boolean taskNodeCanSubmit(TaskNode taskNode,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList) {

List<String> dependList = taskNode.getDepList();
if(dependList == null){
return true;
private static boolean isTaskNodeNeedSkip(TaskNode taskNode,
Map<String, TaskNode> skipTaskNodeList
){
if(CollectionUtils.isEmpty(taskNode.getDepList())){
return false;
}

for(String dependNodeName : dependList){
TaskNode dependNode = dag.getNode(dependNodeName);
if(!dependNode.isForbidden() && !completeTaskList.containsKey(dependNodeName)){
for(String depNode : taskNode.getDepList()){
if(!skipTaskNodeList.containsKey(depNode)){
return false;
}
}
return true;
}


/**
* parse condition task find the branch process
* set skip flag for another one.
* @param nodeName
* @return
*/
public static List<String> parseConditionTask(String nodeName,
Map<String, TaskNode> skipTaskNodeList,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList){
List<String> conditionTaskList = new ArrayList<>();
TaskNode taskNode = dag.getNode(nodeName);
if (!taskNode.isConditionsTask()){
return conditionTaskList;
}
if (!completeTaskList.containsKey(nodeName)){
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeName);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
List<String> skipNodeList = new ArrayList<>();
if(taskInstance.getState().typeIsSuccess()){
conditionTaskList = conditionsParameters.getSuccessNode();
skipNodeList = conditionsParameters.getFailedNode();
}else if(taskInstance.getState().typeIsFailure()){
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
}else{
conditionTaskList.add(nodeName);
}
for(String failedNode : skipNodeList){
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
}
return conditionTaskList;
}

/**
* set task node and the post nodes skip flag
* @param skipNodeName
* @param dag
* @param completeTaskList
* @param skipTaskNodeList
*/
private static void setTaskNodeSkip(String skipNodeName,
DAG<String, TaskNode, TaskNodeRelation> dag,
Map<String, TaskInstance> completeTaskList,
Map<String, TaskNode> skipTaskNodeList){
skipTaskNodeList.putIfAbsent(skipNodeName, dag.getNode(skipNodeName));
Collection<String> postNodeList = dag.getSubsequentNodes(skipNodeName);
for(String post : postNodeList){
TaskNode postNode = dag.getNode(post);
if(isTaskNodeNeedSkip(postNode, skipTaskNodeList)){
setTaskNodeSkip(post, dag, completeTaskList, skipTaskNodeList);
}
}
}



/***
* build dag graph
* @param processDag processDag
Expand Down
Loading