diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index ddd195eff8fb..9c7d048ff88d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage; +import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; @@ -242,7 +242,7 @@ private void addDispatchEvent(TaskExecutionContext context, ExecutionContext exe private Command toCommand(TaskExecutionContext taskExecutionContext) { // todo: we didn't set the host here, since right now we didn't need to retry this message. - TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext, + TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext, masterConfig.getMasterAddress(), taskExecutionContext.getHost(), System.currentTimeMillis()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java index e76b69b9f288..ac4dab50f26d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java @@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskRejectAckMessage; +import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; @@ -69,7 +69,7 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError { } public void sendAckToWorker(TaskEvent taskEvent) { - TaskRejectAckMessage taskRejectAckMessage = new TaskRejectAckMessage(ExecutionStatus.SUCCESS.getCode(), + TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId(), masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java index 59e46b200537..0373baeee14e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; @@ -110,7 +110,7 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, Ta public void sendAckToWorker(TaskEvent taskEvent) { // we didn't set the receiver address, since the ack doen's need to retry - TaskExecuteAckMessage taskExecuteAckMessage = new TaskExecuteAckMessage(ExecutionStatus.SUCCESS.getCode(), + TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId(), masterConfig.getMasterAddress(), taskEvent.getWorkerAddress(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java index 446dfbbcc18c..7f438e1d9cdf 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; @@ -58,8 +58,8 @@ public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteResultMessage taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(), - TaskExecuteResultMessage.class); + TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(), + TaskExecuteResultCommand.class); TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel); try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(), diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java index df9d186e8cf8..96ff1ca40502 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java @@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; @@ -54,7 +54,7 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteRunningMessage taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningMessage.class); + TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class); logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage); TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java index d384763d9ff4..a5d404ec6581 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskRejectMessage; +import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; @@ -55,7 +55,7 @@ public class TaskRecallProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskRejectMessage recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectMessage.class); + TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class); TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel); try { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java index 24cae897759d..e383cad61247 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java @@ -19,9 +19,9 @@ import org.apache.dolphinscheduler.common.enums.TaskEventType; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage; -import org.apache.dolphinscheduler.remote.command.TaskRejectMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; +import org.apache.dolphinscheduler.remote.command.TaskRejectCommand; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import java.util.Date; @@ -106,7 +106,7 @@ public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstance return event; } - public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Channel channel) { + public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) { TaskEvent event = new TaskEvent(); event.setProcessInstanceId(command.getProcessInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId()); @@ -120,7 +120,7 @@ public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Chann return event; } - public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel channel) { + public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) { TaskEvent event = new TaskEvent(); event.setProcessInstanceId(command.getProcessInstanceId()); event.setTaskInstanceId(command.getTaskInstanceId()); @@ -138,7 +138,7 @@ public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel return event; } - public static TaskEvent newRecallEvent(TaskRejectMessage command, Channel channel) { + public static TaskEvent newRecallEvent(TaskRejectCommand command, Channel channel) { TaskEvent event = new TaskEvent(); event.setTaskInstanceId(command.getTaskInstanceId()); event.setProcessInstanceId(command.getProcessInstanceId()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java index e706a76337cd..7252f747ba5a 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage; +import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -48,7 +48,7 @@ public static ExecutionContext getExecutionContext(int port) { .buildProcessDefinitionRelatedInfo(processDefinition) .create(); - TaskDispatchMessage requestCommand = new TaskDispatchMessage(context, + TaskDispatchCommand requestCommand = new TaskDispatchCommand(context, "127.0.0.1:5678", "127.0.0.1:5678", System.currentTimeMillis()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index 9f9ecf906f64..b69ab890f982 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage; +import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; @@ -93,7 +93,7 @@ public void testExecuteWithException() throws ExecuteException { } private Command toCommand(TaskExecutionContext taskExecutionContext) { - TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext, + TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext, "127.0.0.1:5678", "127.0.0.1:1234", System.currentTimeMillis()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java index f8f3b5e5f67a..8c15cd9cbe2c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.processor; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent; import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; @@ -44,7 +44,7 @@ public class TaskAckProcessorTest { private TaskExecuteRunningProcessor taskExecuteRunningProcessor; private TaskEventService taskEventService; private ProcessService processService; - private TaskExecuteRunningMessage taskExecuteRunningMessage; + private TaskExecuteRunningCommand taskExecuteRunningMessage; private TaskEvent taskResponseEvent; private Channel channel; @@ -63,7 +63,7 @@ public void before() { channel = PowerMockito.mock(Channel.class); taskResponseEvent = PowerMockito.mock(TaskEvent.class); - taskExecuteRunningMessage = new TaskExecuteRunningMessage("127.0.0.1:5678", + taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678", " 127.0.0.1:1234", System.currentTimeMillis()); taskExecuteRunningMessage.setStatus(1); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 8a8579d21d55..3854ad77b04c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -20,8 +20,8 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator; @@ -77,7 +77,7 @@ public void before() { Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234)); - TaskExecuteRunningMessage taskExecuteRunningMessage = new TaskExecuteRunningMessage("127.0.0.1:5678", + TaskExecuteRunningCommand taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678", "127.0.0.1:1234", System.currentTimeMillis()); taskExecuteRunningMessage.setProcessId(1); @@ -90,7 +90,7 @@ public void before() { ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel); - TaskExecuteResultMessage taskExecuteResultMessage = new TaskExecuteResultMessage(NetUtils.getAddr(1234), + TaskExecuteResultCommand taskExecuteResultMessage = new TaskExecuteResultCommand(NetUtils.getAddr(1234), NetUtils.getAddr(5678), System.currentTimeMillis()); taskExecuteResultMessage.setProcessInstanceId(1); diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java index cf21b266466b..8207b5fecc2c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/BaseCommand.java @@ -33,7 +33,7 @@ */ @Data @NoArgsConstructor -public abstract class BaseMessage implements Serializable { +public abstract class BaseCommand implements Serializable { private static final long serialVersionUID = -1L; @@ -49,7 +49,7 @@ public abstract class BaseMessage implements Serializable { protected long messageSendTime; - protected BaseMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { + protected BaseCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { this.messageSenderAddress = messageSenderAddress; this.messageReceiverAddress = messageReceiverAddress; this.messageSendTime = messageSendTime; diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java index 3ae49f5fa2ea..f04c6107c1c2 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskDispatchCommand.java @@ -33,13 +33,13 @@ @NoArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -public class TaskDispatchMessage extends BaseMessage { +public class TaskDispatchCommand extends BaseCommand { private static final long serialVersionUID = -1L; private TaskExecutionContext taskExecutionContext; - public TaskDispatchMessage(TaskExecutionContext taskExecutionContext, + public TaskDispatchCommand(TaskExecutionContext taskExecutionContext, String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java similarity index 95% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java index 92ed3f406b03..a70cf8f23967 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java @@ -32,12 +32,12 @@ @NoArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -public class TaskExecuteAckMessage extends BaseMessage { +public class TaskExecuteAckCommand extends BaseCommand { private int taskInstanceId; private int status; - public TaskExecuteAckMessage(int status, + public TaskExecuteAckCommand(int status, int taskInstanceId, String sourceServerAddress, String messageReceiverAddress, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java index 55ecd2dacb09..12dfa56976f1 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResultCommand.java @@ -33,9 +33,9 @@ @NoArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -public class TaskExecuteResultMessage extends BaseMessage { +public class TaskExecuteResultCommand extends BaseCommand { - public TaskExecuteResultMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { + public TaskExecuteResultCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { super(messageSenderAddress, messageReceiverAddress, messageSendTime); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java index 85d1374954af..241d37744aa6 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRunningCommand.java @@ -33,7 +33,7 @@ @NoArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -public class TaskExecuteRunningMessage extends BaseMessage { +public class TaskExecuteRunningCommand extends BaseCommand { /** * taskInstanceId @@ -80,7 +80,7 @@ public class TaskExecuteRunningMessage extends BaseMessage { */ private String appIds; - public TaskExecuteRunningMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { + public TaskExecuteRunningCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { super(messageSenderAddress, messageReceiverAddress, messageSendTime); } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java index a980a5f7e1b6..aed647bb4c65 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java @@ -28,12 +28,12 @@ @NoArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -public class TaskRejectAckMessage extends BaseMessage { +public class TaskRejectAckCommand extends BaseCommand { private int taskInstanceId; private int status; - public TaskRejectAckMessage(int status, + public TaskRejectAckCommand(int status, int taskInstanceId, String messageSenderAddress, String messageReceiverAddress, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectMessage.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java similarity index 94% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectMessage.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java index ac94efdcf761..a9ce5c56a7e9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectMessage.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java @@ -31,7 +31,7 @@ @NoArgsConstructor @ToString(callSuper = true) @EqualsAndHashCode(callSuper = true) -public class TaskRejectMessage extends BaseMessage { +public class TaskRejectCommand extends BaseCommand { /** * taskInstanceId @@ -48,7 +48,7 @@ public class TaskRejectMessage extends BaseMessage { */ private int processInstanceId; - public TaskRejectMessage(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { + public TaskRejectCommand(String messageSenderAddress, String messageReceiverAddress, long messageSendTime) { super(messageSenderAddress, messageReceiverAddress, messageSendTime); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java index 2634194f333a..18dceb069b1e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java @@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.LoggerUtils; -import org.apache.dolphinscheduler.remote.command.BaseMessage; +import org.apache.dolphinscheduler.remote.command.BaseCommand; import org.apache.dolphinscheduler.remote.command.CommandType; import java.time.Duration; @@ -53,9 +53,9 @@ protected MessageRetryRunner() { @Autowired private ApplicationContext applicationContext; - private Map> messageSenderMap = new HashMap<>(); + private Map> messageSenderMap = new HashMap<>(); - private Map> needToRetryMessages = new ConcurrentHashMap<>(); + private Map> needToRetryMessages = new ConcurrentHashMap<>(); @PostConstruct public void init() { @@ -73,13 +73,13 @@ public synchronized void start() { logger.info("Message retry runner started"); } - public void addRetryMessage(int taskInstanceId, @NonNull CommandType messageType, BaseMessage baseMessage) { + public void addRetryMessage(int taskInstanceId, @NonNull CommandType messageType, BaseCommand baseCommand) { needToRetryMessages.computeIfAbsent(taskInstanceId, k -> new ConcurrentHashMap<>()).put(messageType, - baseMessage); + baseCommand); } public void removeRetryMessage(int taskInstanceId, @NonNull CommandType messageType) { - Map retryMessages = needToRetryMessages.get(taskInstanceId); + Map retryMessages = needToRetryMessages.get(taskInstanceId); if (retryMessages != null) { retryMessages.remove(messageType); } @@ -90,7 +90,7 @@ public void removeRetryMessages(int taskInstanceId) { } public void updateMessageHost(int taskInstanceId, String messageReceiverHost) { - Map needToRetryMessages = this.needToRetryMessages.get(taskInstanceId); + Map needToRetryMessages = this.needToRetryMessages.get(taskInstanceId); if (needToRetryMessages != null) { needToRetryMessages.values().forEach(baseMessage -> { baseMessage.setMessageReceiverAddress(messageReceiverHost); @@ -106,13 +106,13 @@ public void run() { } long now = System.currentTimeMillis(); - for (Map.Entry> taskEntry : needToRetryMessages.entrySet()) { + for (Map.Entry> taskEntry : needToRetryMessages.entrySet()) { Integer taskInstanceId = taskEntry.getKey(); LoggerUtils.setTaskInstanceIdMDC(taskInstanceId); try { - for (Map.Entry messageEntry : taskEntry.getValue().entrySet()) { + for (Map.Entry messageEntry : taskEntry.getValue().entrySet()) { CommandType messageType = messageEntry.getKey(); - BaseMessage message = messageEntry.getValue(); + BaseCommand message = messageEntry.getValue(); if (now - message.getMessageSendTime() > MESSAGE_RETRY_WINDOW) { logger.info("Begin retry send message to master, message: {}", message); message.setMessageSendTime(now); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java index 3a6e3eef5a09..519c5b2b3e7c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java @@ -18,11 +18,11 @@ package org.apache.dolphinscheduler.server.worker.message; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.remote.command.BaseMessage; +import org.apache.dolphinscheduler.remote.command.BaseCommand; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; -public interface MessageSender { +public interface MessageSender { /** * Send the message diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java index cbe15f3df2fb..a7d05a479194 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskExecuteResultMessageSender.java @@ -19,7 +19,7 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -29,7 +29,7 @@ import org.springframework.stereotype.Component; @Component -public class TaskExecuteResultMessageSender implements MessageSender { +public class TaskExecuteResultMessageSender implements MessageSender { @Autowired private WorkerConfig workerConfig; @@ -38,14 +38,14 @@ public class TaskExecuteResultMessageSender implements MessageSender { +public class TaskExecuteRunningMessageSender implements MessageSender { @Autowired private WorkerRpcClient workerRpcClient; @@ -40,14 +40,14 @@ public class TaskExecuteRunningMessageSender implements MessageSender { +public class TaskRejectMessageSender implements MessageSender { @Autowired private WorkerRpcClient workerRpcClient; @@ -38,12 +38,12 @@ public class TaskRejectMessageSender implements MessageSender private WorkerConfig workerConfig; @Override - public void sendMessage(TaskRejectMessage message) throws RemotingException { + public void sendMessage(TaskRejectCommand message) throws RemotingException { workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command()); } - public TaskRejectMessage buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) { - TaskRejectMessage taskRejectMessage = new TaskRejectMessage(workerConfig.getWorkerAddress(), + public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) { + TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(), masterAddress, System.currentTimeMillis()); taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java index 0b8c7b7fdd83..f278edf6b25c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage; +import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -98,7 +98,7 @@ public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskDispatchMessage taskDispatchMessage = JSONUtils.parseObject(command.getBody(), TaskDispatchMessage.class); + TaskDispatchCommand taskDispatchMessage = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class); if (taskDispatchMessage == null) { logger.error("task execute request message content is null"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java index 59f943c2f5d2..c60c283c7f3a 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskExecuteAckMessage; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; @@ -51,8 +51,8 @@ public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskExecuteAckMessage taskExecuteAckMessage = JSONUtils.parseObject(command.getBody(), - TaskExecuteAckMessage.class); + TaskExecuteAckCommand taskExecuteAckMessage = JSONUtils.parseObject(command.getBody(), + TaskExecuteAckCommand.class); if (taskExecuteAckMessage == null) { logger.error("task execute response ack command is null"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java index 6ac41abd8d77..850630efe4e1 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskRejectAckMessage; +import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; @@ -48,8 +48,8 @@ public void process(Channel channel, Command command) { Preconditions.checkArgument(CommandType.TASK_REJECT_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskRejectAckMessage taskRejectAckMessage = JSONUtils.parseObject(command.getBody(), - TaskRejectAckMessage.class); + TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(), + TaskRejectAckCommand.class); if (taskRejectAckMessage == null) { return; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java index 0dc415b5e46f..b7d118508533 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.worker.rpc; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.remote.command.BaseMessage; +import org.apache.dolphinscheduler.remote.command.BaseCommand; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; @@ -65,12 +65,12 @@ public void sendMessageNeedAck(@NonNull TaskExecutionContext taskExecutionContex if (messageSender == null) { throw new IllegalArgumentException("The messageType is invalidated, messageType: " + messageType); } - BaseMessage baseMessage = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress); + BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress); try { - messageRetryRunner.addRetryMessage(taskExecutionContext.getTaskInstanceId(), messageType, baseMessage); - messageSender.sendMessage(baseMessage); + messageRetryRunner.addRetryMessage(taskExecutionContext.getTaskInstanceId(), messageType, baseCommand); + messageSender.sendMessage(baseCommand); } catch (RemotingException e) { - logger.error("Send message error, messageType: {}, message: {}", messageType, baseMessage); + logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand); } } @@ -81,11 +81,11 @@ public void sendMessage(@NonNull TaskExecutionContext taskExecutionContext, if (messageSender == null) { throw new IllegalArgumentException("The messageType is invalidated, messageType: " + messageType); } - BaseMessage baseMessage = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress); + BaseCommand baseCommand = messageSender.buildMessage(taskExecutionContext, messageReceiverAddress); try { - messageSender.sendMessage(baseMessage); + messageSender.sendMessage(baseCommand); } catch (RemotingException e) { - logger.error("Send message error, messageType: {}, message: {}", messageType, baseMessage); + logger.error("Send message error, messageType: {}, message: {}", messageType, baseCommand); } } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java index 2a01b0b49d57..c1e39c89fcfd 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessorTest.java @@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage; -import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage; +import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand; import org.apache.dolphinscheduler.remote.utils.ChannelUtils; import org.apache.dolphinscheduler.remote.utils.JsonSerializer; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -73,7 +73,7 @@ public class TaskDispatchProcessorTest { private Command ackCommand; - private TaskDispatchMessage taskRequestCommand; + private TaskDispatchCommand taskRequestCommand; private AlertClientService alertClientService; @@ -88,10 +88,10 @@ public void before() throws Exception { workerConfig.setListenPort(1234); command = new Command(); command.setType(CommandType.TASK_DISPATCH_REQUEST); - ackCommand = new TaskExecuteRunningMessage("127.0.0.1:1234", + ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234", "127.0.0.1:5678", System.currentTimeMillis()).convert2Command(); - taskRequestCommand = new TaskDispatchMessage(taskExecutionContext, + taskRequestCommand = new TaskDispatchCommand(taskExecutionContext, "127.0.0.1:5678", "127.0.0.1:1234", System.currentTimeMillis()); @@ -125,11 +125,11 @@ public void before() throws Exception { workerExecService); PowerMockito.mockStatic(JsonSerializer.class); - PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchMessage.class)).thenReturn( + PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn( taskRequestCommand); PowerMockito.mockStatic(JSONUtils.class); - PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchMessage.class)).thenReturn( + PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn( taskRequestCommand); PowerMockito.mockStatic(FileUtils.class);