Skip to content

Commit

Permalink
Fix network error cause worker cannot send message to master
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jul 11, 2022
1 parent aa8b88a commit 40588a9
Show file tree
Hide file tree
Showing 58 changed files with 1,564 additions and 1,395 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.server.master.config;

import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
Expand Down Expand Up @@ -82,6 +83,10 @@ public class MasterConfig implements Validator {
private double reservedMemory = 0.3;
private Duration failoverInterval = Duration.ofMinutes(10);
private boolean killYarnJobWhenTaskFailover = true;
/**
* ip:listenPort
*/
private String masterAddress;

@Override
public boolean supports(Class<?> clazz) {
Expand Down Expand Up @@ -124,5 +129,6 @@ public void validate(Object target, Errors errors) {
if (masterConfig.getMaxCpuLoadAvg() <= 0) {
masterConfig.setMaxCpuLoadAvg(Runtime.getRuntime().availableProcessors() * 2);
}
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
Expand Down Expand Up @@ -241,7 +241,11 @@ private void addDispatchEvent(TaskExecutionContext context, ExecutionContext exe
}

private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
// todo: we didn't set the host here, since right now we didn't need to retry this message.
TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext,
masterConfig.getMasterAddress(),
taskExecutionContext.getHost(),
System.currentTimeMillis());
return requestCommand.convert2Command();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,8 @@ public NettyExecutorManager() {

@PostConstruct
public void init() {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_RECALL, taskRecallProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_REJECT, taskRecallProcessor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
Expand Down Expand Up @@ -108,9 +108,9 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {

private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskRecallAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;

Expand All @@ -34,13 +35,16 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;

@Autowired
private MasterConfig masterConfig;

@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
Expand All @@ -65,9 +69,12 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
}

public void sendAckToWorker(TaskEvent taskEvent) {
TaskRecallAckCommand taskRecallAckCommand =
new TaskRecallAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskRecallAckCommand.convert2Command());
TaskRejectAckMessage taskRejectAckMessage = new TaskRejectAckMessage(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
Expand All @@ -50,13 +51,16 @@ public class TaskResultEventHandler implements TaskEventHandler {
@Autowired
private ProcessService processService;

@Autowired
private MasterConfig masterConfig;

@Override
public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, TaskEventHandleException {
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();

WorkflowExecuteRunnable workflowExecuteRunnable =
this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
processInstanceId);
if (workflowExecuteRunnable == null) {
sendAckToWorker(taskEvent);
throw new TaskEventHandleError(
Expand Down Expand Up @@ -105,9 +109,13 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, Ta
}

public void sendAckToWorker(TaskEvent taskEvent) {
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand =
new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
// we didn't set the receiver address, since the ack doen's need to retry
TaskExecuteAckMessage taskExecuteAckMessage = new TaskExecuteAckMessage(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
System.currentTimeMillis());
taskEvent.getChannel().writeAndFlush(taskExecuteAckMessage.convert2Command());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckMessage;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
Expand Down Expand Up @@ -106,9 +106,9 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {

private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand =
new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
Expand Down Expand Up @@ -55,15 +55,18 @@ public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
String.format("invalid command type : %s", command.getType()));

TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
TaskExecuteResultMessage taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultMessage.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResponseEvent.getProcessInstanceId(), taskResponseEvent.getTaskInstanceId());
logger.info("Received task execute response, event: {}", taskResponseEvent);
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
taskResultEvent.getTaskInstanceId());
logger.info("Received task execute result, event: {}", taskResultEvent);

taskEventService.addEvent(taskResponseEvent);
taskEventService.addEvent(taskResultEvent);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
Expand Down Expand Up @@ -54,10 +54,10 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
TaskExecuteRunningMessage taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningMessage.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);

TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
taskEventService.addEvent(taskEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectMessage;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
Expand Down Expand Up @@ -54,8 +54,8 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_RECALL == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRecallCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRecallCommand.class);
Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRejectMessage recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectMessage.class);
TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRecallCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
import org.apache.dolphinscheduler.remote.command.TaskRejectMessage;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;

import java.util.Date;
Expand Down Expand Up @@ -106,7 +106,7 @@ public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstance
return event;
}

public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
Expand All @@ -120,7 +120,7 @@ public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Chann
return event;
}

public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
Expand All @@ -138,7 +138,7 @@ public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Chann
return event;
}

public static TaskEvent newRecallEvent(TaskRecallCommand command, Channel channel) {
public static TaskEvent newRecallEvent(TaskRejectMessage command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());
Expand Down
Loading

0 comments on commit 40588a9

Please sign in to comment.