Skip to content

Commit

Permalink
revert message to command
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jul 12, 2022
1 parent ed31f5c commit 4057d32
Show file tree
Hide file tree
Showing 28 changed files with 96 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
Expand Down Expand Up @@ -242,7 +242,7 @@ private void addDispatchEvent(TaskExecutionContext context, ExecutionContext exe

private Command toCommand(TaskExecutionContext taskExecutionContext) {
// todo: we didn't set the host here, since right now we didn't need to retry this message.
TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext,
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
masterConfig.getMasterAddress(),
taskExecutionContext.getHost(),
System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckMessage;
import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
}

public void sendAckToWorker(TaskEvent taskEvent) {
TaskRejectAckMessage taskRejectAckMessage = new TaskRejectAckMessage(ExecutionStatus.SUCCESS.getCode(),
TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
Expand Down Expand Up @@ -110,7 +110,7 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, Ta

public void sendAckToWorker(TaskEvent taskEvent) {
// we didn't set the receiver address, since the ack doen's need to retry
TaskExecuteAckMessage taskExecuteAckMessage = new TaskExecuteAckMessage(ExecutionStatus.SUCCESS.getCode(),
TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(),
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
Expand Down Expand Up @@ -58,8 +58,8 @@ public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
String.format("invalid command type : %s", command.getType()));

TaskExecuteResultMessage taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultMessage.class);
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
TaskExecuteResultCommand.class);
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
Expand Down Expand Up @@ -54,7 +54,7 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningMessage taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningMessage.class);
TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);

TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskRejectMessage;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskRejectMessage recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectMessage.class);
TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class);
TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.dolphinscheduler.common.enums.TaskEventType;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
import org.apache.dolphinscheduler.remote.command.TaskRejectMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;

import java.util.Date;
Expand Down Expand Up @@ -106,7 +106,7 @@ public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstance
return event;
}

public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Channel channel) {
public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
Expand All @@ -120,7 +120,7 @@ public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Chann
return event;
}

public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel channel) {
public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
Expand All @@ -138,7 +138,7 @@ public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel
return event;
}

public static TaskEvent newRecallEvent(TaskRejectMessage command, Channel channel) {
public static TaskEvent newRecallEvent(TaskRejectCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setTaskInstanceId(command.getTaskInstanceId());
event.setProcessInstanceId(command.getProcessInstanceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
Expand All @@ -48,7 +48,7 @@ public static ExecutionContext getExecutionContext(int port) {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();

TaskDispatchMessage requestCommand = new TaskDispatchMessage(context,
TaskDispatchCommand requestCommand = new TaskDispatchCommand(context,
"127.0.0.1:5678",
"127.0.0.1:5678",
System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
Expand Down Expand Up @@ -93,7 +93,7 @@ public void testExecuteWithException() throws ExecuteException {

}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext,
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
"127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.dolphinscheduler.server.master.processor;

import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
Expand All @@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
private TaskEventService taskEventService;
private ProcessService processService;
private TaskExecuteRunningMessage taskExecuteRunningMessage;
private TaskExecuteRunningCommand taskExecuteRunningMessage;
private TaskEvent taskResponseEvent;
private Channel channel;

Expand All @@ -63,7 +63,7 @@ public void before() {
channel = PowerMockito.mock(Channel.class);
taskResponseEvent = PowerMockito.mock(TaskEvent.class);

taskExecuteRunningMessage = new TaskExecuteRunningMessage("127.0.0.1:5678",
taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
" 127.0.0.1:1234",
System.currentTimeMillis());
taskExecuteRunningMessage.setStatus(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
Expand Down Expand Up @@ -77,7 +77,7 @@ public void before() {

Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));

TaskExecuteRunningMessage taskExecuteRunningMessage = new TaskExecuteRunningMessage("127.0.0.1:5678",
TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
taskExecuteRunningMessage.setProcessId(1);
Expand All @@ -90,7 +90,7 @@ public void before() {

ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);

TaskExecuteResultMessage taskExecuteResultMessage = new TaskExecuteResultMessage(NetUtils.getAddr(1234),
TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234),
NetUtils.getAddr(5678),
System.currentTimeMillis());
taskExecuteResultMessage.setProcessInstanceId(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
@Data
@NoArgsConstructor
public abstract class BaseMessage implements Serializable {
public abstract class BaseCommand implements Serializable {

private static final long serialVersionUID = -1L;

Expand All @@ -49,7 +49,7 @@ public abstract class BaseMessage implements Serializable {

protected long messageSendTime;

protected BaseMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
protected BaseCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
this.messageSenderAddress = messageSenderAddress;
this.messageReceiverAddress = messageReceiverAddress;
this.messageSendTime = messageSendTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskDispatchMessage extends BaseMessage {
public class TaskDispatchCommand extends BaseCommand {

private static final long serialVersionUID = -1L;

private TaskExecutionContext taskExecutionContext;

public TaskDispatchMessage(TaskExecutionContext taskExecutionContext,
public TaskDispatchCommand(TaskExecutionContext taskExecutionContext,
String messageSenderAddress,
String messageReceiverAddress,
long messageSendTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskExecuteAckMessage extends BaseMessage {
public class TaskExecuteAckCommand extends BaseCommand {

private int taskInstanceId;
private int status;

public TaskExecuteAckMessage(int status,
public TaskExecuteAckCommand(int status,
int taskInstanceId,
String sourceServerAddress,
String messageReceiverAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskExecuteResultMessage extends BaseMessage {
public class TaskExecuteResultCommand extends BaseCommand {

public TaskExecuteResultMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
public TaskExecuteResultCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskExecuteRunningMessage extends BaseMessage {
public class TaskExecuteRunningCommand extends BaseCommand {

/**
* taskInstanceId
Expand Down Expand Up @@ -80,7 +80,7 @@ public class TaskExecuteRunningMessage extends BaseMessage {
*/
private String appIds;

public TaskExecuteRunningMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
public TaskExecuteRunningCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) {
super(messageSenderAddress, messageReceiverAddress, messageSendTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
@NoArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class TaskRejectAckMessage extends BaseMessage {
public class TaskRejectAckCommand extends BaseCommand {

private int taskInstanceId;
private int status;

public TaskRejectAckMessage(int status,
public TaskRejectAckCommand(int status,
int taskInstanceId,
String messageSenderAddress,
String messageReceiverAddress,
Expand Down
Loading

0 comments on commit 4057d32

Please sign in to comment.